背景 #
在音视频方向中,线程分为普通线程和GL线程(OpenGL线程)。GL线程中可以执行OpenGL相关的语句,用于图像渲染工作,而普通线程只能执行普通语句。
在项目开发中,有时会在普通线程中需要执行某些必须在GL线程下执行的任务(例如某些初始化工作或释放GL相关对象)。执行完这些任务后,需要继续执行普通线程的任务,就像在同一个线程中执行一样:
void func() {
task1();
task2(); // 需要在GL线程执行
task3();
}
分析 #
关键点在于:task3()
必须在task2()
执行完毕后才能执行。然而,由于task2()
是在其他线程中运行的,无法起到阻塞执行的效果。
一种解决方案是使用条件变量:
void task2() {
...
notify();
}
void func() {
task1();
task2(); // 需要在GL线程执行
wait();
task3();
}
普通线程在task2()
后使用wait()
阻塞,待GL线程中的任务执行完毕后,通过notify()
唤醒普通线程,从而实现顺序执行。但这种方法非常麻烦,代码可读性差,且不通用。
解决方案 #
在之前的文章中,我使用C++的future
封装了一套函数,可以方便地跨线程阻塞调度任务。然而,对于纯C语言开发的项目,没有C++的future
,实现类似需求就比较困难。不过,困难也得解决,于是有了下面的代码。
函数设计 #
#ifndef __GL_DISPATCH_H__
#define __GL_DISPATCH_H__
#include <stdbool.h>
typedef struct Dispatcher Dispatcher;
typedef void (*DispatcherFunc)(void* arg);
/**
* @brief 创建一个实例,内部会常驻一个GL线程,外部可以将某些任务丢到此线程里执行,可以选择是否阻塞执行
*/
Dispatcher* Dispatcher_create();
/**
* @brief 销毁实例
*/
void Dispatcher_destroy(Dispatcher** dispatcher_p);
/**
* @brief 利用此函数将任务丢到线程里执行
*
* @param dispatcher 实例
* @param func 要执行的函数
* @param arg 函数参数
* @param block 选择是否阻塞执行
*/
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);
#endif // __GL_DISPATCH_H__
主要功能在Dispatcher_run
函数中,该函数可以选择是否阻塞执行。这里可以思考一下如何实现阻塞的需求?条件变量是肯定需要的,但如何保证条件变量等待的是当前事件呢?直接看代码实现吧。
代码实现 #
#include <stdatomic.h>
#include "libctools.h"
#include "gldispatch.h"
typedef struct Task {
DispatcherFunc func;
void* arg;
Mutex* mutex;
Cond* cond;
bool is_continue;
bool is_block;
} Task;
struct Dispatcher {
Thread* thread_id;
Thread _thread_id;
Mutex* mutex;
Cond* cond;
List* task_queue;
atomic_bool is_interrupt;
};
static void taskDestroy(Task* task) {
if (task->mutex) {
mutex_destroyp(&task->mutex);
}
if (task->cond) {
cond_destroyp(&task->cond);
}
free((void*)task);
}
static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) {
Task* task = (Task*)calloc(1, sizeof(Task));
if (!task) return NULL;
task->func = func;
task->arg = arg;
task->is_block = is_block;
if (is_block) {
task->mutex = mutex_create();
task->cond = cond_create();
}
return task;
}
static int Dispatcher_process(void* arg) {
log_info("%s start", __func__);
Dispatcher* self = (Dispatcher*)arg;
while (!atomic_load(&self->is_interrupt)) {
mutex_lock(self->mutex);
while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) {
cond_wait(self->cond, self->mutex);
}
ListNode* node = list_pop_front(self->task_queue);
mutex_unlock(self->mutex);
if (atomic_load(&self->is_interrupt)) {
break;
}
if (node) {
Task* task = (Task*)node->val;
task->func(task->arg);
if (task->is_block) {
mutex_lock(task->mutex);
task->is_continue = true;
cond_signal(task->cond);
mutex_unlock(task->mutex);
} else {
taskDestroy(task);
}
free(node);
}
}
log_info("%s stop", __func__);
return 0;
}
static void Dispatcher_start(Dispatcher* dispatcher) {
dispatcher->thread_id =
thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");
}
static void Dispatcher_stop(Dispatcher* dispatcher) {
atomic_store(&dispatcher->is_interrupt, true);
cond_signal(dispatcher->cond);
list_clear(dispatcher->task_queue, true);
if (dispatcher->thread_id) {
thread_wait(dispatcher->thread_id, NULL);
}
}
Dispatcher* Dispatcher_create() {
Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher));
if (!self) return NULL;
self->mutex = mutex_create();
self->cond = cond_create();
self->task_queue = list_create();
self->task_queue->free_func = (void (*)(int64_t))taskDestroy;
Dispatcher_start(self);
return self;
}
void Dispatcher_destroy(Dispatcher** dispatcher_p) {
if (NULL == dispatcher_p || NULL == *dispatcher_p) return;
Dispatcher* self = *dispatcher_p;
Dispatcher_stop(self);
mutex_destroyp(&self->mutex);
cond_destroyp(&self->cond);
list_destroy(self->task_queue);
freep((void**)dispatcher_p);
}
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) {
if (!dispatcher) return;
Task* task = taskCreate(func, arg, block);
if (task) {
ListNode* node = list_node_new((int64_t)task);
mutex_lock(dispatcher->mutex);
list_push_back(dispatcher->task_queue, node);
cond_signal(dispatcher->cond);
mutex_unlock(dispatcher->mutex);
if (task->is_block) {
mutex_lock(task->mutex);
while (!task->is_continue) {
cond_wait(task->cond, task->mutex);
}
mutex_unlock(task->mutex);
taskDestroy(task);
}
}
}
使用方式 #
void func1(void* arg) {
print("hello func1");
}
void func2(void* arg) {
print("hello func2");
}
int main() {
Dispatcher* dispatcher = Dispatcher_create();
Dispatcher_run(dispatcher, func1, NULL, true);
Dispatcher_run(dispatcher, func2, NULL, true);
Dispatcher_destroy(&dispatcher);
return 0;
}
Tips #
代码中的log
、mutex
、cond
、thread
、list
都是二次封装的函数,功能包括日志输出、加解锁、条件变量、创建线程以及C语言的链表。这里不贴出它们的实现,大家可以自己实现一套,当作一个小练习,不难。