Розетки:Как отправить данные клиенту, не «ожидая» их получения/анализа
-
16-09-2019 - |
Вопрос
У меня есть сервер сокетов, написанный на C++ с использованием boost::asio, и я отправляю данные клиенту.
Сервер отправляет данные порциями, а клиент анализирует каждый фрагмент по мере его получения.Оба сейчас в значительной степени однопоточные.
Какой дизайн мне следует использовать на сервере, чтобы гарантировать, что сервер просто записывает данные так быстро, как только может, и никогда не ждет, пока клиент их проанализирует?Я предполагаю, что мне нужно сделать что-то асинхронное на сервере.
Я предполагаю, что для этого можно внести изменения и в клиенте, но в идеале сервер не должен ждать клиента независимо от того, как он написан.
Я записываю данные в сокет следующим образом:
size_t bytesWritten = m_Socket.Write( boost::asio::buffer(buffer, bufferSize));
Обновлять:
Я собираюсь попробовать использовать механизм Boost для асинхронной записи в сокет.Видеть http://www.boost.org/doc/libs/1_36_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html
например
boost::asio::async_write(socket_, boost::asio::buffer(message_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
- Алекс
Решение
Если вы установите для сокета неблокирующий режим, записи должны завершиться неудачно, если в противном случае они блокировались бы.Затем вы можете поставить данные в очередь по своему усмотрению и организовать еще одну попытку позже для их записи.Я не знаю, как установить параметры сокета в API сокетов Boost, но это то, что вам нужно.
Но это, вероятно, больше хлопот, чем пользы.Вам нужно будет выбрать сокет, готовый к записи, предположительно из нескольких открытых одновременно, запихнуть в него больше данных, пока он не заполнится, и повторить.Я не знаю, есть ли у API буст-сокетов эквивалент select
, чтобы вы могли одновременно ждать в нескольких сокетах, пока какой-либо из них не будет готов к записи.
Причина, по которой серверы обычно запускают поток (или порождают процесс) для каждого клиентского соединения, заключается именно в том, что они могут продолжать обслуживать других клиентов, пока те ожидают ввода-вывода, избегая при этом создания собственных очередей.Самый простой способ «организовать еще одну попытку позже» — просто заблокировать ввод-вывод в выделенном потоке.
Чего вы не можете сделать, если только boost не сделал что-то необычное в своем API сокетов, так это потребовать, чтобы ОС или библиотека сокетов ставили в очередь произвольные объемы данных для вас без блокировки.Может существовать асинхронный API, который перезвонит вам при записи данных.
Другие советы
Вы можете обеспечить асинхронную связь, передавая данные не по TCP, а по UDP.Однако если вам нужно использовать TCP, позвольте клиенту быстро сохранить данные и обработать их в другом потоке или асинхронно с помощью задания cron.
Когда вы передаете данные в сокет, он не ждет, пока получатель обработает их.Он даже не ждет передачи данных.Данные помещаются в исходящую очередь, которая обрабатывается ОС в фоновом режиме.Функция записи возвращает количество байтов, поставленных в очередь для передачи, а не количество фактически переданных байтов.
В продолжение комментариев к посту Стефана:
Определенно возможно буферизовать как на стороне клиента, так и на стороне сервера.Но обязательно примите во внимание то, что написал Нил.Если мы просто начнем буферизовать данные вслепую и если обработка никогда не сможет справиться с отправкой, тогда наш буфер будет расти так, как нам, вероятно, не хочется.
Недавно я реализовал простой «NetworkPipe», который должен был функционировать как соединение между одним клиентом/сервером, сервером/клиентом, где внешний пользователь не знает/не заботится о том, является ли канал клиентом или сервером.Я реализовал ситуацию буферизации, аналогичную той, о которой вы спрашиваете, как?Ну, класс был многопоточным, и это был единственный способ, которым я смог четко буферизовать данные.Вот основной процесс, которому я следовал, и обратите внимание, что я установил максимальный размер для труб:
- Процесс 1 запускает канал, по умолчанию используется сервер.Теперь внутренний поток ждет клиента.
- Процесс 2 запускает канал, уже сервер, по умолчанию — Клиент.
- Теперь мы подключены, первое, что нужно сделать, это обменяться максимальными размерами буфера.
- Процесс 1 записывает данные (он отмечает, что на другом конце буфер пуст [см. № 3])
- Внутренний поток процесса 2 (теперь ожидающий вызова select() для сокета) видит, что данные отправляются, считывает их и буферизует.Теперь процесс 2 отправляет обратно новый размер буфера в P1.
Итак, это действительно упрощенная версия, но, по сути, используя ее потоки, я всегда могу дождаться вызова блокирующего выбора, как только данные поступают, я могу прочитать и буферизовать их, я отправляю обратно новый буферизованный размер.Вы могли бы сделать что-то подобное и буферизовать данные вслепую, на самом деле это немного проще, потому что вам не нужно обмениваться размерами буфера, но, вероятно, это плохая идея.Таким образом, приведенный выше пример позволил внешним пользователям читать/записывать данные, не блокируя их поток (если только буфер на другом конце не заполнен).
Я реализовал решение, используя метод boost::asio::async_write.
По сути:
- У меня есть один поток на каждого клиента (мои потоки выполняют работу, связанную с процессором)
- Когда каждый поток накапливает некоторый объем данных, он записывает их в сокет с помощью async_write, не заботясь о том, завершились ли предыдущие записи.
- Код тщательно управляет временем жизни сокета и записываемыми буферами данных, поскольку обработка ЦП завершается до того, как все данные будут записаны.
Это хорошо работает для меня.Это позволяет потоку сервера завершить работу сразу после завершения работы процессора.
В целом время, необходимое клиенту для получения и анализа всех данных, сократилось.Аналогичным образом сокращается время (время часов на стене), которое сервер тратит на каждого клиента.
Фрагмент кода:
void SocketStream::Write(const char* data, unsigned int dataLength)
{
// Make a copy of the data
// we'll delete it when we get called back via HandleWrite
char* dataCopy = new char[dataLength];
memcpy( dataCopy, data, dataLength );
boost::asio::async_write
(
*m_pSocket,
boost::asio::buffer(dataCopy, dataLength),
boost::bind
(
&SocketStream::HandleWrite, // the address of the method to callback when the write is done
shared_from_this(), // a pointer to this, using shared_from_this to keep us alive
dataCopy, // first parameter to the HandleWrite method
boost::asio::placeholders::error, // placeholder so that async_write can pass us values
boost::asio::placeholders::bytes_transferred
)
);
}
void SocketStream::HandleWrite(const char* data, const boost::system::error_code& error, size_t bytes_transferred)
{
// Deallocate the buffer now that its been written out
delete data;
if ( !error )
{
m_BytesWritten += bytes_transferred;
}
else
{
cout << "SocketStream::HandleWrite received error: " << error.message().c_str() << endl;
}
}