当前位置:网站首页>借助原子变量,使用CAS完成并发操作

借助原子变量,使用CAS完成并发操作

2022-06-27 09:11:00 wangzai6378

                看如下一个例子

#pragma once
#include <atomic>
#include <queue>
#include <thread>
#include <iostream>

template <typename T>
class FreeQueue
{
public:
    explicit FreeQueue(const std::size_t size) : m_size(size)
    {
        m_queue = new T[m_size];
    }

    ~FreeQueue()
    {
        delete[] m_queue;
    }

    void Push(const T &d)
    {
        int expect = m_claimSequence.fetch_add(1);
        int temp = expect;
        do
        {
            temp = expect;
        } while (!m_cursor.compare_exchange_strong(temp, temp));

        int index = temp & (m_size - 1);
        m_queue[index] = d;

        m_cursor.store(temp + 1);
    }

    std::vector<T> PopSome()
    {
        std::vector<T> ret;
        while (m_head < m_cursor)
        {
            ret.push_back(m_queue[m_head.fetch_add(1) & (m_size - 1)]);
        }

        return ret;
    }

    T PopOne()
    {
        return T();
        // if (m_head >= m_cursor)
        // {
        //     T ret;
        //     return T();
        // }

        // return m_queue[m_head.fetch_add(1) & (m_size - 1)];
    }

private:
    T *m_queue;
    const int m_size;
    std::atomic_int m_cursor{0};
    std::atomic_int m_head{0};
    std::atomic_int m_claimSequence{0};
};

        主调代码如下:

class Command
{
public:
    Command()
    {
        a = std::vector<int>(1024, 1);
    }
    void HandleData(const int d)
    {
        std::cout << "Command HandleData " << d << std::endl;
    }

    std::vector<int> a{1, 2, 3, 4};
};

void write_thread(const int start, FreeQueue<Command> &quque)
{
    std::atomic_uint64_t index;
    while (true)
    {
        Command cmd;
        cmd.a.assign(random() % 10, 2);
        quque.Push(cmd);
    }
}

void read_thread(FreeQueue<Command> &quque)
{
    while (true)
    {
        auto data = quque.PopOne();

        std::cout << "data size " << data.a.size() << std::endl;
    }
}

int main(int argc, char **argv)
{
    std::vector<std::thread> threads;
    FreeQueue<Command> g_queue(8);
    for (int i = 0; i < std::thread::hardware_concurrency(); i++)
    {
        threads.push_back(std::thread(write_thread, i * 10, std::ref(g_queue)));
    }

    std::thread readT(read_thread, std::ref(g_queue));
    for (auto &t : threads)
    {
        t.join();
    }
    readT.join();

    return 0;
}

        目前在多线程环境下,Push函数是没有问题的。

主要借用std::atomic_int::compare_exchange_strong方法,为了避免ABA问题,每次进入时使用fetch_add生成唯一编号,将其认为是期望值,然后循环等待m_cursor与期望值相等,相等后立马将m_cursor设置为期值,因为每个线程都会获得一个编号,那么每个线程需要的期望值为0,1,2,3,4;由于第一个m_cursor初始值为0,那么分配到0编号的循环会退出循环并将m_cursor设置为0,此时由于其它线程期望值是1,2,3,4...;所以其它线程进行循环等待。退出循环的线程完成操作后,将m_cursor设置为期望值加1(0+1)。这时期望得1的线程会退出循环,以此类推完成所有的Push操作。

        设置思路关键在于,要找到一个期望值,使得仅有一个线程得到标志后,立马更改为其它线程都不能获得期望值的值。此时,该线程已经独占资源了,完成操作后;将原子共享变量改为下一个线程的期望值。需要注意的是,fetch_add在第一次执行的时候返回值就是它本身,之后按参数递增;如 std::atomic_int xx{-1}, 不间断对其fetch_add返回的结果,会是-1,0,1,2,3,....

原网站

版权声明
本文为[wangzai6378]所创,转载请带上原文链接,感谢
https://blog.csdn.net/wanghualin033/article/details/125336339