技术专栏
简单线程池
线程池,简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。
首先是用条件变量和互斥量封装的一个状态,用于保护线程池的状态
condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
//封装一个互斥量和条件变量作为状态
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;
//对状态的操作函数
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif
condition.c
#include "condition.h"
//初始化
int condition_init(condition_t *cond)
{
int status;
if((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
//加锁
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
//解锁
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
//等待
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
//固定时间等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
//唤醒一个睡眠线程
int condition_signal(condition_t* cond)
{
return pthread_cond_signal(&cond->pcond);
}
//唤醒所有睡眠线程
int condition_broadcast(condition_t *cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
//释放
int condition_destroy(condition_t *cond)
{
int status;
if((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
然后是线程池对应的threadpool.h和threadpool.c
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
//线程池头文件
#include "condition.h"
//封装线程池中的对象需要执行的任务对象
typedef struct task
{
void *(*run)(void *args); //函数指针,需要执行的任务
void *arg; //参数
struct task *next; //任务队列中下一个任务
}task_t;
//下面是线程池结构体
typedef struct threadpool
{
condition_t ready; //状态量
task_t *first; //任务队列中第一个任务
task_t *last; //任务队列中最后一个任务
int counter; //线程池中已有线程数
int idle; //线程池中kongxi线程数
int max_threads; //线程池最大线程数
int quit; //是否退出标志
}threadpool_t;
//线程池初始化
void threadpool_init(threadpool_t *pool, int threads);
//往线程池中加入任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
//摧毁线程池
void threadpool_destroy(threadpool_t *pool);
#endif
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
//创建的线程执行
void *thread_routine(void *arg)
{
struct timespec abstime;
int timeout;
printf("thread %d is starting\n", (int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while(1)
{
timeout = 0;
//访问线程池之前需要加锁
condition_lock(&pool->ready);
//空闲
pool->idle++;
//等待队列有任务到来 或者 收到线程池销毁通知
while(pool->first == NULL && !pool->quit)
{
//否则线程阻塞等待
printf("thread %d is waiting\n", (int)pthread_self());
//获取从当前时间,并加上等待时间, 设置进程的超时睡眠时间
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready, &abstime); //该函数会解锁,允许其他线程访问,当被唤醒时,加锁
if(status == ETIMEDOUT)
{
printf("thread %d wait timed out\n", (int)pthread_self());
timeout = 1;
break;
}
}
pool->idle--;
if(pool->first != NULL)
{
//取出等待队列最前的任务,移除任务,并执行任务
task_t *t = pool->first;
pool->first = t->next;
//由于任务执行需要消耗时间,先解锁让其他线程访问线程池
condition_unlock(&pool->ready);
//执行任务
t->run(t->arg);
//执行完任务释放内存
free(t);
//重新加锁
condition_lock(&pool->ready);
}
//退出线程池
if(pool->quit && pool->first == NULL)
{
pool->counter--;//当前工作的线程数-1
//若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
if(pool->counter == 0)
{
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break;
}
//超时,跳出销毁线程
if(timeout == 1)
{
pool->counter--;//当前工作的线程数-1
condition_unlock(&pool->ready);
break;
}
condition_unlock(&pool->ready);
}
printf("thread %d is exiting\n", (int)pthread_self());
return NULL;
}
//线程池初始化
void threadpool_init(threadpool_t *pool, int threads)
{
condition_init(&pool->ready);
pool->first = NULL;
pool->last =NULL;
pool->counter =0;
pool->idle =0;
pool->max_threads = threads;
pool->quit =0;
}
//增加一个任务到线程池
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
//产生一个新的任务
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next=NULL;//新加的任务放在队列尾端
//线程池的状态被多个线程共享,操作前需要加锁
condition_lock(&pool->ready);
if(pool->first == NULL)//第一个任务加入
{
pool->first = newtask;
}
else
{
pool->last->next = newtask;
}
pool->last = newtask; //队列尾指向新加入的线程
//线程池中有线程空闲,唤醒
if(pool->idle > 0)
{
condition_signal(&pool->ready);
}
//当前线程池中线程个数没有达到设定的最大值,创建一个新的线性
else if(pool->counter < pool->max_threads)
{
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
//结束,访问
condition_unlock(&pool->ready);
}
//线程池销毁
void threadpool_destroy(threadpool_t *pool)
{
//如果已经调用销毁,直接返回
if(pool->quit)
{
return;
}
//加锁
condition_lock(&pool->ready);
//设置销毁标记为1
pool->quit = 1;
//线程池中线程个数大于0
if(pool->counter > 0)
{
//对于等待的线程,发送信号唤醒
if(pool->idle > 0)
{
condition_broadcast(&pool->ready);
}
//正在执行任务的线程,等待他们结束任务
while(pool->counter)
{
condition_wait(&pool->ready);
}
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
测试代码:
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
void* mytask(void *arg)
{
printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
//测试代码
int main(void)
{
threadpool_t pool;
//初始化线程池,最多三个线程
threadpool_init(&pool, 3);
int i;
//创建十个任务
for(i=0; i < 10; i++)
{
int *arg = malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
}
threadpool_destroy(&pool);
return 0;
}
声明:本文内容由易百纳平台入驻作者撰写,文章观点仅代表作者本人,不代表易百纳立场。如有内容侵权或者其他问题,请联系本站进行删除。
红包
点赞
收藏
评论
打赏
- 分享
- 举报
评论
0个
手气红包
暂无数据
相关专栏
-
浏览量:1233次2022-10-10 10:18:49
-
2022-11-08 12:06:09
-
浏览量:13635次2020-12-17 14:44:21
-
浏览量:1811次2019-07-04 12:02:50
-
2022-11-10 10:36:09
-
浏览量:1565次2022-01-02 09:00:38
-
浏览量:4813次2020-08-19 22:25:39
-
浏览量:1608次2022-12-05 18:44:16
-
浏览量:2592次2020-07-09 09:39:40
-
浏览量:1849次2022-02-06 09:00:21
-
浏览量:4180次2020-10-28 23:08:53
-
浏览量:6482次2020-10-28 23:03:59
-
浏览量:1732次2019-12-05 16:34:01
-
浏览量:1693次2020-08-07 17:02:17
-
浏览量:1212次2022-10-10 11:44:39
-
浏览量:1315次2023-01-04 09:49:23
-
浏览量:4711次2021-09-16 13:47:50
-
浏览量:1597次2020-08-10 09:42:52
-
浏览量:2023次2020-08-14 18:38:14
置顶时间设置
结束时间
删除原因
-
广告/SPAM
-
恶意灌水
-
违规内容
-
文不对题
-
重复发帖
打赏作者
big_anana
您的支持将鼓励我继续创作!
打赏金额:
¥1
¥5
¥10
¥50
¥100
支付方式:
微信支付
打赏成功!
感谢您的打赏,如若您也想被打赏,可前往 发表专栏 哦~
举报反馈
举报类型
- 内容涉黄/赌/毒
- 内容侵权/抄袭
- 政治相关
- 涉嫌广告
- 侮辱谩骂
- 其他
详细说明
审核成功
发布时间设置
发布时间:
请选择发布时间设置
是否关联周任务-专栏模块
审核失败
失败原因
请选择失败原因
备注
请输入备注