量化开发面试必考并发编程,但很多人第一次接触就直接看线程池代码,结果一头雾水。这篇文章从最基础的数据竞争开始,一步步推导出有界阻塞队列和线程池,每个概念都从”它解决什么问题”出发。
第一课:数据竞争
先看这段代码,猜一下输出是什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| #include <iostream> #include <thread>
int counter = 0;
void add() { for (int i = 0; i < 100000; i++) { counter++; } }
int main() { std::thread t1(add); std::thread t2(add); t1.join(); t2.join(); std::cout << counter << std::endl; }
|
两个线程各加 10 万次,直觉上应该输出 200000。实际运行:
每次都不一样,几乎从来不是 200000。
为什么?
counter++ 看起来是一行,但 CPU 实际执行是三步:
1 2 3
| 1. 把 counter 从内存读到寄存器 (READ) 2. 寄存器里的值 +1 (ADD) 3. 把寄存器的值写回内存 (WRITE)
|
两个线程同时跑,可能发生这种情况:
1 2 3 4 5 6 7 8
| 时间线 线程1 线程2 ──────────────────────────────────────── t1 READ → 拿到 100 t2 READ → 也拿到 100 ← 还没被写回! t3 ADD → 算出 101 t4 ADD → 也算出 101 t5 WRITE → 写回 101 t6 WRITE → 写回 101 ← 覆盖了!
|
两个线程各加了一次,但 counter 只从 100 变成 101,丢了一次加法。这就叫数据竞争(Data Race)。
四个核心工具
1. mutex
本质:一把锁,同一时刻只有一个线程能持有它。
1 2 3 4 5
| std::mutex mtx;
mtx.lock(); counter++; mtx.unlock();
|
问题是手动 lock/unlock 很危险:
1 2 3
| mtx.lock(); doSomething(); mtx.unlock();
|
2. lock_guard
本质:一个对象,构造时加锁,析构时自动解锁。
1 2 3 4
| { std::lock_guard<std::mutex> lock(mtx); counter++; }
|
利用 C++ 的 RAII:对象销毁时自动执行清理。不管正常退出还是异常,析构函数一定执行,锁一定释放。
局限: 加锁后不能中途解锁,整个生命周期都是锁定状态。
3. unique_lock
本质:比 lock_guard 更灵活,可以中途解锁/重新加锁。
1 2 3 4 5 6
| std::unique_lock<std::mutex> lock(mtx);
lock.unlock();
lock.lock();
|
只有两种情况必须用 unique_lock:
- 需要配合 condition_variable(cv.wait 内部需要中途解锁)
- 需要在持有锁期间手动解锁
4. condition_variable
mutex 只能解决”同时访问”的问题,解决不了”等待某个条件”的问题。
比如”队列空时,取数据的线程要等待”,用 mutex 的朴素想法是:
1 2 3
| while (queue.empty()) { }
|
这叫忙等(busy waiting)。condition_variable 解决这个问题:
1 2 3 4 5 6 7 8
| std::condition_variable cv;
cv.wait(lock, [] { return !queue.empty(); });
cv.notify_one(); cv.notify_all();
|
wait 内部到底发生了什么:
1 2 3 4 5 6 7 8
| while (!pred()) { mtx.unlock(); mtx.lock(); }
|
为什么必须先解锁再睡?如果持有锁去睡觉,生产者永远拿不到锁放数据,消费者永远不会被唤醒——死锁。
为什么 wait 要传 lambda:
操作系统存在虚假唤醒:线程没有被 notify,却自己醒了。不传 lambda 的话醒来队列可能还是空的就去取数据,直接崩。lambda 保证醒来条件一定为真。
四者关系:
1 2 3 4 5 6 7 8 9 10
| mutex │ 太危险,需要自动管理 ▼ lock_guard ← 够用就用这个 │ 需要中途解锁时 ▼ unique_lock ← 配合 cv 时必须用这个 │ 需要等待条件时 ▼ condition_variable ← wait/notify,线程间协调
|
一句话记忆:
1 2 3 4 5
| mutex → 门上的锁 lock_guard → 进门自动锁,出门自动开(钥匙绑在脚上) unique_lock → 同上,但可以临时把钥匙摘下来 cv.wait → 我先把钥匙放门口,去睡觉,有人叫我再来拿 cv.notify_one → 喂,你可以来拿钥匙了
|
实战一:有界阻塞队列
理解了四个工具,阻塞队列就是把它们组合起来。
push 的逻辑:
- 加锁
- 如果队列满了,等待(not_full 条件变量)
- 放入数据
- 通知等待取数据的线程(not_empty)
- 解锁
pop 的逻辑:
- 加锁
- 如果队列空了,等待(not_empty 条件变量)
- 取出数据
- 通知等待放数据的线程(not_full)
- 解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| #include <queue> #include <mutex> #include <condition_variable>
template<typename T> class BoundedBlockingQueue { public: explicit BoundedBlockingQueue(size_t capacity) : capacity_(capacity) {}
void push(T val) { std::unique_lock<std::mutex> lock(mutex_); not_full_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push(std::move(val)); not_empty_.notify_one(); }
void pop(T& val) { std::unique_lock<std::mutex> lock(mutex_); not_empty_.wait(lock, [this] { return !queue_.empty(); }); val = std::move(queue_.front()); queue_.pop(); not_full_.notify_one(); }
bool try_push(T val) { std::unique_lock<std::mutex> lock(mutex_); if (queue_.size() >= capacity_) return false; queue_.push(std::move(val)); not_empty_.notify_one(); return true; }
size_t size() const { std::unique_lock<std::mutex> lock(mutex_); return queue_.size(); }
private: size_t capacity_; std::queue<T> queue_; mutable std::mutex mutex_; std::condition_variable not_full_; std::condition_variable not_empty_; };
|
几个细节:
- 两个条件变量分开(not_full / not_empty):每次只唤醒对应的等待方,避免惊群
mutable mutex_:size() 是 const 方法,但读 queue_.size() 也需要加锁防止读到中间态
std::move:push 时 move 进去,pop 时 move 出来,避免不必要的拷贝
实战二:线程池
线程池 = 有界阻塞队列 + 一组工作线程。
类比餐厅:
- 厨师 = 线程(提前雇好,一直在岗)
- 点单 = 提交任务
- 待处理订单队列 = 任务队列
不用线程池的话,每来一个任务就 new thread,做完就销毁——每次点菜都临时招一个厨师,代价极高。
结构:
1 2 3 4 5 6
| ThreadPool ├── vector<thread> workers // 固定数量的工作线程 ├── queue<function<void()>> tasks // 待执行的任务队列 ├── mutex // 保护任务队列 ├── condition_variable // 通知有新任务 / 要关闭了 └── bool stop // 析构时通知线程退出
|
完整实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <future> #include <stdexcept>
class ThreadPool { public: explicit ThreadPool(size_t n) : stop_(false) { for (size_t i = 0; i < n; ++i) { workers_.emplace_back([this] { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); if (stop_ && tasks_.empty()) return; task = std::move(tasks_.front()); tasks_.pop(); } task(); } }); } }
template<typename F, typename... Args> auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) );
std::future<ReturnType> result = task->get_future(); { std::unique_lock<std::mutex> lock(mutex_); if (stop_) throw std::runtime_error("submit on stopped ThreadPool"); tasks_.emplace([task] { (*task)(); }); } cv_.notify_one(); return result; }
~ThreadPool() { { std::unique_lock<std::mutex> lock(mutex_); stop_ = true; } cv_.notify_all(); for (auto& w : workers_) w.join(); }
private: std::vector<std::thread> workers_; std::queue<std::function<void()>> tasks_; std::mutex mutex_; std::condition_variable cv_; bool stop_; };
|
使用:
1 2 3 4 5 6 7
| ThreadPool pool(4);
auto f1 = pool.submit([](int a, int b) { return a + b; }, 3, 5); auto f2 = pool.submit([] { return 42; });
std::cout << f1.get() << "\n"; std::cout << f2.get() << "\n";
|
几个关键问题:
为什么用 packaged_task 而不是直接存 function?
因为要拿返回值。packaged_task 内部绑定了一个 promise,执行完自动把结果放进去,外面用 future.get() 取出来。
为什么 task 要用 shared_ptr 包装?
packaged_task 不能拷贝,但 std::function 要求内容可拷贝。shared_ptr 本身可拷贝(引用计数),lambda 捕获 shared_ptr 后存进 function<void()> 就没问题了。
析构为什么用 notify_all 而不是 notify_one?
要让所有线程都醒来检查 stop_,不然没被唤醒的线程会永远阻塞,join() 就会卡死。
默写路径(按逻辑推导,不要硬背):
1 2 3 4 5 6
| 1. 构造:for(n个线程) { emplace_back(主循环lambda) } 2. 主循环:while(true) { 加锁→wait→取任务→解锁→执行 } 3. wait 条件:stop_ || !tasks_.empty() 4. 退出条件:stop_ && tasks_.empty() 5. submit:packaged_task→shared_ptr→lambda入队→notify_one→返回future 6. 析构:stop_=true→notify_all→join all
|
量化开发手撕路线图
有了并发基础,接下来的学习路径:
第一阶段(并发基础,已完成)
- ✅ 数据竞争、mutex、lock_guard、unique_lock
- ✅ condition_variable、有界阻塞队列
- ✅ 线程池
第二阶段(量化特色)
- 内存池:预分配 + freelist,避免 malloc 开销
- SPSC 无锁队列:ring buffer + atomic,行情线程→策略线程的标准方案
- 读写锁:shared_mutex,读多写少场景
第三阶段(数据结构)
- LRU Cache(unordered_map + list)
- 滑动窗口均值/方差(O(1) 更新)
- 定时器轮(TimerWheel)
量化开发和普通后端的核心差异是低延迟意识:每写一行代码都要问自己”这里有没有不必要的拷贝/锁/内存分配”。mutex 版本写对写熟比死磕无锁更重要,面试官更在意你理解 happens-before 和数据竞争,不在意你背了多少原子操作。