C语言实现C++的future | 跨线程任务调度详解

背景 #

在音视频方向中,线程分为普通线程和GL线程(OpenGL线程)。GL线程中可以执行OpenGL相关的语句,用于图像渲染工作,而普通线程只能执行普通语句。

在项目开发中,有时会在普通线程中需要执行某些必须在GL线程下执行的任务(例如某些初始化工作或释放GL相关对象)。执行完这些任务后,需要继续执行普通线程的任务,就像在同一个线程中执行一样:

void func() {
    task1();
    task2(); // 需要在GL线程执行
    task3();
}

分析 #

关键点在于:task3()必须在task2()执行完毕后才能执行。然而,由于task2()是在其他线程中运行的,无法起到阻塞执行的效果。

一种解决方案是使用条件变量:

void task2() {
    ...
    notify();
}

void func() {
    task1();
    task2(); // 需要在GL线程执行
    wait();
    task3();
}

普通线程在task2()后使用wait()阻塞,待GL线程中的任务执行完毕后,通过notify()唤醒普通线程,从而实现顺序执行。但这种方法非常麻烦,代码可读性差,且不通用。

解决方案 #

在之前的文章中,我使用C++的future封装了一套函数,可以方便地跨线程阻塞调度任务。然而,对于纯C语言开发的项目,没有C++的future,实现类似需求就比较困难。不过,困难也得解决,于是有了下面的代码。

函数设计 #

#ifndef __GL_DISPATCH_H__
#define __GL_DISPATCH_H__
#include <stdbool.h>

typedef struct Dispatcher Dispatcher;
typedef void (*DispatcherFunc)(void* arg);

/**
 * @brief 创建一个实例,内部会常驻一个GL线程,外部可以将某些任务丢到此线程里执行,可以选择是否阻塞执行
 */
Dispatcher* Dispatcher_create();

/**
 * @brief 销毁实例
 */
void Dispatcher_destroy(Dispatcher** dispatcher_p);

/**
 * @brief 利用此函数将任务丢到线程里执行
 *
 * @param dispatcher 实例
 * @param func 要执行的函数
 * @param arg 函数参数
 * @param block 选择是否阻塞执行
 */
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);

#endif  // __GL_DISPATCH_H__

主要功能在Dispatcher_run函数中,该函数可以选择是否阻塞执行。这里可以思考一下如何实现阻塞的需求?条件变量是肯定需要的,但如何保证条件变量等待的是当前事件呢?直接看代码实现吧。

代码实现 #

#include <stdatomic.h>
#include "libctools.h"
#include "gldispatch.h"

typedef struct Task {
    DispatcherFunc func;
    void* arg;
    Mutex* mutex;
    Cond* cond;
    bool is_continue;
    bool is_block;
} Task;

struct Dispatcher {
    Thread* thread_id;
    Thread _thread_id;
    Mutex* mutex;
    Cond* cond;
    List* task_queue;
    atomic_bool is_interrupt;
};

static void taskDestroy(Task* task) {
    if (task->mutex) {
        mutex_destroyp(&task->mutex);
    }
    if (task->cond) {
        cond_destroyp(&task->cond);
    }
    free((void*)task);
}

static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) {
    Task* task = (Task*)calloc(1, sizeof(Task));
    if (!task) return NULL;
    task->func = func;
    task->arg = arg;
    task->is_block = is_block;
    if (is_block) {
        task->mutex = mutex_create();
        task->cond = cond_create();
    }
    return task;
}

static int Dispatcher_process(void* arg) {
    log_info("%s start", __func__);
    Dispatcher* self = (Dispatcher*)arg;
    while (!atomic_load(&self->is_interrupt)) {
        mutex_lock(self->mutex);
        while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) {
            cond_wait(self->cond, self->mutex);
        }
        ListNode* node = list_pop_front(self->task_queue);
        mutex_unlock(self->mutex);
        if (atomic_load(&self->is_interrupt)) {
            break;
        }
        if (node) {
            Task* task = (Task*)node->val;
            task->func(task->arg);
            if (task->is_block) {
                mutex_lock(task->mutex);
                task->is_continue = true;
                cond_signal(task->cond);
                mutex_unlock(task->mutex);
            } else {
                taskDestroy(task);
            }
            free(node);
        }
    }
    log_info("%s stop", __func__);
    return 0;
}

static void Dispatcher_start(Dispatcher* dispatcher) {
    dispatcher->thread_id =
        thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");
}

static void Dispatcher_stop(Dispatcher* dispatcher) {
    atomic_store(&dispatcher->is_interrupt, true);
    cond_signal(dispatcher->cond);
    list_clear(dispatcher->task_queue, true);
    if (dispatcher->thread_id) {
        thread_wait(dispatcher->thread_id, NULL);
    }
}

Dispatcher* Dispatcher_create() {
    Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher));
    if (!self) return NULL;
    self->mutex = mutex_create();
    self->cond = cond_create();
    self->task_queue = list_create();
    self->task_queue->free_func = (void (*)(int64_t))taskDestroy;
    Dispatcher_start(self);
    return self;
}

void Dispatcher_destroy(Dispatcher** dispatcher_p) {
    if (NULL == dispatcher_p || NULL == *dispatcher_p) return;
    Dispatcher* self = *dispatcher_p;
    Dispatcher_stop(self);
    mutex_destroyp(&self->mutex);
    cond_destroyp(&self->cond);
    list_destroy(self->task_queue);
    freep((void**)dispatcher_p);
}

void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) {
    if (!dispatcher) return;
    Task* task = taskCreate(func, arg, block);
    if (task) {
        ListNode* node = list_node_new((int64_t)task);
        mutex_lock(dispatcher->mutex);
        list_push_back(dispatcher->task_queue, node);
        cond_signal(dispatcher->cond);
        mutex_unlock(dispatcher->mutex);
        if (task->is_block) {
            mutex_lock(task->mutex);
            while (!task->is_continue) {
                cond_wait(task->cond, task->mutex);
            }
            mutex_unlock(task->mutex);
            taskDestroy(task);
        }
    }
}

使用方式 #

void func1(void* arg) {
    print("hello func1");
}

void func2(void* arg) {
    print("hello func2");
}

int main() {
    Dispatcher* dispatcher = Dispatcher_create();
    Dispatcher_run(dispatcher, func1, NULL, true);
    Dispatcher_run(dispatcher, func2, NULL, true);
    Dispatcher_destroy(&dispatcher);
    return 0;
}

Tips #

代码中的logmutexcondthreadlist都是二次封装的函数,功能包括日志输出、加解锁、条件变量、创建线程以及C语言的链表。这里不贴出它们的实现,大家可以自己实现一套,当作一个小练习,不难。