使用 POSIX 线程和 C++,我有一个“插入操作”,一次只能安全地完成一个操作。

如果我有多个线程使用pthread_join插入,则在完成新线程后产生。它们是否都会立即接收“线程完成”信号并生成多个插入,或者是否可以安全地假设首先接收“线程完成”信号的线程将生成一个新线程,阻止其他线程创建新线程。

/* --- GLOBAL --- */
pthread_t insertThread;



/* --- DIFFERENT THREADS --- */
// Wait for Current insert to finish
pthread_join(insertThread, NULL); 

// Done start a new one
pthread_create(&insertThread, NULL, Insert, Data);

感谢您的答复

该程序基本上是一个巨大的哈希表,它通过套接字接收客户端的请求。

每个新的客户端连接都会生成一个新线程,然后可以从中执行多个操作,特别是查找或插入。查找可以并行进行。但插入件需要“重新组合”成单个螺纹。您可以说,查找操作可以在不为客户端生成新线程的情况下完成,但是它们可能需要一段时间导致服务器锁定,从而丢弃新请求。该设计试图尽可能减少系统调用和线程创建。

但现在我知道这并不安全,就像我最初认为我应该能够拼凑一些东西一样

谢谢

有帮助吗?

解决方案

opengroup.org 上的 pthread_join:

多次同时调用指定同一目标线程的 pthread_join() 的结果是未定义的。

因此,您确实不应该有多个线程加入您之前的 insertThread。

首先,当你使用 C++ 时,我建议 升压线程. 。它们类似于线程的 POSIX 模型,并且也可以在 Windows 上运行。它可以帮助你使用 C++,即通过使函数对象更容易使用。

其次,当您总是必须等待上一个线程完成才能开始下一个元素时,为什么要启动一个新线程来插入元素?似乎不是多线程的经典使用。

虽然...对此的一种经典解决方案是让一个工作线程从事件队列中获取作业,而其他线程将操作发布到事件队列上。

如果你真的只是想或多或少地保持现在的样子,你必须这样做:

  • 创建一个条件变量,例如 insert_finished.
  • 所有想要执行插入的线程都等待条件变量。
  • 一旦一个线程完成插入,它就会触发条件变量。
  • 由于条件变量需要互斥锁,因此您只需通知 全部 等待线程,它们都想开始插入,但由于一次只有一个线程可以获取互斥体,所以所有线程将顺序执行插入。

但您应该注意同步不是以过于临时的方式实现的。因为这被称为 insert, ,我怀疑您想要操作数据结构,因此您可能想首先实现线程安全的数据结构,而不是共享数据结构访问和所有客户端之间的同步。我还怀疑还会有更多的操作 insert, ,这将需要适当的同步......

其他提示

根据Single Unix规范:“指定同一目标线程的多个同时调用pthread_join()的结果未定义。”

“正常方式”实现单个线程来获取任务将是设置一个条件变量(不要忘记相关的互斥锁):空闲线程在pthread_cond_wait()(或pthread_cond_timedwait())中等待,并且当完成工作的线程完成时,它用pthread_cond_signal()唤醒其中一个空闲的。

是的,因为大多数人建议最好的方法似乎是从队列中读取工作线程。

下面的一些代码片段
    pthread_t       insertThread = NULL;
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t insertConditionDoneMutex    = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  insertConditionNew      = PTHREAD_COND_INITIALIZER;
    pthread_cond_t  insertConditionDone     = PTHREAD_COND_INITIALIZER;

       //Thread for new incoming connection
        void * newBatchInsert()
        {
           for(each Word)
           {
                            //Push It into the queue
                            pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
                                lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord);
                            pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);

           }

                    //Send signal to worker Thread
                    pthread_mutex_lock(&insertConditionNewMutex);
                        pthread_cond_signal(&insertConditionNew);
                    pthread_mutex_unlock(&insertConditionNewMutex);

                    //Wait Until it's finished
                    pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex);

        }


            //Worker thread
            void * insertWorker(void *)
            {

                while(1)        
                {

                    pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex);

                    for (int ii = 0; ii < maxWordLength; ++ii)
                    {                   
                            while (!lexicon[ii]->insertQueue.empty())
                            {

                                queueNode * newPendingWord = lexicon[ii]->insertQueue.front();


                                lexicon[ii]->insert(newPendingWord->word);

                                pthread_mutex_lock(&lexicon[ii]->insertQueueMutex);
                                lexicon[ii]->insertQueue.pop();
                                pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);

                            }

                    }

                    //Send signal that it's done
                    pthread_mutex_lock(&insertConditionDoneMutex);
                        pthread_cond_broadcast(&insertConditionDone);
                    pthread_mutex_unlock(&insertConditionDoneMutex);

                }

            }

            int main(int argc, char * const argv[]) 
            {

                pthread_create(&insertThread, NULL, &insertWorker, NULL);


                lexiconServer = new server(serverPort, (void *) newBatchInsert);

                return 0;
            }

其他人已经指出这有不确定的行为。我只是补充说,完成任务的最简单方法(只允许一个线程执行部分代码)是使用一个简单的互斥锁 - 你需要执行该代码的线程是MUTally EXclusive,这就是mutex来到的地方它的名字: - )

如果需要在特定线程(如Java AWT)中运行代码,则需要条件变量。但是,您应该三思而后行,这个解决方案是否真的有回报。想象一下,如果你打电话给你的“插入操作”,你需要多少个上下文切换。每秒10000次。

正如您刚才提到的那样,您使用的哈希表有几个与插入并行的查找,我建议您检查是否可以使用并发哈希表。

当您同时插入元素时,确切的查找结果是不确定的,这样的并发哈希映射可能正是您所需要的。我没有在C ++中使用并发哈希表,但是因为它们在Java中可用,所以你肯定会找到一个用C ++编写的库。

我找到的唯一一个支持插入而不锁定新查找的库 - Sunrise DD (我不确定它是否支持并发插入)

但是,从Google的稀疏哈希地图切换,内存使用量增加了一倍以上。查找应该很少发生,而不是尝试和编写我自己的库 结合了两者的优点,我宁愿锁定表暂停查找,同时安全地进行更改。

再次感谢

在我看来,您希望将插入序列化为哈希表。

为此你想要一个锁 - 不要产生新的线程。

从您的描述看起来非常低效,因为每次要插入内容时都要重新创建插入线程。创建线程的成本不是0.

这个问题的一个更常见的解决方案是生成一个等待队列的插入线程(即在循环为空时坐在循环中)。然后,其他线程将工作项添加到队列中。插入线程按照添加顺序(或根据需要按优先级)选择队列中的项目并执行相应的操作。

您所要做的就是确保对队列的添加受到保护,以便一次只有一个线程可以访问修改实际队列,并且插入线程不会进行繁忙等待,而是在没有任何东西时休眠在队列中(参见条件变量)。

理想情况下,您不希望在单个进程中使用多个线程池,即使它们执行不同的操作也是如此。线程的可恢复性是一个重要的体系结构定义,如果使用C,则会导致在主线程中创建pthread_join。

当然,对于C ++线程池又称ThreadFactory,我们的想法是保持线程原语是抽象的,这样就可以处理传递给它的任何函数/操作类型。

一个典型的例子是一个Web服务器,它将具有连接池和线程池,它们为连接提供服务,然后进一步处理它们,但是,所有这些都是从一个共同的线程池进程派生的。

摘要:在主线程以外的任何地方避免PTHREAD_JOIN。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top