c/c++语言开发共享C Pthreads – 线程安全队列实现的问题

我是multithreading的新手,我试图实现一个简单的线程安全任务队列,每个线程可以从中拉出工作,直到没有剩下的任务。 任何线程都不会排队任务。


/* Lock taken by dequeue() so that only one thread pops at a time. */
static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

/* A unit of work; here just a number for the worker to print. */
typedef struct Task {
    int number;
} Task;

/* List node. The task is stored by value inside the node. */
typedef struct Cell {
    Task t;
    struct Cell *next;
} Cell;

/*
 * FIFO queue on a singly linked list with a placeholder first node:
 * `head` points at a cell whose task is not pending, `tail` at the last cell.
 */
typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

/* Returns non-zero when no task is pending (only the placeholder remains). */
int empty(TQueue *queue)
{
    return queue->head == queue->tail;
}

/* Initialises `queue` with a single placeholder cell. */
void startQueue(TQueue *queue)
{
    queue->head = malloc(sizeof(Cell));
    queue->tail = queue->head;
}

/*
 * Appends a copy of task C after the current tail.
 * NOTE: takes no lock.
 */
void enqueue(TQueue *queue, Task C)
{
    queue->tail->next = malloc(sizeof(Cell));
    queue->tail = queue->tail->next;
    queue->tail->t = C;
    queue->tail->next = NULL;
}

/*
 * Pops the oldest pending task while holding task_mutex.
 *
 * Returns NULL when the queue is empty; otherwise a pointer to the task
 * stored inside the cell that becomes the new placeholder head.
 *
 * NOTE: that cell is freed by the next successful dequeue(), so the returned
 * pointer is only valid until then. With several consumer threads, one
 * thread can still be reading the task while another thread's dequeue()
 * frees the cell that holds it.
 */
Task *dequeue(TQueue *queue)
{
    pthread_mutex_lock(&task_mutex);

    Task *t;
    if (empty(queue)) {
        t = NULL;
    } else {
        struct Cell *p = queue->head;   /* old placeholder, released below */
        queue->head = queue->head->next;
        t = &queue->head->t;
        free(p);
    }

    pthread_mutex_unlock(&task_mutex);
    return t;
}

/*
 * Thread entry point: drains the queue passed in `arg`, printing each
 * task's number followed by a space.
 */
void *work(void *arg)
{
    TQueue *queue = (TQueue *)arg;
    /* NOTE: this allocation is overwritten by the first dequeue() and leaks. */
    Task *t = malloc(sizeof(Task));

    for (t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    /* t is NULL once the loop ends, so this call releases nothing. */
    free(t);
    pthread_exit(NULL);
    return 0;
}


 /*
  * Test driver: queue tasks numbered 1..3, start three worker threads on the
  * shared queue, and wait for all of them to finish.
  */
 int main()
 {
     TQueue *queue = malloc(sizeof(TQueue));
     pthread_t threads[3];
     Task t[3];
     int k;

     startQueue(queue);

     /* Fill the queue before any worker exists. */
     for (k = 0; k != 3; ++k) {
         t[k].number = k + 1;
         enqueue(queue, t[k]);
     }

     /* Every worker receives the same queue pointer. */
     for (k = 0; k != 3; ++k)
         pthread_create(&threads[k], NULL, work, (void *)queue);

     for (k = 0; k != 3; ++k)
         pthread_join(threads[k], NULL);

     return 0;
 }

任何顺序的预期输出为1 2 3 ,但有时它会打印出一个奇怪数字的序列,如1823219 2 3 。 我无法发现任何竞争条件或相关问题,所以我感谢任何帮助。


    我已经为你的代码加了注释。 代码取自你的第一个帖子和第二个帖子,各取了一部分。 我修复了代码,并同时展示修改前后的版本[请原谅我顺手做的代码风格清理]:

     #include <stdio.h>
     #include <stdlib.h>
     #include <pthread.h>

     // single lock protecting every queue operation
     static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

     // unit of work: a number for the worker to print
     typedef struct Task {
         int number;
     } Task;

     // queue node
     typedef struct Cell {
     // NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
     // messy
     #if 0
         Task t;
     #else
         Task *t;
     #endif
         struct Cell *next;
     } Cell;

     // FIFO queue; head and tail are both NULL when the queue is empty
     typedef struct TQueue {
         struct Cell *head;
         struct Cell *tail;
     } TQueue;

     // startQueue -- initialise an empty queue
     void
     startQueue(TQueue *queue)
     {
     #if 0
         queue->head = malloc(sizeof(Cell));
     #else
         queue->head = NULL;
     #endif
         queue->tail = NULL;
     }

     // empty -- returns non-zero when the queue holds no entries
     int
     empty(TQueue *queue)
     {
         // NOTE: with this layout (no placeholder node, head is NULL when
         // nothing is queued) comparing head against tail no longer works:
         // a one-element queue also has head == tail
     #if 0
         return (queue->head == queue->tail);
     #else
         return (queue->head == NULL);
     #endif
     }

     // enqueue -- append a task pointer at the tail (takes task_mutex)
     void
     enqueue(TQueue *queue, Task *t)
     {
         Cell *p;

         pthread_mutex_lock(&task_mutex);

         p = malloc(sizeof(Cell));
         p->next = NULL;
         p->t = t;

         if (queue->tail == NULL) {
             queue->tail = p;
             queue->head = p;
         }
         else {
             queue->tail->next = p;
             queue->tail = p;
         }

         pthread_mutex_unlock(&task_mutex);
     }

     // dequeue -- detach the oldest entry (takes task_mutex)
     // returns the queued task pointer, or NULL when the queue is empty
     Task *
     dequeue(TQueue *queue)
     {
         Task *t;

         pthread_mutex_lock(&task_mutex);

         if (empty(queue))
             t = NULL;
         else {
             Cell *p = queue->head;

             // removing the last entry leaves the queue with no tail
             if (p == queue->tail)
                 queue->tail = NULL;

             queue->head = p->next;

             // NOTE/BUG: without a placeholder node, reading the task from
             // the already-advanced head would return the second element in
             // the list, not the first
             // NOTE/BUG: this is also undefined behavior, in original code (with
             // original struct definition), because what t points to _does_ get
             // freed before the caller is done with it
     #if 0
             t = &queue->head->t;
     #else
             t = p->t;
     #endif

             free(p);
         }

         pthread_mutex_unlock(&task_mutex);

         return t;
     }

     // work -- thread entry point: drain the queue, printing each task number
     void *
     work(void *arg)
     {
         TQueue *queue = (TQueue *) arg;

         // NOTE/BUG: this gets orphaned on the first call to dequeue
     #if 0
         Task *t = malloc(sizeof(Task));
     #else
         Task *t;
     #endif

         for (t = dequeue(queue); t != NULL; t = dequeue(queue))
             printf("%d ", t->number);

         // NOTE/BUG: t is NULL once the loop ends, so this never released the
         // block allocated above; the tasks themselves belong to main
     #if 0
         free(t);
     #endif

         pthread_exit(NULL);
         return 0;
     }

     // For a simple test I ran this in main:
     int
     main()
     {
         TQueue *queue = malloc(sizeof(TQueue));

         startQueue(queue);

         pthread_t threads[3];
         Task t[3];

         for (int i = 0; i < 3; i++) {
             t[i].number = i + 1;
     #if 0
             enqueue(queue, t);
     #else
             enqueue(queue, &t[i]);
     #endif
         }

         for (int i = 0; i < 3; i++)
             pthread_create(&threads[i], NULL, work, (void *) queue);

         for (int i = 0; i < 3; i++)
             pthread_join(threads[i], NULL);

         return 0;
     }


    线程是否同时执行任务? 我一直在用htop测试cpu的使用情况,我只能最多使用四个核心的单个核心。

    要记住一些事情。 对于运行时间如此短的程序, htop可能不会显示太多。 即使有10,000个队列条目,该程序也会在20ms内执行。

    让程序本身打印信息更好[见下文]。 请注意, printf 会在 stdout 上执行线程锁定,因此它可能会导致程序呈现“串行”的特性。 它也占用了程序执行时间的很大一部分(即 printf 比 dequeue 慢得多)。


    操作系统可以[可以自由地]安排单个核心上的所有线程。 然后它可以稍后“迁移”它们(例如在一秒左右)。

    我已经增强了程序,在输出打印中包含一些定时信息,可能有助于显示您希望看到的更多信息。 此外,我添加了命令行选项来控制线程数和排队的项目数。 这与我自己的一些程序类似。 将程序输出转移到日志文件并进行检查。 在多次运行中使用选项

     // clock_gettime() and CLOCK_REALTIME are POSIX, not ISO C
     #ifndef _POSIX_C_SOURCE
     #define _POSIX_C_SOURCE 200809L
     #endif

     #include <stdio.h>
     #include <stdlib.h>
     #include <pthread.h>
     #include <time.h>

     int opt_n;                              // suppress thread output
     int opt_T;                              // number of threads
     int opt_Q;                              // number of queue items

     // single lock protecting every queue operation
     static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

     // time of program start; tvgetf() reports seconds relative to this
     double tvzero;

     // unit of work
     typedef struct Task {
         int number;
     } Task;

     // queue node holding a pointer to a caller-owned task
     typedef struct Cell {
         Task *t;
         struct Cell *next;
     } Cell;

     // FIFO queue; head and tail are both NULL when empty
     typedef struct TQueue {
         struct Cell *head;
         struct Cell *tail;
     } TQueue;

     // per-thread control block
     typedef struct Thread {
         pthread_t tid;                      // pthread handle
         int xid;                            // 1-based id used in log output
         TQueue *queue;                      // queue this thread drains
     } Thread;

     // tvgetf -- CLOCK_REALTIME in fractional seconds, minus tvzero
     double
     tvgetf(void)
     {
         struct timespec ts;
         double sec;

         clock_gettime(CLOCK_REALTIME, &ts);
         sec = ts.tv_nsec;
         sec /= 1e9;
         sec += ts.tv_sec;

         sec -= tvzero;

         return sec;
     }

     // startQueue -- initialise an empty queue
     void
     startQueue(TQueue *queue)
     {
         queue->head = NULL;
         queue->tail = NULL;
     }

     // empty -- returns non-zero when the queue holds no entries
     int
     empty(TQueue *queue)
     {
         return (queue->head == NULL);
     }

     // enqueue -- append a task pointer at the tail (takes task_mutex)
     // terminates the program if the node cannot be allocated
     void
     enqueue(TQueue *queue, Task *t)
     {
         Cell *p;

         pthread_mutex_lock(&task_mutex);

         p = malloc(sizeof(Cell));
         if (p == NULL) {
             perror("enqueue: malloc");
             exit(1);
         }
         p->next = NULL;
         p->t = t;

         if (queue->tail == NULL) {
             queue->tail = p;
             queue->head = p;
         }
         else {
             queue->tail->next = p;
             queue->tail = p;
         }

         pthread_mutex_unlock(&task_mutex);
     }

     // dequeue -- detach the oldest entry (takes task_mutex)
     // returns the queued task pointer, or NULL when the queue is empty
     Task *
     dequeue(TQueue *queue)
     {
         Task *t;

         pthread_mutex_lock(&task_mutex);

         if (empty(queue))
             t = NULL;
         else {
             Cell *p = queue->head;

             // removing the last entry leaves the queue with no tail
             if (p == queue->tail)
                 queue->tail = NULL;

             queue->head = p->next;
             t = p->t;

             free(p);
         }

         pthread_mutex_unlock(&task_mutex);

         return t;
     }

     // work -- thread entry point: drain the queue, timing each dequeue
     // prints "[start/elapsed thread-id] task-number" per item unless -n is set
     void *
     work(void *arg)
     {
         Thread *tskcur = arg;
         TQueue *queue = tskcur->queue;
         Task *t;
         double tvbef;
         double tvaft;

         while (1) {
             tvbef = tvgetf();
             t = dequeue(queue);
             tvaft = tvgetf();

             if (t == NULL)
                 break;

             if (! opt_n)
                 printf("[%.9f/%.9f %5.5d] %d\n",
                     tvbef, tvaft - tvbef, tskcur->xid, t->number);
         }

         return (void *) 0;
     }

     // main -- parse -n / -Q<count> / -T<count>, fill the queue, run the workers
     int
     main(int argc, char **argv)
     {
         char *cp;
         TQueue *queue;
         Task *t;
         Thread *tsk;

         --argc;
         ++argv;

         for (; argc > 0; --argc, ++argv) {
             cp = *argv;
             if (*cp != '-')
                 break;

             switch (cp[1]) {
             case 'n':                       // suppress thread output
                 opt_n = 1;
                 break;

             case 'Q':                       // number of queue items
                 opt_Q = atoi(cp + 2);
                 break;

             case 'T':                       // number of threads
                 opt_T = atoi(cp + 2);
                 break;

             default:
                 break;
             }
         }

         // tvzero is still 0 here, so this records the absolute start time
         tvzero = tvgetf();

         queue = malloc(sizeof(TQueue));
         if (queue == NULL) {
             perror("main: malloc");
             return 1;
         }
         startQueue(queue);

         // fall back to the defaults for missing or non-positive counts; a
         // negative value must never reach the VLA below
         if (opt_T <= 0)
             opt_T = 16;
         Thread threads[opt_T];

         if (opt_Q <= 0)
             opt_Q = 10000;
         t = malloc(sizeof(Task) * opt_Q);
         if (t == NULL) {
             perror("main: malloc");
             return 1;
         }

         for (int i = 0; i < opt_Q; i++) {
             t[i].number = i + 1;
             enqueue(queue, &t[i]);
         }

         for (int i = 0; i < opt_T; i++) {
             tsk = &threads[i];
             tsk->xid = i + 1;
             tsk->queue = queue;
             // joining a thread that was never created is undefined, so stop
             if (pthread_create(&tsk->tid, NULL, work, tsk) != 0) {
                 fprintf(stderr, "main: pthread_create failed\n");
                 return 1;
             }
         }

         for (int i = 0; i < opt_T; i++) {
             tsk = &threads[i];
             pthread_join(tsk->tid, NULL);
         }

         printf("TOTAL: %.9f\n", tvgetf());

         free(t);
         free(queue);

         return 0;
     }




    pthread_create需要一些时间,这使得线程1已经开始运行,而其他线程仍在创建之中。 改善这种情况的一种方法是创建所有线程,每个线程设置一个“我正在运行”标志(在其线程控制块中)。 主线程等待所有线程设置此标志。 然后,主线程设置一个全局volatile“you_may_now_all_run”标志,每个线程在进入自己的主循环之前自旋等待该标志。 根据我的经验,它们都会在彼此相差几微秒[或更短]的时间内开始运行。

    我没有在下面的更新代码中实现这一点,因此您可以自己进行实验[与nanosleep ]。

    互斥体总体上相当公平[至少在linux下],因为被阻塞的线程将排队,等待互斥锁。 正如我在评论中提到的那样,也可以使用nanosleep ,但这[有点]会失败,因为线程会减慢。

    线程饥饿的解药是“公平”。 正如我所提到的,有一种精巧的、无等待(wait-free)的公平算法,即 Kogan / Petrank 算法: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf 。 它实际上相当复杂/高级,所以需要注意......

    但是,一个折中方案可能是票据锁(ticket lock): https://en.wikipedia.org/wiki/Ticket_lock

    我再次重新编写程序。 它具有池化分配,票证与互斥锁定以及日志条目的延迟打印的选项。 它还交叉检查线程之间的结果,以确保它们都没有重复的条目。


    例如,人们会认为在dequeue内部执行free会比简单地将Cell释放到可恢复池(类似于slab分配器)慢,但是,性能提升并不像预期的那么大。 这可能是glibc的malloc/free只是快速的[这是他们声称的 ]。



     // clock_gettime() and CLOCK_REALTIME are POSIX, not ISO C
     #ifndef _POSIX_C_SOURCE
     #define _POSIX_C_SOURCE 200809L
     #endif

     #include <stdio.h>
     #include <stdlib.h>
     #include <string.h>
     #include <errno.h>
     #include <stdatomic.h>
     #include <pthread.h>
     #include <time.h>

     int opt_p;                              // print thread output immediately
     int opt_T;                              // number of threads
     int opt_Q;                              // number of queue items
     int opt_L;                              // use ticket lock
     int opt_M;                              // use fast cell alloc/free

     typedef unsigned char byte;
     typedef unsigned int u32;

     // sysfault -- print a printf-style message to stderr and exit(1)
     #define sysfault(...) \
         do { \
             fprintf(stderr, __VA_ARGS__); \
             exit(1); \
         } while (0)

     // lock control
     // only the members belonging to the strategy chosen in lock_init are used
     typedef struct AnyLock {
         pthread_mutex_t mutex;              // standard mutex
         _Atomic u32 seqreq;                 // ticket lock request
         _Atomic u32 seqacq;                 // ticket lock grant
     } AnyLock;

     // work value
     typedef struct Task {
         union {
             struct Task *next;
             int number;
         };
     } Task;

     // queue item
     typedef struct Cell {
         struct Cell *next;
         Task *t;
     } Cell;

     // queue control
     typedef struct TQueue {
         struct Cell *head;
         struct Cell *tail;
     } TQueue;

     // thread log entry
     typedef struct Log {
         double tvbef;                       // time before dequeue
         double tvaft;                       // time after dequeue
         int number;                         // task number that was dequeued
     } Log;

     // bit vector helpers: byte offset, bit mask and byte length for a bit index
     #define BTVOFF(_off)            ((_off) >> 3)
     #define BTVMSK(_off)            (1u << ((_off) & 0x07))
     #define BTVLEN(_len)            (((_len) + 7) >> 3)

     // thread control
     typedef struct Thread {
         pthread_t tid;                      // pthread handle
         int xid;                            // 1-based id used in log output
         TQueue *queue;                      // queue this thread drains
         Log *log;                           // deferred log buffer (NULL with -p)
         byte *bitv;                         // one bit per task number seen
     } Thread;

     // btvset -- set bit `off` in bitv
     // returns the bit's previous state (non-zero if it was already set)
     static inline byte
     btvset(byte *bitv, long off)
     {
         u32 msk;
         byte oval;

         bitv += BTVOFF(off);
         msk = BTVMSK(off);

         oval = *bitv & msk;
         *bitv |= msk;

         return oval;
     }

     AnyLock task_mutex;                     // guards the queue and the cell pool
     AnyLock print_mutex;                    // keeps per-thread reports contiguous
     double tvzero;                          // program start time
     Cell *cellpool;                         // free pool of cells
     long bitvlen;                           // byte length of each bit vector

     // virtual function pointers
     Cell *(*cellnew)(void);
     void (*cellfree)(Cell *);
     void (*lock_acquire)(AnyLock *lock);
     void (*lock_release)(AnyLock *lock);

     // tvgetf -- CLOCK_REALTIME in fractional seconds, minus tvzero
     double
     tvgetf(void)
     {
         struct timespec ts;
         double sec;

         clock_gettime(CLOCK_REALTIME, &ts);
         sec = ts.tv_nsec;
         sec /= 1e9;
         sec += ts.tv_sec;

         sec -= tvzero;

         return sec;
     }

     // xalloc -- calloc wrapper that terminates the program on failure
     void *
     xalloc(size_t cnt, size_t siz)
     {
         void *ptr;

         ptr = calloc(cnt, siz);
         if (ptr == NULL)
             sysfault("xalloc: calloc failure -- %s\n", strerror(errno));

         return ptr;
     }

     // lock_wait_ticket -- spin until the grant counter reaches our ticket
     void
     lock_wait_ticket(AnyLock *lock, u32 newval)
     {
         while (atomic_load(&lock->seqacq) != newval)
             ;
     }

     // lock_acquire_ticket -- take the next ticket and wait for its turn
     void
     lock_acquire_ticket(AnyLock *lock)
     {
         // fetch_add returns the previous request count; our ticket is one
         // past it, matching the grant counter that starts at 1
         u32 newval = atomic_fetch_add(&lock->seqreq, 1) + 1;

         lock_wait_ticket(lock, newval);
     }

     // lock_release_ticket -- admit the holder of the next ticket
     void
     lock_release_ticket(AnyLock *lock)
     {
         atomic_fetch_add(&lock->seqacq, 1);
     }

     // lock_acquire_mutex -- pthread mutex flavour of lock_acquire
     void
     lock_acquire_mutex(AnyLock *lock)
     {
         pthread_mutex_lock(&lock->mutex);
     }

     // lock_release_mutex -- pthread mutex flavour of lock_release
     void
     lock_release_mutex(AnyLock *lock)
     {
         pthread_mutex_unlock(&lock->mutex);
     }

     // lock_init -- initialise `lock` for the strategy selected by opt_L
     // also points the global lock_acquire/lock_release hooks at that strategy
     void
     lock_init(AnyLock *lock)
     {
         switch (opt_L) {
         case 1:
             lock->seqreq = 0;
             lock->seqacq = 1;
             lock_acquire = lock_acquire_ticket;
             lock_release = lock_release_ticket;
             break;

         default:
             pthread_mutex_init(&lock->mutex, NULL);
             lock_acquire = lock_acquire_mutex;
             lock_release = lock_release_mutex;
             break;
         }
     }

     // startQueue -- initialise an empty queue
     void
     startQueue(TQueue *queue)
     {
         queue->head = NULL;
         queue->tail = NULL;
     }

     // empty -- returns non-zero when the queue holds no entries
     int
     empty(TQueue *queue)
     {
         return (queue->head == NULL);
     }

     // number of cells fetched from the heap whenever the pool runs dry
     enum { CELLPOOL_CHUNK = 1000 };

     // cellnew_pool -- allocate a queue entry
     // NOTE: cellpool has no lock of its own; enqueue/dequeue call the cell
     // hooks while holding task_mutex
     Cell *
     cellnew_pool(void)
     {
         int cnt;
         Cell *p;
         Cell *pool;

         while (1) {
             // try for quick allocation
             p = cellpool;

             // bug out if we got it
             if (p != NULL) {
                 cellpool = p->next;
                 break;
             }

             // go to the heap to replenish the pool
             cnt = CELLPOOL_CHUNK;
             p = xalloc(cnt, sizeof(Cell));

             // link up the entries
             pool = NULL;
             for (; cnt > 0; --cnt, ++p) {
                 p->next = pool;
                 pool = p;
             }

             // put this "online"
             cellpool = pool;
         }

         return p;
     }

     // cellfree_pool -- release a queue entry
     void
     cellfree_pool(Cell *p)
     {
         p->next = cellpool;
         cellpool = p;
     }

     // cellnew_std -- allocate a queue entry
     Cell *
     cellnew_std(void)
     {
         Cell *p;

         p = xalloc(1, sizeof(Cell));

         return p;
     }

     // cellfree_std -- release a queue entry
     void
     cellfree_std(Cell *p)
     {
         free(p);
     }

     // enqueue -- append a task pointer at the tail (takes task_mutex)
     void
     enqueue(TQueue *queue, Task *t)
     {
         Cell *p;

         lock_acquire(&task_mutex);

         p = cellnew();
         p->next = NULL;
         p->t = t;

         if (queue->tail == NULL) {
             queue->tail = p;
             queue->head = p;
         }
         else {
             queue->tail->next = p;
             queue->tail = p;
         }

         lock_release(&task_mutex);
     }

     // dequeue -- detach the oldest entry (takes task_mutex)
     // returns the queued task pointer, or NULL when the queue is empty
     Task *
     dequeue(TQueue *queue)
     {
         Task *t;

         lock_acquire(&task_mutex);

         if (empty(queue))
             t = NULL;
         else {
             Cell *p = queue->head;

             // removing the last entry leaves the queue with no tail
             if (p == queue->tail)
                 queue->tail = NULL;

             queue->head = p->next;
             t = p->t;

             cellfree(p);
         }

         lock_release(&task_mutex);

         return t;
     }

     // work -- thread entry point: drain the queue, timing each dequeue
     // with -p each item is printed at once; otherwise items are logged and
     // the whole log is printed under print_mutex after the queue is empty
     void *
     work(void *arg)
     {
         Thread *tskcur = arg;
         TQueue *queue = tskcur->queue;
         Task *t;
         Log *log;
         long cnt;
         int tprev;
         byte *bitv;
         double tvbeg;
         double tvbef;
         double tvaft;

         log = tskcur->log;
         bitv = tskcur->bitv;
         tvbeg = tvgetf();

         tprev = 0;
         while (1) {
             tvbef = tvgetf();
             t = dequeue(queue);
             tvaft = tvgetf();

             if (t == NULL)
                 break;

             // abort if we get a double entry
             if (btvset(bitv, t->number))
                 sysfault("work: duplicate\n");

             if (opt_p) {
                 printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                     tvbef, tvaft - tvbef, tskcur->xid,
                     t->number, t->number - tprev);
                 tprev = t->number;
                 continue;
             }

             log->tvbef = tvbef;
             log->tvaft = tvaft;
             log->number = t->number;
             ++log;
         }

         if (! opt_p) {
             tvaft = tvgetf();

             cnt = log - tskcur->log;
             log = tskcur->log;

             lock_acquire(&print_mutex);

             printf("\n");
             printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n",
                 tskcur->xid, tvbeg, tvaft, tvaft - tvbeg, cnt);

             tprev = 0;
             for (; cnt > 0; --cnt, ++log) {
                 printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                     log->tvbef, log->tvaft - log->tvbef, tskcur->xid,
                     log->number, log->number - tprev);
                 tprev = log->number;
             }

             lock_release(&print_mutex);
         }

         return (void *) 0;
     }

     // btvchk -- abort if two threads' bit vectors share any set bit
     void
     btvchk(Thread *tska, Thread *tskb)
     {
         byte *btva;
         byte *btvb;
         byte aval;
         byte bval;
         int idx;

         printf("btvchk: %d ??? %d\n", tska->xid, tskb->xid);

         btva = tska->bitv;
         btvb = tskb->bitv;

         // abort if we get overlapping entries between two threads
         for (idx = 0; idx < bitvlen; ++idx) {
             aval = btva[idx];
             bval = btvb[idx];
             if (aval & bval)
                 sysfault("btvchk: duplicate\n");
         }
     }

     // main -- parse -p / -Q<count> / -T<count> / -L / -M, fill the queue,
     // run the workers, then cross-check their results
     int
     main(int argc, char **argv)
     {
         char *cp;
         TQueue *queue;
         Task *t;
         Thread *tsk;

         --argc;
         ++argv;

         for (; argc > 0; --argc, ++argv) {
             cp = *argv;
             if (*cp != '-')
                 break;

             switch (cp[1]) {
             case 'p':                       // print immediately
                 opt_p = 1;
                 break;

             case 'Q':                       // number of queue items
                 opt_Q = atoi(cp + 2);
                 break;

             case 'T':                       // number of threads
                 opt_T = atoi(cp + 2);
                 break;

             case 'L':                       // use ticket lock
                 opt_L = 1;
                 break;

             case 'M':                       // use pooled cell allocation
                 opt_M = 1;
                 break;

             default:
                 break;
             }
         }

         printf("p=%d -- thread log is %s\n",
             opt_p, opt_p ? "immediate" : "deferred");

         // fall back to the defaults for missing or non-positive counts; a
         // negative value must never reach the VLA or the allocations below
         if (opt_T <= 0)
             opt_T = 16;
         printf("T=%d (number of threads)\n", opt_T);

         if (opt_Q <= 0)
             opt_Q = 1000000;
         printf("Q=%d (number of items to enqueue)\n", opt_Q);

         printf("L=%d -- lock is %s\n",
             opt_L, opt_L ? "ticket" : "mutex");
         printf("M=%d -- queue item allocation is %s\n",
             opt_M, opt_M ? "pooled" : "malloc/free");

         // tvzero is still 0 here, so this records the absolute start time
         tvzero = tvgetf();

         lock_init(&task_mutex);
         lock_init(&print_mutex);

         // select queue item allocation strategy
         switch (opt_M) {
         case 1:
             cellnew = cellnew_pool;
             cellfree = cellfree_pool;
             break;

         default:
             cellnew = cellnew_std;
             cellfree = cellfree_std;
             break;
         }

         queue = xalloc(1, sizeof(TQueue));
         startQueue(queue);

         Thread threads[opt_T];

         // get byte length of bit vectors (task numbers run from 1 to opt_Q)
         bitvlen = BTVLEN(opt_Q + 1);

         // allocate per-thread log buffers
         // any single thread may end up handling every item, so each log is
         // sized for the whole queue
         for (int i = 0; i < opt_T; i++) {
             tsk = &threads[i];
             tsk->log = opt_p ? NULL : xalloc(opt_Q, sizeof(Log));
             tsk->bitv = xalloc(bitvlen, sizeof(byte));
         }

         // allocate "work to do"
         t = xalloc(opt_Q, sizeof(Task));

         // add to master queue
         for (int i = 0; i < opt_Q; i++) {
             t[i].number = i + 1;
             enqueue(queue, &t[i]);
         }

         // fire up the threads
         for (int i = 0; i < opt_T; i++) {
             tsk = &threads[i];
             tsk->xid = i + 1;
             tsk->queue = queue;
             // joining a thread that was never created is undefined, so stop
             if (pthread_create(&tsk->tid, NULL, work, tsk) != 0)
                 sysfault("main: pthread_create failed\n");
         }

         // wait for threads to complete
         for (int i = 0; i < opt_T; i++) {
             tsk = &threads[i];
             pthread_join(tsk->tid, NULL);
         }

         // cross-check every pair of threads for items handled twice
         for (int i = 0; i < opt_T; i++) {
             for (int j = i + 1; j < opt_T; j++)
                 btvchk(&threads[i], &threads[j]);
         }

         printf("TOTAL: %.9f\n", tvgetf());

         // pooled cells are carved out of larger chunks and stay allocated
         for (int i = 0; i < opt_T; i++) {
             free(threads[i].log);
             free(threads[i].bitv);
         }
         free(t);
         free(queue);

         return 0;
     }

      以上就是c/c++开发分享C Pthreads – 线程安全队列实现的问题相关内容,想了解更多C/C++开发(异常处理)及C/C++游戏开发关注计算机技术网(www.ctvol.com)!)。




      上一篇 2020年12月9日
      下一篇 2020年12月9日
