C++学习笔记-线程

C++11引入了标准线程库,提供跨平台的多线程编程支持。std::thread类用于创建和管理线程;join()方法会阻塞调用线程,直到目标线程执行完成,类似于C#中的Thread.Join()方法。

线程的基本使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Stop flag shared by main() (the writer) and the worker thread (the reader).
// It is atomic because a plain bool that one thread writes while another reads
// it, with no synchronization in between, is a data race.
static std::atomic<bool> s_Finished{false};

/// Worker-thread entry point.
///
/// Prints the id of the thread it runs on, then prints "Working..." once per
/// second until s_Finished becomes true, and finally prints "Work finished!".
/// Because the flag is only checked between one-second sleeps, the function
/// may take up to about a second to return after the flag is set.
void DoWork()
{
    using namespace std::literals::chrono_literals;

    std::cout << "Started thread id=" << std::this_thread::get_id() << std::endl;

    while (!s_Finished.load())
    {
        std::cout << "Working..." << std::endl;
        std::this_thread::sleep_for(1s);
    }

    std::cout << "Work finished!" << std::endl;
}

int main()
{
    std::thread worker(DoWork); // 创建并启动一个线程

    std::cout << "Main thread id=" << std::this_thread::get_id() << std::endl;
    std::cout << "Press Enter to stop the worker thread..." << std::endl;
    
    std::cin.get(); // 等待用户按回车,注意这不会阻塞工作线程
    s_Finished = true;

    worker.join(); // 等待线程结束
    
    std::cout << "Program finished!" << std::endl;
    
    return 0;
}

线程的创建方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <iostream>
#include <thread>
#include <functional>

// 1. Plain free function
/// Prints a fixed message followed by the id of the calling thread.
void SimpleFunction()
{
    const std::thread::id self = std::this_thread::get_id();
    std::cout << "Simple function in thread " << self << std::endl;
}

// 2. Free function that takes parameters
/// Prints the calling thread's id together with the given value and message.
void FunctionWithParams(int value, const std::string& message)
{
    const std::thread::id self = std::this_thread::get_id();
    std::cout << "Thread " << self << ": value=" << value;
    std::cout << ", message=" << message << std::endl;
}

// 3. Class whose member functions are used as thread entry points
class Worker
{
public:
    /// Prints one line per iteration (iteration index and calling thread id),
    /// pausing 100 ms after each line. Does nothing when iterations <= 0.
    void DoWork(int iterations)
    {
        int step = 0;
        while (step < iterations)
        {
            std::cout << "Worker iteration " << step << " in thread "
                      << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            ++step;
        }
    }

    /// Prints a fixed message with the calling thread's id.
    /// Note: despite its name this is an ordinary (non-static) member function.
    void StaticWork()
    {
        const std::thread::id self = std::this_thread::get_id();
        std::cout << "Static work in thread " << self << std::endl;
    }
};

/// Program entry: starts one thread for each way of handing a callable to
/// std::thread, then joins them all in creation order.
int main()
{
    std::cout << "=== Different Ways to Create Threads ===" << std::endl;

    // 1. Free function passed by name.
    std::thread fromFunction(SimpleFunction);

    // 2. Free function plus the arguments to call it with.
    std::thread fromFunctionWithArgs(FunctionWithParams, 42, "Hello Thread");

    // 3. Lambda without parameters.
    std::thread fromLambda([] {
        std::cout << "Lambda in thread " << std::this_thread::get_id() << std::endl;
    });

    // 4. Lambda that receives its argument through the std::thread constructor.
    std::thread fromLambdaWithArgs([](int count) {
        int i = 0;
        while (i < count)
        {
            std::cout << "Lambda iteration " << i << std::endl;
            ++i;
        }
    }, 3);

    // 5. Member function: pointer-to-member, then the object, then the arguments.
    Worker worker;
    std::thread fromMember(&Worker::DoWork, &worker, 3);

    // 6. Member function pre-bound to its object and argument with std::bind.
    std::thread fromBind(std::bind(&Worker::DoWork, &worker, 2));

    // 7. Callable object stored in a variable first.
    auto greet = [](const std::string& name) {
        std::cout << "Functor " << name << " in thread "
                  << std::this_thread::get_id() << std::endl;
    };
    std::thread fromCallable(greet, "MyFunctor");

    // Every thread must be joined before its std::thread object is destroyed.
    fromFunction.join();
    fromFunctionWithArgs.join();
    fromLambda.join();
    fromLambdaWithArgs.join();
    fromMember.join();
    fromBind.join();
    fromCallable.join();

    std::cout << "All threads completed!" << std::endl;

    return 0;
}

线程同步 - 互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

// Shared state for the demos in this example. IncrementCounter only touches
// g_Counter while holding g_Mutex.
std::mutex g_Mutex;
int g_Counter = 0;

/// Increments g_Counter `iterations` times under g_Mutex, printing the new
/// value each time, and sleeps 10 ms after every increment.
void IncrementCounter(int threadId, int iterations)
{
    for (int remaining = iterations; remaining > 0; --remaining)
    {
        {
            // The guard releases the mutex at the end of this inner scope,
            // so the sleep below happens without holding the lock.
            std::lock_guard<std::mutex> guard(g_Mutex);
            const int updated = ++g_Counter;
            std::cout << "Thread " << threadId << " incremented counter to " << updated << std::endl;
        }

        // Simulated work outside the critical section.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

/// Deliberately unsynchronized read-modify-write of g_Counter, used by the
/// race-condition demo. The 1 microsecond sleep between the read and the
/// write-back widens the window in which another thread's update can be lost.
/// threadId is accepted but not used.
void UnsafeIncrement(int threadId, int iterations)
{
    int done = 0;
    while (done < iterations)
    {
        // No lock is taken here on purpose.
        const int snapshot = g_Counter;
        std::this_thread::sleep_for(std::chrono::microseconds(1));
        g_Counter = snapshot + 1;
        ++done;
    }
}

/// Resets g_Counter, runs three threads that each call IncrementCounter with
/// 5 iterations, joins them, and prints the final counter value.
void DemonstrateMutex()
{
    std::cout << "=== Mutex Demonstration ===" << std::endl;

    g_Counter = 0;

    // Start the threads that increment the counter under the mutex.
    std::vector<std::thread> workers;
    int id = 0;
    while (id < 3)
    {
        workers.emplace_back(IncrementCounter, id, 5);
        ++id;
    }

    // Wait for all of them before reading the result.
    for (auto it = workers.begin(); it != workers.end(); ++it)
    {
        it->join();
    }

    std::cout << "Final counter value (with mutex): " << g_Counter << std::endl;
}

/// Resets g_Counter, runs three threads that each call UnsafeIncrement with
/// 100 iterations, joins them, and prints the final value next to the
/// expected total of 300.
void DemonstrateRaceCondition()
{
    std::cout << "\n=== Race Condition Demonstration ===" << std::endl;

    g_Counter = 0;

    // Start the threads that increment the counter without any locking.
    std::vector<std::thread> workers;
    int id = 0;
    while (id < 3)
    {
        workers.emplace_back(UnsafeIncrement, id, 100);
        ++id;
    }

    // Wait for all of them before reading the result.
    for (auto it = workers.begin(); it != workers.end(); ++it)
    {
        it->join();
    }

    std::cout << "Final counter value (without mutex): " << g_Counter;
    std::cout << " (expected: 300)" << std::endl;
}

/// Program entry: runs the mutex-protected demo first, then the unprotected one.
int main()
{
    DemonstrateMutex();          // synchronized increments
    DemonstrateRaceCondition();  // unsynchronized increments
    return 0;
}

条件变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

/// FIFO queue of ints shared between threads.
///
/// Every member function locks m_Mutex before touching m_Queue. Consumers
/// that find the queue empty wait on m_Condition, which Push() signals after
/// adding an element. Push/Pop/TryPop also log what they did to std::cout
/// while still holding the lock.
class ThreadSafeQueue
{
private:
    std::queue<int> m_Queue;
    // mutable so that const observers such as Size() can still take the lock.
    mutable std::mutex m_Mutex;
    std::condition_variable m_Condition;

public:
    /// Appends value to the queue and wakes one waiting consumer, if any.
    void Push(int value)
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        m_Queue.push(value);
        std::cout << "Pushed: " << value << std::endl;
        m_Condition.notify_one();
    }

    /// Removes and returns the oldest element, blocking while the queue is empty.
    int Pop()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);

        // The predicate form re-checks the condition after every wake-up,
        // so spurious wake-ups cannot let us through on an empty queue.
        m_Condition.wait(lock, [this] { return !m_Queue.empty(); });

        int value = m_Queue.front();
        m_Queue.pop();
        std::cout << "Popped: " << value << std::endl;
        return value;
    }

    /// Like Pop(), but waits at most `timeout` for an element.
    /// @param value    receives the popped element on success; untouched on timeout
    /// @param timeout  maximum time to wait for the queue to become non-empty
    /// @return true if an element was popped, false if the wait timed out
    bool TryPop(int& value, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_Mutex);

        if (m_Condition.wait_for(lock, timeout, [this] { return !m_Queue.empty(); }))
        {
            value = m_Queue.front();
            m_Queue.pop();
            std::cout << "Try popped: " << value << std::endl;
            return true;
        }

        std::cout << "Try pop timeout" << std::endl;
        return false;
    }

    /// Returns the number of queued elements at the moment of the call.
    /// Other threads may change the size immediately afterwards.
    size_t Size() const
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        return m_Queue.size();
    }
};

/// Pushes the values start, start+1, ..., start+count-1 onto the queue,
/// sleeping 100 ms after each push.
void Producer(ThreadSafeQueue& queue, int start, int count)
{
    int offset = 0;
    while (offset < count)
    {
        queue.Push(start + offset);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        ++offset;
    }
}

/// Pops consumeCount elements from the queue, sleeping 150 ms after each one.
/// Each Pop() blocks until an element is available. The popped values are not
/// used here; Pop() itself logs them.
void Consumer(ThreadSafeQueue& queue, int consumeCount)
{
    for (int i = 0; i < consumeCount; ++i)
    {
        queue.Pop();  // result intentionally discarded
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

int main()
{
    std::cout << "=== Condition Variable Example ===" << std::endl;
    
    ThreadSafeQueue queue;
    
    // 创建生产者和消费者线程
    std::thread producer1(Producer, std::ref(queue), 1, 5);
    std::thread producer2(Producer, std::ref(queue), 100, 3);
    std::thread consumer1(Consumer, std::ref(queue), 4);
    std::thread consumer2(Consumer, std::ref(queue), 4);
    
    // 等待所有线程完成
    producer1.join();
    producer2.join();
    consumer1.join();
    consumer2.join();
    
    std::cout << "Remaining items in queue: " << queue.Size() << std::endl;
    
    return 0;
}

原子操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

// Shared atomics for the demo: the counter being incremented, a start signal,
// and a stop signal.
std::atomic<int> g_AtomicCounter(0);
std::atomic<bool> g_Ready(false);
std::atomic<bool> g_Stop(false);

/// Spins (yielding) until g_Ready is set, then atomically increments
/// g_AtomicCounter up to `iterations` times. The stop flag is checked after
/// each increment, so at least one increment happens when iterations > 0.
/// Prints a completion line with threadId before returning.
void AtomicIncrement(int threadId, int iterations)
{
    // Wait for the start signal without blocking.
    for (;;)
    {
        if (g_Ready)
            break;
        std::this_thread::yield();
    }

    int done = 0;
    while (done < iterations)
    {
        ++g_AtomicCounter;  // atomic increment

        if (g_Stop)
            break;
        ++done;
    }

    std::cout << "Thread " << threadId << " finished" << std::endl;
}

/// Starts four AtomicIncrement threads (1000 iterations each), releases them
/// together via g_Ready after 100 ms, sets g_Stop 500 ms later, joins them,
/// and prints the final value of g_AtomicCounter.
void AtomicOperationsDemo()
{
    std::cout << "=== Atomic Operations Demo ===" << std::endl;

    const int numThreads = 4;
    const int iterations = 1000;

    // Create the threads; they spin until g_Ready is set.
    std::vector<std::thread> workers;
    int id = 0;
    while (id < numThreads)
    {
        workers.emplace_back(AtomicIncrement, id, iterations);
        ++id;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // Start signal.
    g_Ready = true;

    // Let them run for a while, then request a stop.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    g_Stop = true;

    // Wait for all threads to finish.
    for (auto it = workers.begin(); it != workers.end(); ++it)
    {
        it->join();
    }

    const int total = g_AtomicCounter.load();
    std::cout << "Final atomic counter value: " << total << std::endl;
}

// Atomic operations with explicit memory orderings
/// A writer thread stores 42 into `payload` (relaxed) and then sets
/// `published` with release ordering. A reader thread spins on `published`
/// with acquire ordering and then reads `payload` (relaxed). Both threads
/// print what they did; both are joined before returning.
void MemoryOrderingDemo()
{
    std::cout << "\n=== Memory Ordering Demo ===" << std::endl;

    std::atomic<int> payload(0);
    std::atomic<bool> published(false);

    // Writer side.
    auto produce = [&payload, &published]() {
        payload.store(42, std::memory_order_relaxed);
        published.store(true, std::memory_order_release);  // release
        std::cout << "Writer: data set to 42, flag set to true" << std::endl;
    };

    // Reader side.
    auto consume = [&payload, &published]() {
        for (;;)
        {
            if (published.load(std::memory_order_acquire))  // acquire
                break;
            std::this_thread::yield();
        }

        const int seen = payload.load(std::memory_order_relaxed);
        std::cout << "Reader: flag is true, data value is " << seen << std::endl;
    };

    std::thread writer(produce);
    std::thread reader(consume);

    writer.join();
    reader.join();
}

/// Program entry: runs the atomic counter demo, then the memory-ordering demo.
int main()
{
    AtomicOperationsDemo();  // counter shared by several threads
    MemoryOrderingDemo();    // release/acquire hand-off between two threads
    return 0;
}

线程池简单实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>

/// Fixed-size pool of worker threads that execute queued tasks.
///
/// Tasks are stored as std::function<void()> in m_Tasks, guarded by
/// m_QueueMutex. Idle workers wait on m_Condition. The destructor sets m_Stop,
/// wakes every worker and joins them; workers finish the tasks that are still
/// queued before they exit.
class SimpleThreadPool
{
private:
    std::vector<std::thread> m_Workers;
    std::queue<std::function<void()>> m_Tasks;
    std::mutex m_QueueMutex;
    std::condition_variable m_Condition;
    bool m_Stop;

    /// Body of every worker thread: take the next task under the lock, run it
    /// with the lock released, repeat until stopped and the queue is empty.
    void WorkerLoop()
    {
        for (;;)
        {
            std::function<void()> job;

            {
                std::unique_lock<std::mutex> guard(m_QueueMutex);
                m_Condition.wait(guard, [this] { return m_Stop || !m_Tasks.empty(); });

                // Exit only once the pool is stopping AND nothing is left to run.
                if (m_Stop && m_Tasks.empty())
                    return;

                job = std::move(m_Tasks.front());
                m_Tasks.pop();
            }

            job();
        }
    }

public:
    /// Starts numThreads worker threads.
    SimpleThreadPool(size_t numThreads) : m_Stop(false)
    {
        for (size_t started = 0; started < numThreads; ++started)
        {
            m_Workers.emplace_back([this] { WorkerLoop(); });
        }
    }

    /// Requests a stop, wakes all workers and waits for them to finish.
    ~SimpleThreadPool()
    {
        {
            std::lock_guard<std::mutex> guard(m_QueueMutex);
            m_Stop = true;
        }

        m_Condition.notify_all();

        for (auto it = m_Workers.begin(); it != m_Workers.end(); ++it)
        {
            it->join();
        }
    }

    /// Queues f(args...) for execution on a worker thread.
    /// @return a future that yields the call's result
    /// @throws std::runtime_error if the pool has already been stopped
    template<class F, class... Args>
    auto Enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
    {
        typedef typename std::result_of<F(Args...)>::type ResultType;
        typedef std::packaged_task<ResultType()> Job;

        // The packaged_task lives in a shared_ptr so that the queued wrapper
        // lambda is copyable, as std::function requires.
        std::shared_ptr<Job> job = std::make_shared<Job>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<ResultType> pending = job->get_future();

        {
            std::lock_guard<std::mutex> guard(m_QueueMutex);

            if (m_Stop)
                throw std::runtime_error("Enqueue on stopped ThreadPool");

            m_Tasks.emplace([job]() { (*job)(); });
        }

        m_Condition.notify_one();
        return pending;
    }
};

// Test functions
/// Logs the range and calling thread id, adds up the integers from start to
/// end inclusive (0 when start > end), sleeps 100 ms, and returns the sum.
int CalculateSum(int start, int end)
{
    std::cout << "Calculating sum from " << start << " to " << end
              << " in thread " << std::this_thread::get_id() << std::endl;

    int total = 0;
    int current = start;
    while (current <= end)
    {
        total += current;
        ++current;
    }

    // Simulated extra work.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return total;
}

/// Prints the message together with the id of the calling thread.
void PrintMessage(const std::string& message)
{
    const std::thread::id self = std::this_thread::get_id();
    std::cout << "Message: " << message << " from thread ";
    std::cout << self << std::endl;
}

/// Program entry: submits 8 CalculateSum tasks and 5 PrintMessage tasks to a
/// 4-thread pool, then prints each sum in submission order.
int main()
{
    std::cout << "=== Thread Pool Example ===" << std::endl;

    SimpleThreadPool pool(4);

    // Computation tasks: [0,9], [10,19], ..., [70,79]. The futures are kept
    // so the results can be collected below.
    std::vector<std::future<int>> sums;
    for (int block = 0; block < 8; ++block)
    {
        sums.emplace_back(
            pool.Enqueue(CalculateSum, block * 10, (block + 1) * 10 - 1)
        );
    }

    // Print tasks: their futures are discarded, nobody waits on them here.
    int taskNo = 0;
    while (taskNo < 5)
    {
        pool.Enqueue(PrintMessage, "Hello from task " + std::to_string(taskNo));
        ++taskNo;
    }

    // get() blocks until the corresponding task has finished.
    std::cout << "\nCalculation results:" << std::endl;
    for (auto it = sums.begin(); it != sums.end(); ++it)
    {
        std::cout << "Sum: " << it->get() << std::endl;
    }

    std::cout << "All tasks completed!" << std::endl;

    return 0;
}

线程的最佳实践

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>

/// Integer counter whose every operation runs under an internal mutex.
class ThreadSafeCounter
{
private:
    // mutable so that the const GetValue() can lock it.
    mutable std::mutex m_Mutex;
    int m_Value;

public:
    /// Starts the counter at zero.
    ThreadSafeCounter() : m_Value{0} {}

    /// Adds one to the counter.
    void Increment()
    {
        std::lock_guard<std::mutex> guard(m_Mutex);
        m_Value += 1;
    }

    /// Subtracts one from the counter.
    void Decrement()
    {
        std::lock_guard<std::mutex> guard(m_Mutex);
        m_Value -= 1;
    }

    /// Returns the counter's value as read under the lock.
    int GetValue() const
    {
        std::lock_guard<std::mutex> guard(m_Mutex);
        const int snapshot = m_Value;
        return snapshot;
    }
};

/// Walks through four threading practices and prints the outcome of each:
///   1. always join (or detach) a thread before its std::thread object dies,
///   2. protect shared state with a mutex (ThreadSafeCounter),
///   3. use std::atomic for a simple shared counter,
///   4. check joinable() before join(), shown with a detached thread.
///
/// The worker threads are held in a fixed-size array of std::thread rather
/// than a std::vector, so this function does not depend on <vector>, which
/// this example never includes.
void BestPracticesDemo()
{
    std::cout << "=== Thread Best Practices ===" << std::endl;

    // 1. Make sure a thread is joined before its std::thread object goes out
    //    of scope: destroying a std::thread that is still joinable calls
    //    std::terminate.
    {
        std::thread t([]() {
            std::cout << "RAII thread example" << std::endl;
        });

        if (t.joinable())
        {
            t.join();
        }
    }  // the std::thread object is destroyed here, already joined

    constexpr int kThreadCount = 4;
    std::thread threads[kThreadCount];  // default-constructed: no thread attached yet

    // 2. Avoid data races: all increments go through the mutex-protected counter.
    ThreadSafeCounter counter;

    for (auto& t : threads)
    {
        t = std::thread([&counter]() {
            for (int j = 0; j < 100; ++j)
            {
                counter.Increment();
            }
        });
    }

    for (auto& t : threads)
    {
        t.join();
    }

    std::cout << "Counter value: " << counter.GetValue() << std::endl;

    // 3. For a simple counter an atomic can replace the mutex.
    std::atomic<int> atomicCounter(0);

    // The slots were joined above, so they are no longer joinable and can be
    // assigned new threads.
    for (auto& t : threads)
    {
        t = std::thread([&atomicCounter]() {
            for (int j = 0; j < 100; ++j)
            {
                atomicCounter.fetch_add(1);
            }
        });
    }

    for (auto& t : threads)
    {
        t.join();
    }

    std::cout << "Atomic counter value: " << atomicCounter.load() << std::endl;

    // 4. Check joinable() before calling join().
    std::thread detachedThread([]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "Detached thread finished" << std::endl;
    });

    detachedThread.detach();  // the thread now runs independently of this object

    // join() must not be called on a detached thread; joinable() is false here.
    if (detachedThread.joinable())
    {
        detachedThread.join();
    }
    else
    {
        std::cout << "Thread is not joinable (detached)" << std::endl;
    }

    // A detached thread cannot be waited on, so the demo simply sleeps longer
    // than the thread's own 100 ms delay to let its output appear.
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

/// Program entry: runs the best-practices walkthrough and reports completion.
int main()
{
    BestPracticesDemo();
    std::cout << "\nMain thread finished!" << std::endl;
    return 0;
}

总结

  1. std::thread基础:C++11标准线程库,提供跨平台多线程支持
  2. 线程管理
    • join():阻塞调用线程,直到目标线程执行完成,类似C#的Thread.Join()
    • detach():分离线程,让其独立运行
    • std::this_thread:操作当前线程的工具
  3. 同步机制
    • std::mutex:互斥锁,防止数据竞争
    • std::condition_variable:条件变量,线程间通信
    • std::atomic:原子操作,无锁编程
  4. 最佳实践
    • 使用RAII管理线程资源
    • 避免数据竞争
    • 优先使用原子操作而非互斥锁(适用于简单操作)
    • 检查线程是否可join
  5. 高级特性:线程池、future/promise、内存序等
  6. 注意事项:线程创建有开销,合理控制线程数量,避免过度同步
updated 2025-09-20