C++20同步原语详解 | latch、barrier与semaphore

C++20同步原语:latch、barrier与semaphore详解 #

引言 #

C++20引入了三种重要的同步原语:latch、barrier和semaphore,它们为多线程编程提供了更强大和灵活的同步机制。这里详细介绍下。

latch #

这个可能大多数人都有所了解,这就是我们经常会用到的CountDownLatch。用于使一个线程先阻塞,等待其他线程完成各自的工作后再继续执行。

CountDownLatch是通过计数器实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后等待的线程就可以打断阻塞去继续执行任务。

自己之前实现过一个CountDownLatch,源码大概这样:

CountDownLatch::CountDownLatch(int32_t count) : count_(count) {}
void CountDownLatch::CountDown() {
    std::unique_lock<std::mutex> lock(mutex_);
    --count_;
    if (count_ == 0) {
        cv_.notify_all();
    }
}
void CountDownLatch::Await(int32_t time_ms) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (count_ > 0) {
        if (time_ms > 0) {
            cv_.wait_for(lock, std::chrono::milliseconds(time_ms));
        } else {
            cv_.wait(lock);
        }
    }
}
int32_t CountDownLatch::GetCount() const {
    std::unique_lock<std::mutex> lock(mutex_);
    return count_;
}

barrier #

许多线程在阻塞点阻塞,当到达阻塞点的线程达到一定数量时,会执行完成的回调,然后解除所有相关线程的阻塞,然后重置线程计数器,继续开始下一阶段的阻塞。

假设有很多线程并发执行,并在一个循环中执行一些计算。进一步假设一旦这些计算完成,需要在线程开始其循环的新迭代之前对结果进行一些处理。

看以下示例代码(摘自cppreference):

#include <barrier>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

int main() {
    const auto workers = { "anil", "busara", "carl" };
    auto on_completion = []() noexcept {
        static auto phase = "... done\n" "Cleaning up...\n";
        std::cout << phase;
        phase = "... done\n";
    };
    std::barrier sync_point(std::ssize(workers), on_completion);

    auto work = [&](std::string name) {
        std::string product = "  " + name + " worked\n";
        std::cout << product;
        sync_point.arrive_and_wait();
        product = "  " + name + " cleaned\n";
        std::cout << product;
        sync_point.arrive_and_wait();
    };

    std::cout << "Starting...\n";
    std::vector<std::thread> threads;
    for (auto const& worker : workers) {
        threads.emplace_back(work, worker);
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

可能的输出如下:

Starting...
  anil worked
  carl worked
  busara worked
... done
Cleaning up...
  busara cleaned
  carl cleaned
  anil cleaned
... done

semaphore #

信号量,这个估计大家都很熟悉,本质也是个计数器,主要有两个方法:

  • acquire():递减计数器,当计数器为零时阻塞,直到计数器再次递增。

  • release():递增计数器(可传递具体数字),并解除在acquire调用中的线程的阻塞。

示例代码如下:

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>

std::binary_semaphore smphSignalMainToThread(0), smphSignalThreadToMain(0);

void ThreadProc() {
    smphSignalMainToThread.acquire();
    std::cout << "[thread] Got the signal\n";
    using namespace std::literals;
    std::this_thread::sleep_for(3s);
    std::cout << "[thread] Send the signal\n";
    smphSignalThreadToMain.release();
}

int main() {
    std::thread thrWorker(ThreadProc);
    std::cout << "[main] Send the signal\n";
    smphSignalMainToThread.release();
    smphSignalThreadToMain.acquire();
    std::cout << "[main] Got the signal\n";
    thrWorker.join();
}

输出如下:

[main] Send the signal
[thread] Got the signal
[thread] Send the signal
[main] Got the signal

信号量也可以当作条件变量使用,这个我估计你应该知道怎么做。