Вопрос

Используя POSIX threads & C ++, у меня есть "Операция вставки", которая может быть выполнена безопасно только по одному за раз.

Если у меня есть несколько потоков, ожидающих вставки с использованием pthread_join, то создаю новый поток когда он завершится.Будут ли все они получать сигнал "поток завершен" одновременно и создавать несколько вставок, или можно с уверенностью предположить, что поток, который первым получит сигнал "поток завершен", создаст новый поток, блокирующий другим создание новых потоков.

/* --- GLOBAL --- */
pthread_t insertThread;



/* --- DIFFERENT THREADS --- */
// Wait for Current insert to finish
pthread_join(insertThread, NULL); 

// Done start a new one
pthread_create(&insertThread, NULL, Insert, Data);

Спасибо вам за ответы

Программа по сути представляет собой огромную хэш-таблицу, которая принимает запросы от клиентов через сокеты.

Каждое новое клиентское соединение порождает новый поток, из которого оно затем может выполнять несколько операций, в частности поиск или вставку.поисковые запросы могут проводиться параллельно.Но вставки должны быть "повторно объединены" в один поток.Вы могли бы сказать, что операции поиска могут быть выполнены без создания нового потока для клиента, однако они могут занять некоторое время, вызывая блокировку сервера и отбрасывая новые запросы.Дизайн пытается максимально свести к минимуму системные вызовы и создание потоков.

Но теперь, когда я знаю, что это небезопасно, как я сначала думал, я должен быть в состоянии собрать что-нибудь вместе

Спасибо

Это было полезно?

Решение

От opengroup.org на pthread_join:

Результаты нескольких одновременных вызовов pthread_join(), указывающих один и тот же целевой поток, не определены.

Таким образом, у вас действительно не должно быть нескольких потоков, присоединяющихся к вашему предыдущему insertThread .

Во-первых, поскольку вы используете C ++, я рекомендую boost.поток.Они напоминают модель потоков POSIX, а также работают в Windows.И это поможет вам с C ++, т. е.делая функциональные объекты более удобными для использования.

Во-вторых, почему вы хотите запустить новый поток для вставки элемента, когда вам всегда приходится ждать завершения предыдущего, прежде чем начинать следующий?Кажется, это не классическое использование нескольких потоков.

Хотя...Одним из классических решений этого было бы, чтобы один рабочий поток получал задания из очереди событий, а другие потоки размещали операцию в очереди событий.

Если вы действительно просто хотите сохранить все более или менее таким, как у вас есть сейчас, вам придется сделать это:

  • Создайте переменную условия, например insert_finished.
  • Все потоки, которые хотят выполнить вставку, ожидают от переменной условия.
  • Как только один поток завершает свою вставку, он запускает переменную условия.
  • Поскольку для переменной условия требуется мьютекс, вы можете просто уведомить ВСЕ ожидающие потоки, все они хотят начать вставку, но поскольку только один поток может получать мьютекс одновременно, все потоки будут выполнять вставку последовательно.

Но вы должны позаботиться о том, чтобы ваша синхронизация не была реализована слишком случайным образом.Как это называется insert, Я подозреваю, что вы хотите манипулировать структурой данных, поэтому вы, вероятно, хотите сначала реализовать потокобезопасную структуру данных, вместо того чтобы совместно использовать синхронизацию между доступом к структуре данных и всеми клиентами.Я также подозреваю, что операций будет больше, чем просто insert, для чего потребуется надлежащая синхронизация...

Другие советы

В соответствии с единой спецификацией Unix:"Результаты нескольких одновременных вызовов pthread_join(), указывающих один и тот же целевой поток, не определены".

"Обычный способ" достижения одного потока для получения задачи состоял бы в настройке переменной условия (не забудьте соответствующий мьютекс):незанятые потоки ожидают в pthread_cond_wait() (или pthread_cond_timedwait()), и когда поток, выполняющий работу, завершается, он пробуждает один из незанятых потоков с помощью pthread_cond_signal().

Да, как и рекомендовало большинство людей, лучшим способом, по-видимому, является чтение рабочего потока из очереди.Некоторые фрагменты кода ниже

    pthread_t       insertThread = NULL;
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t insertConditionDoneMutex    = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  insertConditionNew      = PTHREAD_COND_INITIALIZER;
    pthread_cond_t  insertConditionDone     = PTHREAD_COND_INITIALIZER;

       //Thread for new incoming connection
        void * newBatchInsert()
        {
           for(each Word)
           {
                            //Push It into the queue
                            pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
                                lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord);
                            pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);

           }

                    //Send signal to worker Thread
                    pthread_mutex_lock(&insertConditionNewMutex);
                        pthread_cond_signal(&insertConditionNew);
                    pthread_mutex_unlock(&insertConditionNewMutex);

                    //Wait Until it's finished
                    pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex);

        }


            //Worker thread
            void * insertWorker(void *)
            {

                while(1)        
                {

                    pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex);

                    for (int ii = 0; ii < maxWordLength; ++ii)
                    {                   
                            while (!lexicon[ii]->insertQueue.empty())
                            {

                                queueNode * newPendingWord = lexicon[ii]->insertQueue.front();


                                lexicon[ii]->insert(newPendingWord->word);

                                pthread_mutex_lock(&lexicon[ii]->insertQueueMutex);
                                lexicon[ii]->insertQueue.pop();
                                pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);

                            }

                    }

                    //Send signal that it's done
                    pthread_mutex_lock(&insertConditionDoneMutex);
                        pthread_cond_broadcast(&insertConditionDone);
                    pthread_mutex_unlock(&insertConditionDoneMutex);

                }

            }

            int main(int argc, char * const argv[]) 
            {

                pthread_create(&insertThread, NULL, &insertWorker, NULL);


                lexiconServer = new server(serverPort, (void *) newBatchInsert);

                return 0;
            }

Другие уже указывали, что это имеет неопределенное поведение.Я бы просто добавил, что действительно самый простой способ выполнить вашу задачу (разрешить только одному потоку выполнять часть кода) - это использовать простой мьютекс - вам нужно, чтобы потоки, выполняющие этот код, были взаимоисключающими, и именно отсюда мьютекс получил свое название :-)

Если вам нужно, чтобы код выполнялся в определенном потоке (например, Java AWT), то вам нужны условные переменные.Однако вам следует дважды подумать, действительно ли это решение окупается.Представьте, сколько переключений контекста вам понадобится, если вы вызываете свою "Операцию вставки" 10000 раз в секунду.

Поскольку вы только что упомянули, что используете хэш-таблицу с несколькими поисками, параллельными вставкам, я бы рекомендовал проверить, можете ли вы использовать параллельную хэш-таблицу.

Поскольку точные результаты поиска не являются детерминированными при одновременной вставке элементов, такая параллельная хэш-карта может быть именно тем, что вам нужно.Однако я не использовал параллельные хэш-таблицы в C ++, но поскольку они доступны в Java, вы наверняка найдете библиотеку, делающую это на C ++.

Единственная библиотека, которую я нашел, которая поддерживает вставки без блокировки новых поисковых запросов - Восход солнца DD (И я не уверен, поддерживает ли он одновременные вставки)

Однако переход от Google к Разреженная хэш - карта использование памяти более чем удваивается.Запросы должны выполняться довольно редко, поэтому вместо того, чтобы пытаться написать свою собственную библиотеку которая сочетает в себе преимущества обоих, я бы предпочел просто заблокировать таблицу, приостанавливающую поиск, пока изменения вносятся безопасно.

Еще раз спасибо

Мне кажется, что вы хотите сериализовать вставки в хэш-таблицу.

Для этого вам нужна блокировка, а не создание новых потоков.

Из вашего описания это выглядит очень неэффективно, поскольку вы заново создаете поток вставки каждый раз, когда хотите что-то вставить.Стоимость создания потока не равна 0.

Более распространенным решением этой проблемы является создание потока insert, который ожидает в очереди (т. Е. находится в спящем режиме цикла, пока цикл пуст).Затем другие потоки добавляют рабочие элементы в очередь.Поток insert выбирает элементы очереди в том порядке, в котором они были добавлены (или по приоритету, если хотите), и выполняет соответствующее действие.

Все, что вам нужно сделать, это убедиться, что добавление в очередь защищено, чтобы только один поток одновременно имел доступ к изменению фактической очереди, и что поток вставки не выполняет ожидание занятости, а скорее переходит в спящий режим, когда в очереди ничего нет (см. переменная условия).

В идеале вам не нужно использовать несколько пулов потоков в одном процессе, даже если они выполняют разные операции.Возобновляемость потока является важным архитектурным определением, которое приводит к созданию pthread_join в основном потоке, если вы используете C.

Конечно, для C ++ threadpool, также известного как ThreadFactory , идея состоит в том, чтобы сохранить примитивы потоков абстрактными, чтобы он мог обрабатывать любой из переданных ему типов функций / операций.

Типичным примером может быть веб-сервер, который будет иметь пулы соединений и пулы потоков, которые обслуживают соединения и затем обрабатывают их дальше, но все они являются производными от общего процесса threadpool.

Краткие сведения :ИЗБЕГАЙТЕ PTHREAD_JOIN В любом другом месте, кроме основного потока.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top