C++、线程与任务

标准库对并发提供了一些支持:

  1. 内存模型:对并发访问的保障,希望访问能按照需求工作
  2. programming without locks, 避免数据竞争
  3. thread 库,在 thread lock 等原语层次上
  4. task 支持库,在 Future, Task 这些层次上

层次由低到高,你应该选择更高的层次

需要解决的问题

  1. 位字段带来的整数bytes内的内存位置:

    struct {
    int x: 3;
    int y: 5;
    }

    这样的位字段中,多个处理器读/写 x, y 实际上会把 这 1bytes 的位置都加载,考虑到:

    thread1: 从内存读, 修改 x, 写回

    thread1: 从内存读, 修改 y, 写回
    这种东西应该在多线程中显示处理/同步

  2. 编译器:指令重排

  3. 处理:内存序。内存序这词用来描述一个线程从内存访问一个值会发生什么

    1. SC,顺序一致性,能读到一个一致性的视图(最强)
  4. 数据竞争:多个线程访问同一个内存位置,一个有写操作

原子性

无锁编程:编写不显式使用锁并发,由硬件支持的原语来使用

  1. acquire 操作:其他处理器会在后续操作之前看到效果;

atomic 类型可以帮助我们完成这一点。一般来说内存序是“顺序一致”的

我们这里有 atomic 的 ++

int expected = val.load();
int next;
do {
next = expected + 1;
} while (!val.compare_exchange_weak(expected, next));

这里,每次修改以后都是会被下一次看到的。基于这个操作,我们可以用 atomic 实现 Singleton:

Singleton* Singleton::getInstance() {
if (m_instance == nullptr) {
mutex_singleton.lock();
if (m_instance == nullptr) {
m_instance = new Singleton();
}
mutex_singleton.unlock();
}
return m_instance.load();
}

set 后你的 m_instance 的值都能被感知。

Diff: 内存屏障 vs Volatile

https://zhuanlan.zhihu.com/p/43526907

这里注意,可能m_instance 会面临重拍问题,Java 可以对应 volatile, 而 C++ 需要使用 atomic 来处理这些。

注意这里会有一定的ABA问题,类似 RDBMS 的访问,你的 R 和 W操作能让别的对象感知,但是可能会有:

  1. 读出来
  2. 改改
  3. 写进去

这三部类似加了 volatile, 可以被读取,但是可能会导致同步的问题。

  • volatile 指出对象可以被线程外的东西修改

CAS

我对这个不是很了解,可以参考这两篇文章:

线程和线程池

C++ 关于线程提供了很多所有语言都有的操作,具体的话我觉得这几篇文章讲的很好:

关于线程池,我们提供了一部分接口:

class BasicThreadPool {
public:
BasicThreadPool(int size): pool_size_(size) {
shared_sync_ = std::make_shared<SharedData>();
for (int i = 0; i < pool_size_; ++i) {
auto t = SubRunningThread(this->shared_sync_);
threads_.push_back(t);
}
}
BasicThreadPool(): BasicThreadPool(DEFAULT_POOL_SIZE) {}
void Execute(ThreadTask);
~BasicThreadPool() {
for (int i = 0; i < pool_size_; ++i) {
threads_[i].shutdown();
}
shared_sync_->cond.notify_all();
}
private:
int pool_size_;
std::vector<SubRunningThread> threads_;
// 共享的同步数据
std::shared_ptr<SharedData> shared_sync_;
};

可以看到 有个 std::shared_ptr<SharedData>,这里是用于同步的共享数据:

constexpr int DEFAULT_POOL_SIZE = 25;
using ThreadTask = std::function<int()>;
struct SharedData {
public:
// 使用的队列对象
// shared data
std::queue<ThreadTask > tasks;
std::mutex task_mutex;
std::mutex mutex;
std::condition_variable cond;
};
class SubRunningThread {
public:
SubRunningThread(std::shared_ptr<SharedData> sync): sync_data(sync), canceled_(false) {
auto t = std::thread(*this);
t.detach();
}
void operator()();
void shutdown() { canceled_ = false; }
private:
bool canceled_;
std::shared_ptr<SharedData> sync_data;
};

可以看到,我们会同步数据。

同步的时候,会从队列中 fetch 一个对象,并且完成计算。关于 Execute,我们这里使用 cond 来实现 notify

void BasicThreadPool::Execute(ThreadTask f) {
{
std::lock_guard<std::mutex> l(this->shared_sync_->task_mutex);
std::cout << "Put task into queue" << std::endl;
shared_sync_->tasks.push(f);
}
shared_sync_->cond.notify_one();
}

推进队列之后 push,然后激活一个任务线程。

void SubRunningThread::operator()() {
while (true) {
this->sync_data->task_mutex.lock();
// std::lock_guard<std::mutex> lock();
if (canceled_) {
std::cout << "Task canceled, return" << std::endl;
this->sync_data->task_mutex.unlock();
return;
}
if (this->sync_data->tasks.empty()) {
std::cout << "TaskQueue empty" << std::endl;
this->sync_data->task_mutex.unlock();
} else {
std::cout << "Fetch and run." << std::endl;
// fetch and do
auto task = this->sync_data->tasks.front();
this->sync_data->tasks.pop();
this->sync_data->task_mutex.unlock();
int ret = task();
std::cout << "thread " << std::this_thread::get_id() << " counted " << ret << std::endl;
}
std::unique_lock<std::mutex> uLck(this->sync_data->mutex);
this->sync_data->cond.wait(uLck);
std::cout << "thread " << std::this_thread::get_id() << " noptified" << std::endl;
}
}

这样在主循环里面,就可以利用 cond 来 wait 了。

Java 的线程池

实际上,C++很多并发相关的东西都是从 Java 里面借来的(拷贝忍者?),我这么菜就不自己不自量力爬源代码了,先提供一下别人的帖子

Submit 的区别:

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

这里传入的还是 Runnable 和 Callable 的对象。

这种情况下,可以直接通过 Future 来获得值,线程池专门用来调度计算。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
获取当前worker的数目,如果小于corePoolSize那么就扩容,
这里不会判断是否已经有core线程,而是只要小于corePoolSize就会直接增加worker
*/
if (workerCountOf(c) < corePoolSize) {
/*
调用addWorker(Runnable firstTask, boolean core)方法扩容
firstTask表示为该worker启动之后要执行的第一个任务,core表示要增加的为core线程
*/
if (addWorker(command, true))
return;
//如果增加失败了那么重新获取ctl的快照,比如可能线程池在这期间关闭了
c = ctl.get();
}
/*
如果当前线程池正在运行中,并且将任务丢到队列中成功了,
那么就会进行一次double check,看下在这期间线程池是否关闭了,
如果关闭了,比如处于SHUTDOWN状态,如上文所讲的,SHUTDOWN状态的时候,
不再接受新任务,remove成功后调用拒绝处理器。而如果仍然处于运行中的状态,
那么这里就double check下当前的worker数,如果为0,有可能在上述逻辑的执行
过程中,有worker销毁了,比如说任务抛出了未捕获异常等,那么就会进行一次扩容,
但不同于扩容core线程,这里由于任务已经丢到队列中去了,因此就不需要再传递firstTask了,
同时要注意,这里扩容的是非core线程
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
/*
如果在上一步中,将任务丢到队列中失败了,那么就进行一次扩容,
这里会将任务传递到firstTask参数中,并且扩容的是非core线程,
如果扩容失败了,那么就执行拒绝策略。
*/
reject(command);
}

ctl 是个 AtomicInteger,保证内存是可靠的。判断这个线程的状态。(我的 boolean 是不是也要改成这个?)

addWorker 具体看性质。根据拒绝处理器状态什么的来做处理。