`
hulianwang2014
  • 浏览: 692042 次
文章分类
社区版块
存档分类
最新评论
  • bcworld: 排版成这样,一点看的欲望都没有了
    jfinal

【Linux】生产者消费者编程实现-线程池+信号量

 
阅读更多

生产者消费者编程实现,采用了线程池以及信号量技术。


线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

那么为什么又需要线程池呢?

我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些"池化资源"技术就这样的产生了。



一般一个简单线程池至少包含下列组成部分。

1)线程池管理器(ThreadPoolManager:用于创建并管理线程池

2)工作线程(WorkThread:线程池中线程

3)任务接口(Task:每个任务必须实现的接口,以供工作线程调度任务的执行。

4)任务队列:用于存放没有处理的任务。提供一种缓冲机制。

图示:


图1 线程池图解


生产者消费者模型C语言代码实现:

thread_pool_pv.h:

  1. //线程池编程实现
  2. #ifndefTHREAD_POOL_H
  3. #defineTHREAD_POOL_H
  4. #include<stdio.h>
  5. #include<stdlib.h>
  6. #include<semaphore.h>//信号量sem_t
  7. #include<pthread.h>
  8. //任务接口,线程调用的函数
  9. typedefvoid*(*FUNC)(void*arg);
  10. //任务数据结构
  11. typedefstructthread_pool_job_s{
  12. FUNCfunction;//线程调用的函数
  13. void*arg;//函数参数
  14. structthread_pool_job_s*pre;//指向上一个节点
  15. structthread_pool_job_s*next;//指向下一个节点
  16. }thread_pool_job;
  17. //工作队列
  18. typedefstructthread_pool_job_queue_s{
  19. thread_pool_job*head;//队列头指针
  20. thread_pool_job*tail;//队列尾指针
  21. intnum;//任务数目
  22. sem_t*quene_sem;//信号量
  23. }thread_pool_job_queue;
  24. //线程池(存放消费者进程)
  25. typedefstructthread_pool_s{
  26. pthread_t*threads;//线程
  27. intthreads_num;//线程数目
  28. thread_pool_job_queue*job_queue;//指向工作队列的指针
  29. }thread_pool;
  30. //typedefstructthread_data_s{
  31. //pthread_mutex_t*mutex_t;//互斥量
  32. //thread_pool*tp_p;//指向线程池的指针
  33. //}thread_data;
  34. //初始化线程池
  35. thread_pool*tp_init(intthread_num);
  36. //初始化工作队列
  37. inttp_job_quene_init(thread_pool*tp);
  38. //向工作队列中添加一个元素
  39. voidtp_job_quene_add(thread_pool*tp,thread_pool_job*new_job);
  40. //向线程池中添加一个工作项
  41. inttp_add_work(thread_pool*tp,void*(*func_p)(void*),void*arg);
  42. //取得工作队列的最后个节点
  43. thread_pool_job*tp_get_lastjob(thread_pool*tp);
  44. //删除工作队列的最后个节点
  45. inttp_delete__lastjob(thread_pool*tp);
  46. //销毁线程池
  47. voidtp_destroy(thread_pool*tp);
  48. //消费者线程函数
  49. void*tp_thread_func(thread_pool*tp);
  50. //生产者线程执行函数
  51. void*thread_func_producer(thread_pool*tp);
  52. #endif



thread_pool_pv.c:
  1. //线程池编程实现
  2. #include"thread_pool.h"
  3. //互斥量,用于对工作队列的访问
  4. pthread_mutex_tmutex=PTHREAD_MUTEX_INITIALIZER;
  5. //标记线程池是否处于可用状态
  6. staticinttp_alive=1;
  7. //初始化线程池
  8. thread_pool*tp_init(intthread_num){
  9. thread_pool*tp;
  10. inti;
  11. if(thread_num<1)
  12. thread_num=1;
  13. tp=(thread_pool*)malloc(sizeof(thread_pool));
  14. //判断内存分配是否成功
  15. if(NULL==tp){
  16. printf("ERROR:allocatememoryforthread_poolfailed\n");
  17. returnNULL;
  18. }
  19. tp->threads_num=thread_num;
  20. //分配线程所占内存空间
  21. tp->threads=(pthread_t*)malloc(thread_num*sizeof(pthread_t));
  22. //判断内存分配是否成功
  23. if(NULL==tp->threads){
  24. printf("ERROR:allocatememoryforthreadsinthreadpoolfailed\n");
  25. returnNULL;
  26. }
  27. if(tp_job_quene_init(tp))
  28. returnNULL;
  29. tp->job_queue->quene_sem=(sem_t*)malloc(sizeof(sem_t));
  30. sem_init(tp->job_queue->quene_sem,0,0);//信号量初始化
  31. //初始化线程
  32. for(i=0;i<thread_num;++i){
  33. pthread_create(&(tp->threads[i]),NULL,(void*)tp_thread_func,(void*)tp);
  34. }
  35. returntp;
  36. }
  37. //初始化工作队列
  38. inttp_job_quene_init(thread_pool*tp){
  39. tp->job_queue=(thread_pool_job_queue*)malloc(sizeof(thread_pool_job_queue));
  40. if(NULL==tp->job_queue){
  41. return-1;
  42. }
  43. tp->job_queue->head=NULL;
  44. tp->job_queue->tail=NULL;
  45. tp->job_queue->num=0;
  46. return0;
  47. }
  48. //线程函数
  49. void*tp_thread_func(thread_pool*tp){
  50. FUNCfunction;
  51. void*arg_buf;
  52. thread_pool_job*job_p;
  53. while(tp_alive){
  54. //线程阻塞,等待信号量
  55. if(sem_wait(tp->job_queue->quene_sem)){
  56. printf("threadwaitingforsemaphore....\n");
  57. exit(1);
  58. }
  59. if(tp_alive){
  60. pthread_mutex_lock(&mutex);
  61. job_p=tp_get_lastjob(tp);
  62. if(NULL==job_p){
  63. pthread_mutex_unlock(&mutex);
  64. continue;
  65. }
  66. function=job_p->function;
  67. arg_buf=job_p->arg;
  68. if(tp_delete__lastjob(tp))
  69. return;
  70. pthread_mutex_unlock(&mutex);
  71. //运行指定的线程函数
  72. printf("consumer...getajobfromjobqueneandrunit!\n");
  73. function(arg_buf);
  74. free(job_p);
  75. }
  76. else
  77. return;
  78. }
  79. return;
  80. }
  81. //向工作队列中添加一个元素
  82. voidtp_job_quene_add(thread_pool*tp,thread_pool_job*new_job){
  83. new_job->pre=NULL;
  84. new_job->next=NULL;
  85. thread_pool_job*old_head_job=tp->job_queue->head;
  86. if(NULL==old_head_job){
  87. tp->job_queue->head=new_job;
  88. tp->job_queue->tail=new_job;
  89. }
  90. else{
  91. old_head_job->pre=new_job;
  92. new_job->next=old_head_job;
  93. tp->job_queue->head=new_job;
  94. }
  95. ++(tp->job_queue->num);
  96. sem_post(tp->job_queue->quene_sem);
  97. }
  98. //取得工作队列的最后一个节点
  99. thread_pool_job*tp_get_lastjob(thread_pool*tp){
  100. returntp->job_queue->tail;
  101. }
  102. //删除工作队列的最后个节点
  103. inttp_delete__lastjob(thread_pool*tp){
  104. if(NULL==tp)
  105. return-1;
  106. thread_pool_job*last_job=tp->job_queue->tail;
  107. if(0==tp->job_queue->num){
  108. return-1;
  109. }
  110. elseif(1==tp->job_queue->num){
  111. tp->job_queue->head=NULL;
  112. tp->job_queue->tail=NULL;
  113. }
  114. else{
  115. last_job->pre->next=NULL;
  116. tp->job_queue->tail=last_job->pre;
  117. }
  118. //修改相关变量
  119. --(tp->job_queue->num);
  120. return0;
  121. }
  122. //向线程池中添加一个工作项
  123. inttp_add_work(thread_pool*tp,void*(*func_p)(void*),void*arg){
  124. thread_pool_job*new_job=(thread_pool_job*)malloc(sizeof(thread_pool_job));
  125. if(NULL==new_job){
  126. printf("ERROR:allocatememoryfornewjobfailed!\n");
  127. exit(1);
  128. }
  129. new_job->function=func_p;
  130. new_job->arg=arg;
  131. pthread_mutex_lock(&mutex);
  132. tp_job_quene_add(tp,new_job);
  133. pthread_mutex_unlock(&mutex);
  134. }
  135. //销毁线程池
  136. voidtp_destroy(thread_pool*tp){
  137. inti;
  138. tp_alive=0;
  139. //等待线程运行结束
  140. //sleep(10);
  141. for(i=0;i<tp->threads_num;++i){
  142. pthread_join(tp->threads[i],NULL);
  143. }
  144. free(tp->threads);
  145. if(sem_destroy(tp->job_queue->quene_sem)){
  146. printf("ERROR:destroysemaphorefailed!\n");
  147. }
  148. free(tp->job_queue->quene_sem);
  149. //删除job队列
  150. thread_pool_job*current_job=tp->job_queue->tail;
  151. while(tp->job_queue->num){
  152. tp->job_queue->tail=current_job->pre;
  153. free(current_job);
  154. current_job=tp->job_queue->tail;
  155. --(tp->job_queue->num);
  156. }
  157. tp->job_queue->head=NULL;
  158. tp->job_queue->tail=NULL;
  159. }
  160. //自定义线程执行函数
  161. void*thread_func1(){
  162. printf("Task1running...byThread:%u\n",(unsignedint)pthread_self());
  163. }
  164. //自定义线程执行函数
  165. void*thread_func2(){
  166. printf("Task2running...byThread:%u\n",(unsignedint)pthread_self());
  167. }
  168. //生产者线程执行函数
  169. void*thread_func_producer(thread_pool*tp){
  170. while(1){
  171. printf("producer...addajob(job1)tojobquene!\n");
  172. tp_add_work(tp,(void*)thread_func1,NULL);
  173. sleep(1);
  174. printf("producer...addajob(job2)tojobquene!\n");
  175. tp_add_work(tp,(void*)thread_func2,NULL);
  176. }
  177. }
  178. intmain(){
  179. thread_pool*tp=tp_init(5);
  180. inti;
  181. intarg=7;
  182. pthread_tproducer_thread_id;//生产者线程ID
  183. pthread_create(&producer_thread_id,NULL,(void*)thread_func_producer,(void*)tp);
  184. pthread_join(producer_thread_id,NULL);
  185. tp_destroy(tp);
  186. return0;
  187. }


运行结果:
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics