生产者消费者编程实现,采用了线程池以及信号量技术。
线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
那么为什么又需要线程池呢?
我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些"池化资源"技术就这样的产生了。
一般一个简单线程池至少包含下列组成部分。
1)线程池管理器(ThreadPoolManager):用于创建并管理线程池
2)工作线程(WorkThread):线程池中线程
3)任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
4)任务队列:用于存放没有处理的任务。提供一种缓冲机制。
图示:
图1 线程池图解
生产者消费者模型C语言代码实现:
thread_pool_pv.h:
- #ifndefTHREAD_POOL_H
- #defineTHREAD_POOL_H
- #include<stdio.h>
- #include<stdlib.h>
- #include<semaphore.h>//信号量sem_t
- #include<pthread.h>
- typedefvoid*(*FUNC)(void*arg);
- typedefstructthread_pool_job_s{
- FUNCfunction;
- void*arg;
- structthread_pool_job_s*pre;
- structthread_pool_job_s*next;
- }thread_pool_job;
- typedefstructthread_pool_job_queue_s{
- thread_pool_job*head;
- thread_pool_job*tail;
- intnum;
- sem_t*quene_sem;
- }thread_pool_job_queue;
- typedefstructthread_pool_s{
- pthread_t*threads;
- intthreads_num;
- thread_pool_job_queue*job_queue;
- }thread_pool;
- thread_pool*tp_init(intthread_num);
- inttp_job_quene_init(thread_pool*tp);
- voidtp_job_quene_add(thread_pool*tp,thread_pool_job*new_job);
- inttp_add_work(thread_pool*tp,void*(*func_p)(void*),void*arg);
- thread_pool_job*tp_get_lastjob(thread_pool*tp);
- inttp_delete__lastjob(thread_pool*tp);
- voidtp_destroy(thread_pool*tp);
- void*tp_thread_func(thread_pool*tp);
- void*thread_func_producer(thread_pool*tp);
- #endif
thread_pool_pv.c:
- #include"thread_pool.h"
- pthread_mutex_tmutex=PTHREAD_MUTEX_INITIALIZER;
- staticinttp_alive=1;
- thread_pool*tp_init(intthread_num){
- thread_pool*tp;
- inti;
- if(thread_num<1)
- thread_num=1;
- tp=(thread_pool*)malloc(sizeof(thread_pool));
- if(NULL==tp){
- printf("ERROR:allocatememoryforthread_poolfailed\n");
- returnNULL;
- }
- tp->threads_num=thread_num;
- tp->threads=(pthread_t*)malloc(thread_num*sizeof(pthread_t));
- if(NULL==tp->threads){
- printf("ERROR:allocatememoryforthreadsinthreadpoolfailed\n");
- returnNULL;
- }
- if(tp_job_quene_init(tp))
- returnNULL;
- tp->job_queue->quene_sem=(sem_t*)malloc(sizeof(sem_t));
- sem_init(tp->job_queue->quene_sem,0,0);
- for(i=0;i<thread_num;++i){
- pthread_create(&(tp->threads[i]),NULL,(void*)tp_thread_func,(void*)tp);
- }
- returntp;
- }
- inttp_job_quene_init(thread_pool*tp){
- tp->job_queue=(thread_pool_job_queue*)malloc(sizeof(thread_pool_job_queue));
- if(NULL==tp->job_queue){
- return-1;
- }
- tp->job_queue->head=NULL;
- tp->job_queue->tail=NULL;
- tp->job_queue->num=0;
- return0;
- }
- void*tp_thread_func(thread_pool*tp){
- FUNCfunction;
- void*arg_buf;
- thread_pool_job*job_p;
- while(tp_alive){
- if(sem_wait(tp->job_queue->quene_sem)){
- printf("threadwaitingforsemaphore....\n");
- exit(1);
- }
- if(tp_alive){
- pthread_mutex_lock(&mutex);
- job_p=tp_get_lastjob(tp);
- if(NULL==job_p){
- pthread_mutex_unlock(&mutex);
- continue;
- }
- function=job_p->function;
- arg_buf=job_p->arg;
- if(tp_delete__lastjob(tp))
- return;
- pthread_mutex_unlock(&mutex);
- printf("consumer...getajobfromjobqueneandrunit!\n");
- function(arg_buf);
- free(job_p);
- }
- else
- return;
- }
- return;
- }
- voidtp_job_quene_add(thread_pool*tp,thread_pool_job*new_job){
- new_job->pre=NULL;
- new_job->next=NULL;
- thread_pool_job*old_head_job=tp->job_queue->head;
- if(NULL==old_head_job){
- tp->job_queue->head=new_job;
- tp->job_queue->tail=new_job;
- }
- else{
- old_head_job->pre=new_job;
- new_job->next=old_head_job;
- tp->job_queue->head=new_job;
- }
- ++(tp->job_queue->num);
- sem_post(tp->job_queue->quene_sem);
- }
- thread_pool_job*tp_get_lastjob(thread_pool*tp){
- returntp->job_queue->tail;
- }
- inttp_delete__lastjob(thread_pool*tp){
- if(NULL==tp)
- return-1;
- thread_pool_job*last_job=tp->job_queue->tail;
- if(0==tp->job_queue->num){
- return-1;
- }
- elseif(1==tp->job_queue->num){
- tp->job_queue->head=NULL;
- tp->job_queue->tail=NULL;
- }
- else{
- last_job->pre->next=NULL;
- tp->job_queue->tail=last_job->pre;
- }
- --(tp->job_queue->num);
- return0;
- }
- inttp_add_work(thread_pool*tp,void*(*func_p)(void*),void*arg){
- thread_pool_job*new_job=(thread_pool_job*)malloc(sizeof(thread_pool_job));
- if(NULL==new_job){
- printf("ERROR:allocatememoryfornewjobfailed!\n");
- exit(1);
- }
- new_job->function=func_p;
- new_job->arg=arg;
- pthread_mutex_lock(&mutex);
- tp_job_quene_add(tp,new_job);
- pthread_mutex_unlock(&mutex);
- }
- voidtp_destroy(thread_pool*tp){
- inti;
- tp_alive=0;
- for(i=0;i<tp->threads_num;++i){
- pthread_join(tp->threads[i],NULL);
- }
- free(tp->threads);
- if(sem_destroy(tp->job_queue->quene_sem)){
- printf("ERROR:destroysemaphorefailed!\n");
- }
- free(tp->job_queue->quene_sem);
- thread_pool_job*current_job=tp->job_queue->tail;
- while(tp->job_queue->num){
- tp->job_queue->tail=current_job->pre;
- free(current_job);
- current_job=tp->job_queue->tail;
- --(tp->job_queue->num);
- }
- tp->job_queue->head=NULL;
- tp->job_queue->tail=NULL;
- }
- void*thread_func1(){
- printf("Task1running...byThread:%u\n",(unsignedint)pthread_self());
- }
- void*thread_func2(){
- printf("Task2running...byThread:%u\n",(unsignedint)pthread_self());
- }
- void*thread_func_producer(thread_pool*tp){
- while(1){
- printf("producer...addajob(job1)tojobquene!\n");
- tp_add_work(tp,(void*)thread_func1,NULL);
- sleep(1);
- printf("producer...addajob(job2)tojobquene!\n");
- tp_add_work(tp,(void*)thread_func2,NULL);
- }
- }
- intmain(){
- thread_pool*tp=tp_init(5);
- inti;
- intarg=7;
- pthread_tproducer_thread_id;
- pthread_create(&producer_thread_id,NULL,(void*)thread_func_producer,(void*)tp);
- pthread_join(producer_thread_id,NULL);
- tp_destroy(tp);
- return0;
- }
运行结果:
分享到:
相关推荐
以记录型信号量实现生产者-消费者问题 实验目的: 1.加深对进程同步概念的理解。 2.理解多道程序环境中,不同进程对资源访问及相互合作进程的关系的处理方法。 实验要求: 利用C语言程序模拟生产者-消费者问题和哲学...
参考教材中的生产者消费者算法,创建5个进程,其中两个进程为生产者进程,3个进程为消费者进程。一个生产者进程试图不断地在一个缓冲中写入大写字母,另一个生产者进程试图不断地在缓冲中写入小写字母。3个消费者...
利用互斥锁和计数信号完成生产者消费者问题 一组生产者进程和一组消费者进程共享一个初始为空、大小为n的缓冲区,只有缓冲区没满时,生产者才把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中...
基于Qt信号量实现的单生产者多消费者模式下的消息队列,可在多线程应用中对不同的消息结构体进行数据传输。
Qt5多线程,使用QSemaphore类实现生产者和消费者问题。
2. 在Linux操作系统上,利用Pthread API提供的信号量机制,编写应用程序实现生产者——消费者问题。 3. 两种环境下,生产者和消费者均作为独立线程,并通过empty、full、mutex三个信号量实现对缓冲进行插入与删除。 ...
1、设计目的:通过研究Linux的进程机制和信号量,实现生产者消费者问题的并发控制。 2、说明:有界缓冲区内设有20个存储单元,放入取出的产品设定为1-20个整数。 3、设计要求: 生产者和消费者进程的数目不固定,可...
在Linux下完整C语言实现生产者消费者问题的代码。其中涉及信号量、多线程、GCC编译、PV操作等基础知识。Linux下通过gcc - o yy xxx.c -pthread,再通过./yy即可运行。
通过研究进程并发和信号量机制,实现生产者-消费者问题的并发控制。 2、设计要求 1)每个生产者和消费者对有界缓冲区进行操作后,即时显示有界缓冲区的全部内容,当前指针位置和生产者/消费者进程的标识符。 说明:...
利用记录型信号量解决生产者-消费者问题.doc
运行环境CentOS7,涉及进程、线程、信号量等知识
设计目的:通过研究Linux 的进程机制和信号量实现生产者消费者问题的并发控制。说明:有界缓冲区内设有20 个存储单元,放入/取出的数据项设定为1‐20 这20 个整型数。设计要求:1)每个生产者和消费者对有界缓冲区...
1、资源内容:基于Python+OpenCV-Python+PyQt5实现基础的图像检索+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功,...
数字信号处理-理论+算法和实现 湖广书 第三版书和答案
以生产者/消费者问题为例来阐述Linux线程的控制和通信。一组生产者线程与一组消费者线程通过缓冲区发生联系。生产者线程将生产的产品送入缓冲区,消费者线程则从中取出产品。缓冲区有N 个,是一个环形的缓冲池。 ...
用线程做一个生产者-消费者同步问题 其实我是想用线程做一个生产者-消费者同步问题,现在已经成功了,贴出来,大家请指证. 我想让线程thread1和thread2按顺序显示,于是加了个信号量mysem,并且在新建两个线程的两个...
操作系统中经典的互斥信号量机制的应用--生产者、消费者
第二步:实现生产者/消费者问题 1)有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池:生产者进程从文件中读取...
LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC...
1.利用记录型信号量解决生产者-消费者问题.odt1.利用记录型信号量解决生产者-消费者问题.odt1.利用记录型信号量解决生产者-消费者问题.odt