Розетки:Как отправить данные клиенту, не «ожидая» их получения/анализа

StackOverflow https://stackoverflow.com/questions/1134658

Вопрос

У меня есть сервер сокетов, написанный на C++ с использованием boost::asio, и я отправляю данные клиенту.

Сервер отправляет данные порциями, а клиент анализирует каждый фрагмент по мере его получения.Оба сейчас в значительной степени однопоточные.

Какой дизайн мне следует использовать на сервере, чтобы гарантировать, что сервер просто записывает данные так быстро, как только может, и никогда не ждет, пока клиент их проанализирует?Я предполагаю, что мне нужно сделать что-то асинхронное на сервере.

Я предполагаю, что для этого можно внести изменения и в клиенте, но в идеале сервер не должен ждать клиента независимо от того, как он написан.

Я записываю данные в сокет следующим образом:

size_t bytesWritten = m_Socket.Write( boost::asio::buffer(buffer, bufferSize));

Обновлять:

Я собираюсь попробовать использовать механизм Boost для асинхронной записи в сокет.Видеть http://www.boost.org/doc/libs/1_36_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

например

 boost::asio::async_write(socket_, boost::asio::buffer(message_),
        boost::bind(&tcp_connection::handle_write, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  • Алекс
Это было полезно?

Решение

Если вы установите для сокета неблокирующий режим, записи должны завершиться неудачно, если в противном случае они блокировались бы.Затем вы можете поставить данные в очередь по своему усмотрению и организовать еще одну попытку позже для их записи.Я не знаю, как установить параметры сокета в API сокетов Boost, но это то, что вам нужно.

Но это, вероятно, больше хлопот, чем пользы.Вам нужно будет выбрать сокет, готовый к записи, предположительно из нескольких открытых одновременно, запихнуть в него больше данных, пока он не заполнится, и повторить.Я не знаю, есть ли у API буст-сокетов эквивалент select, чтобы вы могли одновременно ждать в нескольких сокетах, пока какой-либо из них не будет готов к записи.

Причина, по которой серверы обычно запускают поток (или порождают процесс) для каждого клиентского соединения, заключается именно в том, что они могут продолжать обслуживать других клиентов, пока те ожидают ввода-вывода, избегая при этом создания собственных очередей.Самый простой способ «организовать еще одну попытку позже» — просто заблокировать ввод-вывод в выделенном потоке.

Чего вы не можете сделать, если только boost не сделал что-то необычное в своем API сокетов, так это потребовать, чтобы ОС или библиотека сокетов ставили в очередь произвольные объемы данных для вас без блокировки.Может существовать асинхронный API, который перезвонит вам при записи данных.

Другие советы

Вы можете обеспечить асинхронную связь, передавая данные не по TCP, а по UDP.Однако если вам нужно использовать TCP, позвольте клиенту быстро сохранить данные и обработать их в другом потоке или асинхронно с помощью задания cron.

Когда вы передаете данные в сокет, он не ждет, пока получатель обработает их.Он даже не ждет передачи данных.Данные помещаются в исходящую очередь, которая обрабатывается ОС в фоновом режиме.Функция записи возвращает количество байтов, поставленных в очередь для передачи, а не количество фактически переданных байтов.

В продолжение комментариев к посту Стефана:

Определенно возможно буферизовать как на стороне клиента, так и на стороне сервера.Но обязательно примите во внимание то, что написал Нил.Если мы просто начнем буферизовать данные вслепую и если обработка никогда не сможет справиться с отправкой, тогда наш буфер будет расти так, как нам, вероятно, не хочется.

Недавно я реализовал простой «NetworkPipe», который должен был функционировать как соединение между одним клиентом/сервером, сервером/клиентом, где внешний пользователь не знает/не заботится о том, является ли канал клиентом или сервером.Я реализовал ситуацию буферизации, аналогичную той, о которой вы спрашиваете, как?Ну, класс был многопоточным, и это был единственный способ, которым я смог четко буферизовать данные.Вот основной процесс, которому я следовал, и обратите внимание, что я установил максимальный размер для труб:

  1. Процесс 1 запускает канал, по умолчанию используется сервер.Теперь внутренний поток ждет клиента.
  2. Процесс 2 запускает канал, уже сервер, по умолчанию — Клиент.
  3. Теперь мы подключены, первое, что нужно сделать, это обменяться максимальными размерами буфера.
  4. Процесс 1 записывает данные (он отмечает, что на другом конце буфер пуст [см. № 3])
  5. Внутренний поток процесса 2 (теперь ожидающий вызова select() для сокета) видит, что данные отправляются, считывает их и буферизует.Теперь процесс 2 отправляет обратно новый размер буфера в P1.

Итак, это действительно упрощенная версия, но, по сути, используя ее потоки, я всегда могу дождаться вызова блокирующего выбора, как только данные поступают, я могу прочитать и буферизовать их, я отправляю обратно новый буферизованный размер.Вы могли бы сделать что-то подобное и буферизовать данные вслепую, на самом деле это немного проще, потому что вам не нужно обмениваться размерами буфера, но, вероятно, это плохая идея.Таким образом, приведенный выше пример позволил внешним пользователям читать/записывать данные, не блокируя их поток (если только буфер на другом конце не заполнен).

Я реализовал решение, используя метод boost::asio::async_write.

По сути:

  • У меня есть один поток на каждого клиента (мои потоки выполняют работу, связанную с процессором)
  • Когда каждый поток накапливает некоторый объем данных, он записывает их в сокет с помощью async_write, не заботясь о том, завершились ли предыдущие записи.
  • Код тщательно управляет временем жизни сокета и записываемыми буферами данных, поскольку обработка ЦП завершается до того, как все данные будут записаны.

Это хорошо работает для меня.Это позволяет потоку сервера завершить работу сразу после завершения работы процессора.

В целом время, необходимое клиенту для получения и анализа всех данных, сократилось.Аналогичным образом сокращается время (время часов на стене), которое сервер тратит на каждого клиента.

Фрагмент кода:

void SocketStream::Write(const char* data, unsigned int dataLength)
{
    // Make a copy of the data
    // we'll delete it when we get called back via HandleWrite
    char* dataCopy = new char[dataLength];
    memcpy( dataCopy,  data, dataLength );

    boost::asio::async_write
        (
        *m_pSocket,
        boost::asio::buffer(dataCopy, dataLength),
        boost::bind
            (
            &SocketStream::HandleWrite,                     // the address of the method to callback when the write is done
            shared_from_this(),                             // a pointer to this, using shared_from_this to keep us alive
            dataCopy,                                       // first parameter to the HandleWrite method
            boost::asio::placeholders::error,               // placeholder so that async_write can pass us values
            boost::asio::placeholders::bytes_transferred
            )
        );
}

void SocketStream::HandleWrite(const char* data, const boost::system::error_code& error, size_t bytes_transferred)
{
    // Deallocate the buffer now that its been written out
    delete data;

    if ( !error )
    {
        m_BytesWritten += bytes_transferred;
    }
    else
    {
        cout << "SocketStream::HandleWrite received error: " << error.message().c_str() << endl;
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top