POSIX 스레드를 사용하여 읽기 읽기를 구현하는 방법
-
03-07-2019 - |
문제
대략 인터페이스를 순종하는 생산자/소비자 시나리오를 구현하고 싶습니다.
class Consumer {
private:
vector<char> read(size_t n) {
// If the internal buffer has `n` elements, then dequeue them
// Otherwise wait for more data and try again
}
public:
void run() {
read(10);
read(4839);
// etc
}
void feed(const vector<char> &more) {
// Safely queue the data
// Notify `read` that there is now more data
}
};
이 경우 feed
그리고 run
별도의 스레드에서 실행됩니다 read
차단 읽기 여야합니다 (좋아요 recv
그리고 fread
). 분명히, 나는 내 deque에 어떤 종류의 상호 배제가 필요할 것이며, 정보를 제공하기 위해 어떤 종류의 알림 시스템이 필요합니다. read
다시 시도합니다.
나는 듣는다 조건 변수 갈 길이지만, 모든 멀티 스레딩 경험은 창문이 있으며 머리를 감싸는 데 어려움을 겪고 있습니다.
도움을 주셔서 감사합니다!
(예, 벡터를 반환하는 것이 비효율적이라는 것을 알고 있습니다. 그 안에 들어 가지 말자.)
해결책
이 코드는 생산 준비가 아닙니다. 라이브러리 호출 결과에 대한 오류 점검이 수행되지 않습니다.
Lockthread에서 Mutex의 잠금/잠금 해제를 감싸서 예외입니다. 그러나 그것은 그것에 관한 것입니다.
또한이 작업을 진지하게 수행한다면 물체 내부의 뮤텍스와 조건 변수를 감싸서 다른 소비자 방법 내부에서 코트를 남용 할 수 있습니다. 그러나 조건 변수를 사용하기 전에 잠금 장치를 획득해야한다는 점에 유의하십시오.이 간단한 상황은 그대로있을 수 있습니다.
관심이 없어 Boost Threading Library를 확인 했습니까?
#include <iostream>
#include <vector>
#include <pthread.h>
class LockThread
{
public:
LockThread(pthread_mutex_t& m)
:mutex(m)
{
pthread_mutex_lock(&mutex);
}
~LockThread()
{
pthread_mutex_unlock(&mutex);
}
private:
pthread_mutex_t& mutex;
};
class Consumer
{
pthread_mutex_t lock;
pthread_cond_t cond;
std::vector<char> unreadData;
public:
Consumer()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
}
~Consumer()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&lock);
}
private:
std::vector<char> read(size_t n)
{
LockThread locker(lock);
while (unreadData.size() < n)
{
// Must wait until we have n char.
// This is a while loop because feed may not put enough in.
// pthread_cond() releases the lock.
// Thread will not be allowed to continue until
// signal is called and this thread reacquires the lock.
pthread_cond_wait(&cond,&lock);
// Once released from the condition you will have re-aquired the lock.
// Thus feed() must have exited and released the lock first.
}
/*
* Not sure if this is exactly what you wanted.
* But the data is copied out of the thread safe buffer
* into something that can be returned.
*/
std::vector<char> result(n); // init result with size n
std::copy(&unreadData[0],
&unreadData[n],
&result[0]);
unreadData.erase(unreadData.begin(),
unreadData.begin() + n);
return (result);
}
public:
void run()
{
read(10);
read(4839);
// etc
}
void feed(const std::vector<char> &more)
{
LockThread locker(lock);
// Once we acquire the lock we can safely modify the buffer.
std::copy(more.begin(),more.end(),std::back_inserter(unreadData));
// Only signal the thread if you have the lock
// Otherwise race conditions happen.
pthread_cond_signal(&cond);
// destructor releases the lock and thus allows read thread to continue.
}
};
int main()
{
Consumer c;
}
다른 팁
나는 내가 "syncronized queue"라고 부르는 것을 사용하는 경향이 있습니다. 나는 일반 대기열을 감싸고 원하는대로 잠금 및 읽기 블록을 위해 세마포어 클래스를 사용합니다.
#ifndef SYNCQUEUE_20061005_H_
#define SYNCQUEUE_20061005_H_
#include <queue>
#include "Semaphore.h"
// similar, but slightly simpler interface to std::queue
// this queue implementation will serialize pushes and pops
// and block on a pop while empty (as apposed to throwing an exception)
// it also locks as neccessary on insertion and removal to avoid race
// conditions
template <class T, class C = std::deque<T> > class SyncQueue {
protected:
std::queue<T, C> m_Queue;
Semaphore m_Semaphore;
Mutex m_Mutex;
public:
typedef typename std::queue<T, C>::value_type value_type;
typedef typename std::queue<T, C>::size_type size_type;
explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}
bool empty() const { return m_Queue.empty(); }
size_type size() const { return m_Queue.size(); }
void push(const value_type& x);
value_type pop();
};
template <class T, class C>
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) {
// atomically push item
m_Mutex.lock();
m_Queue.push(x);
m_Mutex.unlock();
// let blocking semaphore know another item has arrived
m_Semaphore.v();
}
template <class T, class C>
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
// block until we have at least one item
m_Semaphore.p();
// atomically read and pop front item
m_Mutex.lock();
value_type ret = m_Queue.front();
m_Queue.pop();
m_Mutex.unlock();
return ret;
}
#endif
스레딩 구현에서 적절한 프리미티브를 사용하여 세마포어 및 뮤트를 구현할 수 있습니다.
참고 :이 구현은 대기열의 단일 요소에 대한 예이지만 N이 제공 될 때까지 버퍼 결과를 버퍼링하는 함수로 쉽게 래핑 할 수 있습니다. 숯불 줄이라면 이와 같은 것 :
std::vector<char> func(int size) {
std::vector<char> result;
while(result.size() != size) {
result.push_back(my_sync_queue.pop());
}
return result;
}
나는 반-피다 코드를 버릴 것이다. 내 의견은 다음과 같습니다.
1) 여기에서 매우 큰 잠금 곡물. 더 빠른 액세스가 필요한 경우 데이터 구조를 다시 생각할 수 있습니다. STL은 ThreadSafe가 아닙니다.
2) Mutex가 통과 할 때까지 잠금이 차단됩니다. Mutex 구조는 잠금/잠금 해제 메커니즘으로 한 번에 1 개의 실을 통과시킬 수 있다는 것입니다. 폴링이나 예외 구조가 필요하지 않습니다.
3) 이것은 문제에서 구문 적으로 해킹 된 컷입니다. API 나 C ++ 구문에 정확하지는 않지만 의미 적으로 올바른 솔루션을 제공한다고 생각합니다.
4) 의견에 대한 응답으로 편집.
class piper
{
pthread_mutex queuemutex;
pthread_mutex readymutex;
bool isReady; //init to false by constructor
//whatever else
};
piper::read()
{//whatever
pthread_mutex_lock(&queuemutex)
if(myqueue.size() >= n)
{
return_queue_vector.push_back(/* you know what to do here */)
pthread_mutex_lock(&readymutex)
isReady = false;
pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}
piper::push_em_in()
{
//more whatever
pthread_mutex_lock(&queuemutex)
//push push push
if(myqueue.size() >= n)
{
pthread_mutex_lock(&readymutex)
isReady = true;
pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}
재미를 위해 Boost를 사용한 빠르고 더러운 구현이 있습니다. 지원하는 플랫폼에서 후드 아래에서 pthreads를 사용하고 Windows에서 Windows 작업을 사용합니다.
boost::mutex access;
boost::condition cond;
// consumer
data read()
{
boost::mutex::scoped_lock lock(access);
// this blocks until the data is ready
cond.wait(lock);
// queue is ready
return data_from_queue();
}
// producer
void push(data)
{
boost::mutex::scoped_lock lock(access);
// add data to queue
if (queue_has_enough_data())
cond.notify_one();
}
더 재미있게, 여기 내 최종 버전이 있습니다. 정당한 이유없이 stl-in. :-)
#include <algorithm>
#include <deque>
#include <pthread.h>
template<typename T>
class MultithreadedReader {
std::deque<T> buffer;
pthread_mutex_t moreDataMutex;
pthread_cond_t moreDataCond;
protected:
template<typename OutputIterator>
void read(size_t count, OutputIterator result) {
pthread_mutex_lock(&moreDataMutex);
while (buffer.size() < count) {
pthread_cond_wait(&moreDataCond, &moreDataMutex);
}
std::copy(buffer.begin(), buffer.begin() + count, result);
buffer.erase(buffer.begin(), buffer.begin() + count);
pthread_mutex_unlock(&moreDataMutex);
}
public:
MultithreadedReader() {
pthread_mutex_init(&moreDataMutex, 0);
pthread_cond_init(&moreDataCond, 0);
}
~MultithreadedReader() {
pthread_cond_destroy(&moreDataCond);
pthread_mutex_destroy(&moreDataMutex);
}
template<typename InputIterator>
void feed(InputIterator first, InputIterator last) {
pthread_mutex_lock(&moreDataMutex);
buffer.insert(buffer.end(), first, last);
pthread_cond_signal(&moreDataCond);
pthread_mutex_unlock(&moreDataMutex);
}
};
glib 비동기 큐는 당신이 찾고있는 빈 줄을 읽는 데 잠금과 수면을 제공합니다. 보다 http://library.gnome.org/devel/glib/2.20/glib-asynchronous-queues.html 당신은 그것들을 gthreads 또는 gthread 수영장과 결합 할 수 있습니다.