C++学习笔记-Async Future

std::async和std::future是C++11引入的异步编程工具,用于在后台执行任务并在需要时获取结果。它们提供了一种简单而强大的方式来实现并发编程,避免了手动管理线程的复杂性。

基本用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <iostream>
#include <string>
#include <chrono>
#include <future>
#include <thread>

/**
 * Simulates a slow file read.
 *
 * Blocks the calling thread for three seconds and then returns a placeholder
 * string that embeds the requested name. No file is actually opened.
 *
 * @param filename Name to embed in the returned text.
 * @return "This is the content of the file: " followed by @p filename.
 */
std::string readFileContent(const std::string& filename) {
    const std::chrono::seconds simulatedLatency(3);
    std::this_thread::sleep_for(simulatedLatency);

    std::string content("This is the content of the file: ");
    content += filename;
    return content;
}

/**
 * Simulates an expensive computation: sleeps for two seconds, then adds up
 * every integer in the closed range [start, end].
 *
 * @param start First value of the range (inclusive).
 * @param end   Last value of the range (inclusive).
 * @return The sum of the range; 0 when start is greater than end.
 */
int calculateSum(int start, int end) {
    std::this_thread::sleep_for(std::chrono::seconds(2));

    int total = 0;
    int current = start;
    while (current <= end) {
        total += current;
        ++current;
    }
    return total;
}

void BasicAsyncDemo()
{
    std::cout << "=== Basic Async Demo ===" << std::endl;
    
    std::string filename = "example.txt";

    /*
    异步执行函数readFileContent
    这里需要注意的是,如果你不需要接收返回值,
    那么你不需要调用std::future的任何方法,这样就使得不会有任何效果。
    一般情况下返回值是需要的,那么你需要调用get()方法,这时候就会等待任务执行完毕并获取其返回值。
    
    关于std::async的启动策略:
        1. std::launch::async:强制异步执行,创建新线程。
        2. std::launch::deferred:延迟执行,任务不会立即执行,只有在调用 std::future 的 get 或 wait 方法时才会执行。
    */
    std::future<std::string> fileContentFuture = std::async(std::launch::async, readFileContent, filename);

    std::cout << "Doing some other work while the file is being read..." << std::endl;
    
    // 模拟其他工作
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Still doing other work..." << std::endl;

    std::string fileContent = fileContentFuture.get(); // 获取任务的返回值

    std::cout << "File content: " << fileContent << std::endl;
}

/**
 * Starts three calculateSum tasks with std::launch::async, waits for each
 * of them, and prints the three partial sums plus their total.
 */
void MultipleAsyncTasks()
{
    std::cout << "\n=== Multiple Async Tasks ===" << std::endl;

    // Launch all three tasks before waiting on any of them.
    std::future<int> lowRange = std::async(std::launch::async, calculateSum, 1, 1000);
    std::future<int> midRange = std::async(std::launch::async, calculateSum, 1001, 2000);
    std::future<int> highRange = std::async(std::launch::async, calculateSum, 2001, 3000);

    std::cout << "Started 3 calculation tasks..." << std::endl;

    // get() blocks until the corresponding task has produced its value.
    const int lowSum = lowRange.get();
    const int midSum = midRange.get();
    const int highSum = highRange.get();
    const int grandTotal = lowSum + midSum + highSum;

    std::cout << "Sum 1-1000: " << lowSum << std::endl;
    std::cout << "Sum 1001-2000: " << midSum << std::endl;
    std::cout << "Sum 2001-3000: " << highSum << std::endl;
    std::cout << "Total sum: " << grandTotal << std::endl;
}

// Entry point of this listing: runs BasicAsyncDemo, then MultipleAsyncTasks.
int main()
{
    BasicAsyncDemo();
    MultipleAsyncTasks();
    
    /*
    Summary
    1. std::async launches a task and returns a std::future that is used to
       wait for the task and to retrieve its result.
    2. Keep the returned std::future. With std::launch::deferred the task only
       runs when get() or wait() is called. With std::launch::async the task
       runs regardless, but a discarded future blocks in its destructor until
       the task finishes, which makes the call effectively synchronous.
    */
    
    return 0;
}

启动策略详解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <future>
#include <chrono>
#include <thread>

/**
 * Simulates an expensive computation.
 *
 * Prints the id of the thread it runs on, sleeps for two seconds, and
 * returns the square of its argument.
 *
 * @param n Value to square.
 * @return n * n.
 */
int heavyComputation(int n)
{
    const std::thread::id runningOn = std::this_thread::get_id();
    std::cout << "Starting computation in thread: " << runningOn << std::endl;

    std::this_thread::sleep_for(std::chrono::milliseconds(2000));

    const int squared = n * n;
    return squared;
}

/**
 * Runs heavyComputation under each launch policy accepted by std::async
 * (async, deferred, the default, and the explicit async|deferred
 * combination) and prints progress messages around every call so the
 * thread ids and ordering can be compared.
 */
void LaunchPolicyDemo()
{
    std::cout << "=== Launch Policy Demo ===" << std::endl;
    std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;

    const std::chrono::seconds pause(1);

    // 1. std::launch::async - the task is started right away, asynchronously.
    std::cout << "\n1. std::launch::async:" << std::endl;
    std::future<int> eagerTask = std::async(std::launch::async, heavyComputation, 5);
    std::cout << "Task started asynchronously" << std::endl;
    std::this_thread::sleep_for(pause);
    std::cout << "Doing other work..." << std::endl;
    const int eagerValue = eagerTask.get();
    std::cout << "Result: " << eagerValue << std::endl;

    // 2. std::launch::deferred - nothing runs until get()/wait() is called.
    std::cout << "\n2. std::launch::deferred:" << std::endl;
    std::future<int> lazyTask = std::async(std::launch::deferred, heavyComputation, 6);
    std::cout << "Task created but not started yet" << std::endl;
    std::this_thread::sleep_for(pause);
    std::cout << "About to call get()..." << std::endl;
    const int lazyValue = lazyTask.get(); // the task starts executing here
    std::cout << "Result: " << lazyValue << std::endl;

    // 3. Default policy - the implementation picks async or deferred.
    std::cout << "\n3. Default policy (implementation defined):" << std::endl;
    std::future<int> defaultTask = std::async(heavyComputation, 7);
    std::cout << "Task created with default policy" << std::endl;
    const int defaultValue = defaultTask.get();
    std::cout << "Result: " << defaultValue << std::endl;

    // 4. Both policy bits set explicitly.
    std::cout << "\n4. Combined policy:" << std::endl;
    const std::launch eitherPolicy = std::launch::async | std::launch::deferred;
    std::future<int> combinedTask = std::async(eitherPolicy, heavyComputation, 8);
    std::cout << "Task created with combined policy" << std::endl;
    const int combinedValue = combinedTask.get();
    std::cout << "Result: " << combinedValue << std::endl;
}

// Entry point of this listing: runs the launch-policy demo.
int main()
{
    LaunchPolicyDemo();
    return 0;
}

Future状态检查和等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <iostream>
#include <future>
#include <chrono>
#include <thread>

/**
 * Simulates a task that takes the given number of seconds.
 *
 * Sleeps one second at a time and prints a "Task progress: i/seconds" line
 * after each second. A non-positive argument skips the loop entirely.
 *
 * @param seconds Number of one-second steps to perform.
 * @return seconds multiplied by 100.
 */
int longRunningTask(int seconds)
{
    int completed = 0;
    while (completed < seconds)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        ++completed;
        std::cout << "Task progress: " << completed << "/" << seconds << std::endl;
    }
    return 100 * seconds;
}

/**
 * Polls a running task with std::future::wait_for.
 *
 * Starts longRunningTask(5) asynchronously and checks its status every
 * 500 ms, printing a message per poll, until the result can be fetched.
 */
void FutureStatusDemo()
{
    std::cout << "=== Future Status Demo ===" << std::endl;

    // Kick off the long-running task.
    std::future<int> task = std::async(std::launch::async, longRunningTask, 5);
    const std::chrono::milliseconds pollInterval(500);

    for (;;)
    {
        const std::future_status state = task.wait_for(pollInterval);

        if (state == std::future_status::ready)
        {
            std::cout << "Task completed!" << std::endl;
            std::cout << "Result: " << task.get() << std::endl;
            return;
        }

        if (state == std::future_status::deferred)
        {
            // A deferred task never becomes ready by waiting; get() runs it.
            std::cout << "Task is deferred, calling get() to start..." << std::endl;
            std::cout << "Result: " << task.get() << std::endl;
            return;
        }

        if (state == std::future_status::timeout)
        {
            std::cout << "Task still running..." << std::endl;
        }

        // The calling thread is free to do other work between polls.
        std::cout << "Doing other work while waiting..." << std::endl;
    }
}

/**
 * Demonstrates the timed and untimed waiting functions of std::future:
 * wait_for (relative timeout), wait_until (absolute deadline) and wait
 * (no timeout), each followed by get() to read the value.
 */
void WaitingDemo()
{
    std::cout << "\n=== Waiting Demo ===" << std::endl;

    // Two tasks that take two and three seconds respectively.
    const auto produceNumber = []() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        return 42;
    };
    const auto produceText = []() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        return std::string("Hello");
    };

    auto numberFuture = std::async(std::launch::async, produceNumber);
    auto textFuture = std::async(std::launch::async, produceText);

    // Wait for a bounded amount of time.
    std::cout << "Waiting for future1 (max 1 second)..." << std::endl;
    const std::future_status numberState = numberFuture.wait_for(std::chrono::seconds(1));
    if (numberState != std::future_status::ready)
    {
        std::cout << "Future1 not ready yet, waiting more..." << std::endl;
        std::cout << "Future1 result: " << numberFuture.get() << std::endl;
    }
    else
    {
        std::cout << "Future1 ready: " << numberFuture.get() << std::endl;
    }

    // Wait until an absolute point in time.
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
    std::cout << "Waiting for future2 until deadline..." << std::endl;
    const std::future_status textState = textFuture.wait_until(deadline);
    if (textState != std::future_status::ready)
    {
        std::cout << "Future2 not ready by deadline, waiting indefinitely..." << std::endl;
        textFuture.wait(); // blocks without a timeout
        std::cout << "Future2 result: " << textFuture.get() << std::endl;
    }
    else
    {
        std::cout << "Future2 ready: " << textFuture.get() << std::endl;
    }
}

// Entry point of this listing: runs the status-polling demo, then the
// waiting demo.
int main()
{
    FutureStatusDemo();
    WaitingDemo();
    return 0;
}

异常处理

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
#include <iostream>
#include <future>
#include <stdexcept>
#include <string>

/**
 * Divides 100 by the given value using integer division.
 *
 * @param value Divisor; must be strictly positive.
 * @return 100 / value.
 * @throws std::invalid_argument if value is negative.
 * @throws std::runtime_error    if value is zero.
 */
int riskyOperation(int value)
{
    if (value > 0)
    {
        return 100 / value;
    }

    if (value < 0)
    {
        throw std::invalid_argument("Value cannot be negative");
    }

    throw std::runtime_error("Division by zero");
}

/**
 * Simulates reading a file by name; no file system access takes place.
 *
 * @param filename Name of the pretend file.
 * @return "Content of " followed by @p filename.
 * @throws std::invalid_argument if filename is empty.
 * @throws std::runtime_error    if filename is exactly "nonexistent.txt".
 */
std::string fileOperation(const std::string& filename)
{
    if (filename.size() == 0)
    {
        throw std::invalid_argument("Filename cannot be empty");
    }

    if (filename.compare("nonexistent.txt") == 0)
    {
        throw std::runtime_error("File not found");
    }

    std::string content("Content of ");
    content.append(filename);
    return content;
}

/**
 * Shows that an exception thrown inside a std::async task is captured and
 * rethrown from std::future::get() on the waiting thread.
 *
 * Cases 1-3 run riskyOperation with a valid, a negative and a zero argument;
 * case 4 runs fileOperation over a batch of names and reports each outcome.
 */
void ExceptionHandlingDemo()
{
    std::cout << "=== Exception Handling Demo ===" << std::endl;

    // Launches riskyOperation(input) asynchronously and prints either its
    // result or the message of the exception surfaced by get().
    const auto runAndReport = [](int input, const char* errorPrefix) {
        std::future<int> pending = std::async(std::launch::async, riskyOperation, input);
        try
        {
            const int value = pending.get();
            std::cout << "Result: " << value << std::endl;
        }
        catch (const std::exception& ex)
        {
            std::cout << errorPrefix << ex.what() << std::endl;
        }
    };

    // Normal case.
    std::cout << "1. Normal operation:" << std::endl;
    runAndReport(10, "Exception: ");

    // Failure case 1.
    std::cout << "\n2. Exception case 1 (negative value):" << std::endl;
    runAndReport(-5, "Exception caught: ");

    // Failure case 2.
    std::cout << "\n3. Exception case 2 (zero value):" << std::endl;
    runAndReport(0, "Exception caught: ");

    // Several tasks, some of which fail; each get() is guarded on its own.
    std::cout << "\n4. Multiple tasks with exceptions:" << std::endl;
    const std::vector<std::string> filenames{"file1.txt", "", "nonexistent.txt", "file2.txt"};
    std::vector<std::future<std::string>> pendingFiles;

    for (auto it = filenames.begin(); it != filenames.end(); ++it)
    {
        pendingFiles.push_back(std::async(std::launch::async, fileOperation, *it));
    }

    size_t index = 0;
    for (auto& pendingFile : pendingFiles)
    {
        try
        {
            const std::string content = pendingFile.get();
            std::cout << "File " << index << ": " << content << std::endl;
        }
        catch (const std::exception& ex)
        {
            std::cout << "File " << index << " error: " << ex.what() << std::endl;
        }
        ++index;
    }
}

// Entry point of this listing: runs the exception-handling demo.
int main()
{
    ExceptionHandlingDemo();
    return 0;
}

实际应用场景

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <numeric>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Parallel computation example: splits a vector into contiguous chunks and
// processes every chunk in its own std::async task.
class ParallelProcessor
{
public:
    /**
     * Sums all elements of a vector using several asynchronous tasks.
     *
     * @param data       Values to add up; an empty vector yields 0.
     * @param numThreads Requested number of tasks. The value is clamped to
     *                   the range [1, data.size()], so 0 is treated as 1 and
     *                   no task is started for an empty chunk.
     * @return The sum of all elements, accumulated as long long.
     */
    static long long parallelSum(const std::vector<int>& data, size_t numThreads = 4)
    {
        if (data.empty()) return 0;

        const size_t workers = workerCount(data.size(), numThreads);
        const size_t chunkSize = data.size() / workers;

        std::vector<std::future<long long>> futures;
        futures.reserve(workers);

        for (size_t i = 0; i < workers; ++i)
        {
            const size_t start = i * chunkSize;
            // The last chunk also takes the remainder of the division.
            const size_t end = (i + 1 == workers) ? data.size() : start + chunkSize;

            // Capturing data by reference is safe here: every future is
            // consumed (or destroyed, which blocks) before this function returns.
            futures.push_back(std::async(std::launch::async, [&data, start, end]() {
                return std::accumulate(data.begin() + start, data.begin() + end, 0LL);
            }));
        }

        long long total = 0;
        for (auto& future : futures)
        {
            total += future.get();
        }

        return total;
    }

    /**
     * Finds every position at which a value occurs, searching chunks in
     * parallel.
     *
     * @param data       Values to search; an empty vector yields no indices.
     * @param target     Value to look for.
     * @param numThreads Requested number of tasks, clamped to
     *                   [1, data.size()] exactly as in parallelSum.
     * @return Indices of all matches in ascending order.
     */
    static std::vector<size_t> parallelFind(const std::vector<int>& data, int target, size_t numThreads = 4)
    {
        if (data.empty()) return {};

        const size_t workers = workerCount(data.size(), numThreads);
        const size_t chunkSize = data.size() / workers;

        std::vector<std::future<std::vector<size_t>>> futures;
        futures.reserve(workers);

        for (size_t i = 0; i < workers; ++i)
        {
            const size_t start = i * chunkSize;
            const size_t end = (i + 1 == workers) ? data.size() : start + chunkSize;

            futures.push_back(std::async(std::launch::async, [&data, target, start, end]() {
                std::vector<size_t> indices;
                for (size_t j = start; j < end; ++j)
                {
                    if (data[j] == target)
                    {
                        indices.push_back(j);
                    }
                }
                return indices;
            }));
        }

        // Chunks are collected in launch order, which keeps the indices sorted.
        std::vector<size_t> allIndices;
        for (auto& future : futures)
        {
            const auto indices = future.get();
            allIndices.insert(allIndices.end(), indices.begin(), indices.end());
        }

        return allIndices;
    }

private:
    // Clamps the requested task count to [1, elementCount]; this avoids the
    // division by zero for a request of 0 and pointless empty chunks.
    static size_t workerCount(size_t elementCount, size_t requested)
    {
        return std::max<size_t>(1, std::min(requested, elementCount));
    }
};

// Simulated network client: requests are replaced by sleeps, no real I/O.
class NetworkClient
{
public:
    /**
     * Pretends to perform an HTTP request.
     *
     * @param url     Address to mention in the response text.
     * @param delayMs Time to sleep, in milliseconds, before responding.
     * @return "Response from <url> (delay: <delayMs>ms)".
     */
    static std::string simulateHttpRequest(const std::string& url, int delayMs)
    {
        const std::chrono::milliseconds latency(delayMs);
        std::this_thread::sleep_for(latency);

        std::string reply("Response from ");
        reply += url;
        reply += " (delay: ";
        reply += std::to_string(delayMs);
        reply += "ms)";
        return reply;
    }

    /**
     * Issues one simulated request per URL, all running concurrently, each
     * with a random delay drawn uniformly from [100, 1000] milliseconds.
     *
     * @param urls Addresses to request.
     * @return The responses, in the same order as @p urls.
     */
    static std::vector<std::string> fetchMultipleUrls(const std::vector<std::string>& urls)
    {
        std::random_device seedSource;
        std::mt19937 engine(seedSource());
        std::uniform_int_distribution<> latencyMs(100, 1000);

        // Start every request before waiting on any of them.
        std::vector<std::future<std::string>> pending;
        for (auto it = urls.begin(); it != urls.end(); ++it)
        {
            const int delay = latencyMs(engine);
            pending.push_back(std::async(std::launch::async, simulateHttpRequest, *it, delay));
        }

        // Gather the responses in launch order.
        std::vector<std::string> responses;
        for (size_t i = 0; i < pending.size(); ++i)
        {
            responses.push_back(pending[i].get());
        }

        return responses;
    }
};

// Batch task runner: collects callables and later runs all of them
// concurrently, one std::async task per callable.
class TaskProcessor
{
private:
    std::vector<std::function<int()>> m_Tasks;  // tasks queued so far, in insertion order

public:
    /**
     * Queues a task for a later processAllTasks() call.
     *
     * @param task Callable returning int; taken by value and moved into the
     *             queue so that no extra copy of the callable is made.
     */
    void addTask(std::function<int()> task)
    {
        m_Tasks.push_back(std::move(task));
    }

    /**
     * Runs every queued task asynchronously and waits for all of them.
     *
     * The queue is left untouched, so calling this again reruns the tasks.
     *
     * @return The tasks' results, in the order the tasks were added.
     *         An exception thrown by a task is rethrown from here when its
     *         result is collected.
     */
    std::vector<int> processAllTasks()
    {
        std::vector<std::future<int>> futures;
        futures.reserve(m_Tasks.size());

        // Start all tasks; std::async stores its own copy of each callable.
        for (const auto& task : m_Tasks)
        {
            futures.push_back(std::async(std::launch::async, task));
        }

        // Collect the results in insertion order.
        std::vector<int> results;
        results.reserve(futures.size());
        for (auto& future : futures)
        {
            results.push_back(future.get());
        }

        return results;
    }
};

/**
 * Exercises the three helper classes of this listing and prints timings:
 * parallel vs. sequential summation, parallel search, concurrent simulated
 * network requests, and a batch of tasks run through TaskProcessor.
 */
void PracticalExamplesDemo()
{
    using Clock = std::chrono::high_resolution_clock;

    // Whole milliseconds elapsed since the given time point.
    const auto elapsedSince = [](Clock::time_point begin) {
        return std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - begin);
    };

    std::cout << "=== Practical Examples Demo ===" << std::endl;

    // 1. Parallel computation
    std::cout << "1. Parallel computation:" << std::endl;
    std::vector<int> largeData(1000000);
    std::iota(largeData.begin(), largeData.end(), 1); // fill with 1..1000000

    Clock::time_point begin = Clock::now();
    const long long parallelResult = ParallelProcessor::parallelSum(largeData);
    const std::chrono::milliseconds parallelTime = elapsedSince(begin);

    begin = Clock::now();
    const long long sequentialResult = std::accumulate(largeData.begin(), largeData.end(), 0LL);
    const std::chrono::milliseconds sequentialTime = elapsedSince(begin);

    std::cout << "Parallel result: " << parallelResult << " (time: " << parallelTime.count() << "ms)" << std::endl;
    std::cout << "Sequential result: " << sequentialResult << " (time: " << sequentialTime.count() << "ms)" << std::endl;
    const char* verdict = (parallelResult == sequentialResult) ? "Yes" : "No";
    std::cout << "Results match: " << verdict << std::endl;

    // 2. Parallel search
    std::cout << "\n2. Parallel search:" << std::endl;
    const std::vector<int> searchData{1, 5, 3, 5, 7, 5, 9, 5, 2, 5};
    const std::vector<size_t> hits = ParallelProcessor::parallelFind(searchData, 5);
    std::cout << "Found value 5 at indices: ";
    for (size_t k = 0; k < hits.size(); ++k)
    {
        std::cout << hits[k] << " ";
    }
    std::cout << std::endl;

    // 3. Simulated network requests
    std::cout << "\n3. Concurrent network requests:" << std::endl;
    const std::vector<std::string> urls{
        "http://api1.example.com",
        "http://api2.example.com",
        "http://api3.example.com",
        "http://api4.example.com"
    };

    begin = Clock::now();
    const std::vector<std::string> responses = NetworkClient::fetchMultipleUrls(urls);
    const std::chrono::milliseconds networkTime = elapsedSince(begin);

    std::cout << "Fetched " << responses.size() << " URLs in " << networkTime.count() << "ms:" << std::endl;
    for (size_t k = 0; k < responses.size(); ++k)
    {
        std::cout << "  " << responses[k] << std::endl;
    }

    // 4. Task processor
    std::cout << "\n4. Task processor:" << std::endl;
    TaskProcessor processor;

    // Builds a task that sleeps for delayMs milliseconds and returns value.
    const auto makeTask = [](int delayMs, int value) {
        return [delayMs, value]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
            return value;
        };
    };
    processor.addTask(makeTask(100, 10));
    processor.addTask(makeTask(200, 20));
    processor.addTask(makeTask(150, 30));

    begin = Clock::now();
    const std::vector<int> taskResults = processor.processAllTasks();
    const std::chrono::milliseconds taskTime = elapsedSince(begin);

    std::cout << "Processed " << taskResults.size() << " tasks in " << taskTime.count() << "ms:" << std::endl;
    size_t taskIndex = 0;
    for (const int value : taskResults)
    {
        std::cout << "  Task " << taskIndex << " result: " << value << std::endl;
        ++taskIndex;
    }
}

// Entry point of this listing: runs the practical-examples demo.
int main()
{
    PracticalExamplesDemo();
    return 0;
}

总结

  1. std::async基础:创建异步任务并返回std::future对象
  2. 启动策略
    • std::launch::async:强制异步执行
    • std::launch::deferred:延迟执行
    • 默认(std::launch::async | std::launch::deferred):由实现决定是异步执行还是延迟执行
  3. std::future功能
    • get():获取结果(阻塞)
    • wait():等待完成(阻塞)
    • wait_for():等待指定时间
    • wait_until():等待到指定时间点
  4. 异常处理:异步任务中的异常会在调用get()时重新抛出
  5. 实际应用
    • 并行计算
    • 网络请求
    • 文件I/O
    • 任务处理
  6. 最佳实践
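    • 保存std::async返回的future:使用std::launch::deferred(或默认策略被实现选为延迟执行)时,必须调用get()或wait()任务才会执行;使用std::launch::async时任务会立即启动,未保存的future会在析构时阻塞直到任务结束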
    • 使用适当的启动策略
    • 处理异步任务中的异常
    • 考虑线程数量和系统资源
    • 避免过度创建线程
updated 2025-09-20