6.3 C++11 原子操作与原子类型
作者:mmseoamin日期:2023-12-14

一、原子类型

1.多线程下的问题

在C++中,一个全局数据在多个线程中被同时使用时,如果不加任何处理,则会出现数据同步的问题。

#include 
#include 
#include 
long val = 0;
void test()
{
	for (int i = 0; i < 10000000; i++) {
		val++;
	}
}
int main()
{
	auto time1 = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
	std::thread thread1(test);
	std::thread thread2(test);
	thread1.join();
	thread2.join();
	auto time2 = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
	time2 - time1;
    std::cout << "val:" << val<<" " << ((time2 - time1)/1000.0)<<"s"< 

上述例子中test函数对全局变量val进行累加,并在thread1和thread2两个线程中分别处理test函数。得到的结果如下:

val:11260278 0.105s

val的值并不是期望的20000000,这是因为val++操作不是原子操作导致的。

对于val++,实际上会被拆为三步:

  • 从内存读取val值到寄存器
  • 寄存器自增
  • 将寄存器值回写到val的内存

    而多线程同时操作时,实际步骤可能为:

    • thread1读取val值为0
    • thread2读取val值为0
    • thread1增加val值为1
    • thread2增加val值为1(注意与thread1增加的不是一个原始值)
    • thread1写入val,值为1
    • thread2覆盖写入val,值为1

      这样就会导致两个线程的操作重复,最终结果小于20000000。

      2.原子操作

      所谓原子操作,指的是多线程中"最小且不可并行化"的操作,如val++语句的三步在一个线程中执行完之前,不会运行其他的进程。

      在C++11之前,原子操作都是通过“互斥”(即在临界区间内,一个线程正在访问,则其他线程会等待)处理的,如互斥锁。下面则是通过添加互斥锁处理这个问题:

      #include 
      long val = 0;
      std::mutex locker;
      void test()
      {
      	for (int i = 0; i < 10000000; i++) {
      		locker.lock();
      		val++;
      		locker.unlock();
      	}
      }
      

      输出结果如下:

      val:20000000 5.648s

      可以看到val值正确,但是性能消耗非常大。

      3.原子类型

      C++11将原子操作抽象,引入原子类型atomic,并提供相应的操作接口(原子操作)。通过atomic实现线程间数据同步:

      #include 
      std::atomic_long val = 0;
      void test()
      {
      	for (int i = 0; i < 10000000; i++) {
      		val++;
      	}
      }

      输出为:

      val:20000000 2.29s

      可见val值正确,且耗时比互斥锁小了很多。(因为mutex使用涉及:多次atomic原子指令操作+用户态和内核态切换、线程切换调度开销、全局锁hash queue维护等开销,所以时间更长。但是atomic只能对变量,而锁可以针对范围内的所有内容)

      3.1 内置原子类型

      类似于前面的atomic_long,C++11为所有内置类型都提供了对应的原子类型:

      6.3 C++11 原子操作与原子类型,第1张

      3.2 自定义类型原子类型

      因为atomic为类模板,所以可以通过:

      std::atomic t;

      创建自定义类型的原子类型,当然也可以使用此方式创建内置类型的原子类型。

      atomic为作为类模板,提供了统一的操作接口:

      6.3 C++11 原子操作与原子类型,第2张

      其中is_lock_free用于判断是否有锁,load用于读取,store用于存,exchange用于交换数据。

      由于原子类型属于资源类型,所以为了避免拷贝时引起的问题,atomic类模板删除了相关的拷贝构造和赋值函数。

      此外,atomic到原始类型的转换也是允许的(隐式的),但非原子操作。

      atomic_flag

      atomic_flag是无锁的,仅支持test_and_set和clear两个接口。其中test_and_set表示:

      • 如果atomic_flag原始为false,则设置其为true,并返回false
      • 如果原始为true,则不处理,并返回true

        而clear则表示将atomic_flag置为false。

        所以可以使用atomic_flag实现一个自旋锁:

        #include 
        #include 
        #include 
        #include 
        using namespace std;
        std::atomic_flag lock = ATOMIC_FLAG_INIT;
        void f(int n) {
            while (lock.test_and_set(std::memory_order_acquire)) // 尝试获得锁
                cout << "Waiting from thread " << n << endl;      // 自旋
            cout << "Thread " << n << " starts working" << endl;
        }
        void g(int n) {
            cout << "Thread " << n << " is going to start." << endl;
            lock.clear();
            cout << "Thread " << n << " starts working" << endl;
        }
        int main() {
            lock.test_and_set();	//设为true
            thread t1(f, 1);
            thread t2(g, 2);
            t1.join();
            usleep(100);
            t2.join();
        }
        // 编译选项:g++ -std=c++11 6-3-3.cpp -lpthread
        

        上述代码声明了一个全局的atomic_flag变量lock。最开始,将lock初始化为值ATOMIC_FLAG_INIT,即false的状态。

        而在线程t1中(执行函数f的代码),我们不停地通过lock的成员test_and_set来设置lock为true。这里的test_and_set()是一种原子操作,用于在一个内存空间原子地写入新值并且返回旧值。因此test_and_set会返回之前的lock的值。

        所以当线程t1执行join之后,由于在main函数中调用过test_and_set,因此f中的test_and_set将一直返回true,并不断打印信息,即自旋等待。

        而当线程t2加入运行的时候,由于其调用了lock的成员clear,将lock的值设为false,因此此时线程t1的自旋将终止,从而开始运行后面的代码。这样一来,我们实际上就通过自旋锁达到了让t1线程等待t2线程的效果。

        当然,还可以将lock封装为锁操作,比如:

        void Lock(atomic_flag *lock) { while (lock.test_and_set ()); }
        void Unlock(atomic_flag *lock) { lock.clear(); }

        二、内存模型、顺序一致性和memory_order

        原子类型为线程间数据同步提供了一定的保障,但是这是建立在顺序一致性的内存模型基础上。

        1.问题

        #include 
        #include 
        #include 
        using namespace std;
        atomic a {0};
        atomic b {0};
        int ValueSet(int) {
            int t = 1;
            a = t;
            b = 2;
        }
        int Observer(int) {
            cout << "(" << a << ", " << b << ")" << endl;    // 可能有多种输出
        }
        int main() {
            thread t1(ValueSet, 0);
            thread t2(Observer, 0);
            t1.join();
            t2.join();
            cout << "Got (" << a << ", " << b << ")" << endl;    // Got (1, 2)
        }
        // 编译选项:g++ -std=c++11 6-3-4.cpp -lpthread

        对于上面的代码,比较合理的打印值有:(0,0),(1,0),(1,2),然而在非顺序一致性时可能会打印(0,2)这样的结果。

        这是因为对于非顺序一致性场景下,编译器在认定a、b的赋值语句的执行先后顺序对输出结果有任何的影响的话,则可以依情况将指令重排序(reorder)以提高性能。因此就有可能会将b的赋值语句提前到a的赋值语句之前,从而得到(0,2)这样的结果。

        当然,默认情况下,在C++11中的原子类型的变量在线程中总是保持着顺序执行的特性(非原子类型则没有必要,因为不需要在线程间进行同步)。我们称这样的特性为“顺序一致”的,即代码在线程中运行的顺序与程序员看到的代码顺序一致,a的赋值语句永远发生于b的赋值语句之前。

        2.内存模型

        通常情况下,内存模型是一个硬件的概念,表示机器指令以什么样的顺序被执行。

        对于"t = 1; a = t; b = 2;"可以用如下的伪汇编表示:

        1: Loadi     reg3, 1;     # 将立即数1放入寄存器reg3
        2:Move     reg4, reg3;   # 将reg3的数据放入reg4
        3: Store     reg4, a;     # 将寄存器reg4中的数据存入内存地址a
        4: Loadi     reg5, 2;     # 将立即数2放入寄存器reg5
        5: Store     reg5, b;     # 将寄存器reg5中的数据存入内存地址b

        通常情况下,应该按照1,2,3,4,5顺序执行,这样的内存模型称为强顺序的。这时a的赋值始终先于b的赋值执行。

        但是我们可以看到,指令1,2,3与指令4,5毫无关联,因此一些处理器可能就会重排指令顺序,比如1,4,2,5,3的顺序执行。这种场景,我们称为弱顺序的,b的赋值也就会先于a的赋值。

        而在多线程中,强顺序意味着,多个线程看到的指令执行顺序是一致的且反馈到处理器层面,内存数据变换顺序与指令顺序一致,而弱顺序则无法保证这一点。

        而原子操作要求都是顺序的,这在强顺序内存模型下是不需要额外处理的,而对于弱顺序内存模型下,则需要添加内存栅栏这样的指令来确保顺序一致性,这对性能往往有较大的损耗。

        3.内存顺序memory_order

        以上描述的都是硬件上的一些内存模型,而C++11引入的内存模型和顺序一致性则是针对编译器而言:

        • 编译器保证原子操作的指令间顺序不变,即保证产生的读写原子类型的变量的机器指令与代码编写者看到的是一致的。
        • 处理器对原子操作的汇编指令的执行顺序不变。这对于x86这样的强顺序的体系结构而言,并没有任何的问题;而对于PowerPC这样的弱顺序的体系结构而言,则要求编译器在每次原子操作后加入内存栅栏。

          C++11的原子操作默认都是顺序一致性的,这对强顺序体系而言没有影响,但是对于弱顺序体系而言,添加内存栅栏来确保顺序一致性,会大大增加性能消耗。为了解决这一问题,C++11引入了内存顺序memory_order的概念,即对所有的原子操作提供一个参数入口,传入不同的momery_order以弱化对顺序一致性的要求。

          具体使用如下:

          #include 
          #include 
          #include 
          using namespace std;
          atomic a {0};
          atomic b {0};
          int ValueSet(int) {
              int t = 1;
              a.store(t, memory_order_relaxed);
              b.store(2, memory_order_relaxed);
          }
          int Observer(int) {
              cout << "(" << a << ", " << b << ")" << endl;    // 可能有多种输出
          }
          int main() {
              thread t1(ValueSet, 0);
              thread t2(Observer, 0);
              t1.join();
              t2.join();
              cout << "Got (" << a << ", " << b << ")" << endl;    // Got (1, 2)
              return 0;
          }
          // 编译选项:g++ -std=c++11 6-3-6.cpp -lpthread
          

          a和b的赋值操作传入参数memory_order_relaxed,表示对执行顺序不做任何要求,从而放开顺序一致性。

          memory_order的枚举值有:

          6.3 C++11 原子操作与原子类型,第3张

          通常情况下,我们可以把atomic成员函数可使用的memory_order值分为以下3组:

          ❑ 原子存储操作(store)可以使用memorey_order_relaxed、memory_order_release、memory_order_seq_cst。

          ❑ 原子读取操作(load)可以使用memorey_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_seq_cst。

          ❑ RMW操作(read-modify-write),即一些需要同时读写的操作,比如之前提过的atomic_flag类型的test_and_set()操作。又比如atomic类模板的atomic_compare_exchange()操作等都是需要同时读写的。RMW操作可以使用memorey_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel、memory_order_seq_cst。

          排除顺序一致和松散两种方式,我们能不能保证程序“既快又对”地运行呢?实际上,我们所需要的只是a.store先于b.store发生,b.load先于a.load发生的顺序。这要这两个“先于发生”关系得到了遵守,对于整个程序而言来说,就不会发生线程间的错误。所以我们可以修改代码如下:

          #include 
          #include 
          #include 
          using namespace std;
          atomic a;
          atomic b;
          int Thread1(int) {
              int t = 1;
              a.store(t, memory_order_relaxed);
              b.store(2, memory_order_release); // 本原子操作前所有的写原子操作必须完成
          }
          int Thread2(int) {
              while(b.load(memory_order_acquire) != 2);   // 本原子操作必须完成才能执行之后所有
              的读原子操作
              cout << a.load(memory_order_relaxed) << endl;    // 1
          }
          int main() {
              thread t1(Thread1, 0);
              thread t2(Thread2, 0);
              t1.join();
              t2.join();
              return 0;
          }
          // 编译选项:g++ -std=c++11 6-3-8.cpp -lpthread
          

          b.store采用了memory_order_release内存顺序,这保证了本原子操作前所有的写原子操作必须完成,也即a.store操作必须发生于b.store之前。b.load采用了memory_order_acquire作为内存顺序,这保证了本原子操作必须完成才能执行之后所有的读原子操作。即b.load必须发生在a.load操作之前。这样一来,通过确立“先于发生”关系的,我们就完全保证了代码运行的正确性,即当b的值为2的时候,a的值也确定地为1。