Go学习笔记-Goroutines

Goroutine是Go语言并发编程的核心概念,它是轻量级的线程,由Go运行时管理。Goroutine使得编写并发程序变得简单而高效,是Go语言的杀手级特性之一。

基本Goroutine使用

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 1. 简单的函数,用于演示goroutine
func sayHello(name string) {
    for i := 0; i < 5; i++ {
        fmt.Printf("Hello from %s - %d\n", name, i)
        time.Sleep(100 * time.Millisecond)
    }
}

// 2. 带参数的goroutine函数
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在函数结束时调用Done()
    
    fmt.Printf("Worker %d starting\n", id)
    
    // 模拟工作
    time.Sleep(time.Duration(id) * 100 * time.Millisecond)
    
    fmt.Printf("Worker %d done\n", id)
}

// 3. 计算密集型任务
func calculateSum(start, end int, result chan<- int) {
    sum := 0
    for i := start; i <= end; i++ {
        sum += i
    }
    result <- sum
}

// 4. 并发下载模拟
func downloadFile(url string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("开始下载: %s\n", url)
    
    // 模拟下载时间
    downloadTime := time.Duration(len(url)*10) * time.Millisecond
    time.Sleep(downloadTime)
    
    fmt.Printf("下载完成: %s\n", url)
}

func main() {
    // 1. 基本goroutine使用
    fmt.Println("=== 基本Goroutine ===")
    
    // 启动goroutine
    go sayHello("Goroutine-1")
    go sayHello("Goroutine-2")
    
    // 主goroutine也执行一些工作
    sayHello("Main")
    
    // 等待一段时间让goroutine完成
    time.Sleep(1 * time.Second)
    
    // 2. 使用WaitGroup等待goroutine完成
    fmt.Println("\n=== 使用WaitGroup ===")
    
    var wg sync.WaitGroup
    
    // 启动多个worker goroutine
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加等待计数
        go worker(i, &wg)
    }
    
    // 等待所有goroutine完成
    wg.Wait()
    fmt.Println("所有worker完成")
    
    // 3. 并发计算
    fmt.Println("\n=== 并发计算 ===")
    
    // 计算1到1000000的和,分成4个goroutine并发计算
    const total = 1000000
    const numGoroutines = 4
    const chunkSize = total / numGoroutines
    
    results := make(chan int, numGoroutines)
    
    start := time.Now()
    
    // 启动计算goroutine
    for i := 0; i < numGoroutines; i++ {
        startNum := i*chunkSize + 1
        endNum := (i + 1) * chunkSize
        if i == numGoroutines-1 {
            endNum = total // 确保最后一个goroutine处理剩余的数字
        }
        
        go calculateSum(startNum, endNum, results)
    }
    
    // 收集结果
    totalSum := 0
    for i := 0; i < numGoroutines; i++ {
        totalSum += <-results
    }
    
    elapsed := time.Since(start)
    fmt.Printf("并发计算结果: %d, 耗时: %v\n", totalSum, elapsed)
    
    // 4. 并发下载
    fmt.Println("\n=== 并发下载 ===")
    
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
        "https://example.com/file4.zip",
        "https://example.com/file5.zip",
    }
    
    var downloadWg sync.WaitGroup
    
    start = time.Now()
    
    for _, url := range urls {
        downloadWg.Add(1)
        go downloadFile(url, &downloadWg)
    }
    
    downloadWg.Wait()
    elapsed = time.Since(start)
    fmt.Printf("所有下载完成,总耗时: %v\n", elapsed)
    
    // 5. 匿名函数goroutine
    fmt.Println("\n=== 匿名函数Goroutine ===")
    
    var anonymousWg sync.WaitGroup
    
    for i := 1; i <= 3; i++ {
        anonymousWg.Add(1)
        
        // 使用匿名函数创建goroutine
        go func(id int) {
            defer anonymousWg.Done()
            fmt.Printf("匿名goroutine %d 执行中\n", id)
            time.Sleep(200 * time.Millisecond)
            fmt.Printf("匿名goroutine %d 完成\n", id)
        }(i) // 注意:传递i的值,避免闭包陷阱
    }
    
    anonymousWg.Wait()
    
    // 6. 查看goroutine信息
    fmt.Println("\n=== Goroutine信息 ===")
    fmt.Printf("当前goroutine数量: %d\n", runtime.NumGoroutine())
    fmt.Printf("CPU核心数: %d\n", runtime.NumCPU())
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
}

Goroutine的生命周期和管理

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

// 1. 长时间运行的goroutine
func longRunningTask(ctx context.Context, id int) {
    fmt.Printf("任务 %d 开始\n", id)
    
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("任务 %d 收到取消信号,正在清理...\n", id)
            // 执行清理工作
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("任务 %d 已停止\n", id)
            return
        default:
            // 执行实际工作
            fmt.Printf("任务 %d 正在工作...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

// 2. 带超时的goroutine
func taskWithTimeout(id int, duration time.Duration) {
    fmt.Printf("超时任务 %d 开始,超时时间: %v\n", id, duration)
    
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()
    
    done := make(chan bool)
    
    go func() {
        // 模拟工作
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    select {
    case <-done:
        fmt.Printf("超时任务 %d 正常完成\n", id)
    case <-ctx.Done():
        fmt.Printf("超时任务 %d 超时\n", id)
    }
}

// 3. 工作池模式
type Job struct {
    ID   int
    Data string
}

type Result struct {
    Job    Job
    Output string
    Error  error
}

func worker2(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d 处理任务 %d\n", id, job.ID)
        
        // 模拟处理时间
        time.Sleep(time.Duration(job.ID*100) * time.Millisecond)
        
        result := Result{
            Job:    job,
            Output: fmt.Sprintf("处理结果: %s", job.Data),
            Error:  nil,
        }
        
        results <- result
    }
    
    fmt.Printf("Worker %d 退出\n", id)
}

// 4. 生产者-消费者模式
func producer(data chan<- int, count int) {
    defer close(data)
    
    for i := 1; i <= count; i++ {
        fmt.Printf("生产: %d\n", i)
        data <- i
        time.Sleep(100 * time.Millisecond)
    }
    
    fmt.Println("生产者完成")
}

func consumer(id int, data <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for value := range data {
        fmt.Printf("消费者 %d 消费: %d\n", id, value)
        time.Sleep(200 * time.Millisecond)
    }
    
    fmt.Printf("消费者 %d 退出\n", id)
}

// 5. 错误处理和恢复
func riskyTask(id int, wg *sync.WaitGroup, errors chan<- error) {
    defer wg.Done()
    defer func() {
        if r := recover(); r != nil {
            errors <- fmt.Errorf("任务 %d panic: %v", id, r)
        }
    }()
    
    fmt.Printf("风险任务 %d 开始\n", id)
    
    // 模拟可能panic的操作
    if id == 3 {
        panic("模拟panic")
    }
    
    // 模拟可能出错的操作
    if id == 2 {
        errors <- fmt.Errorf("任务 %d 执行失败", id)
        return
    }
    
    time.Sleep(500 * time.Millisecond)
    fmt.Printf("风险任务 %d 成功完成\n", id)
}

func main() {
    // 1. 可取消的长时间运行任务
    fmt.Println("=== 可取消的长时间任务 ===")
    
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动多个长时间运行的任务
    for i := 1; i <= 3; i++ {
        go longRunningTask(ctx, i)
    }
    
    // 让任务运行一段时间
    time.Sleep(2 * time.Second)
    
    // 取消所有任务
    fmt.Println("发送取消信号...")
    cancel()
    
    // 等待任务清理完成
    time.Sleep(1 * time.Second)
    
    // 2. 超时任务
    fmt.Println("\n=== 超时任务 ===")
    
    // 启动不同超时时间的任务
    go taskWithTimeout(1, 1*time.Second)  // 会超时
    go taskWithTimeout(2, 3*time.Second)  // 不会超时
    
    time.Sleep(4 * time.Second)
    
    // 3. 工作池模式
    fmt.Println("\n=== 工作池模式 ===")
    
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var workerWg sync.WaitGroup
    
    // 启动worker
    for i := 1; i <= numWorkers; i++ {
        workerWg.Add(1)
        go worker2(i, jobs, results, &workerWg)
    }
    
    // 发送任务
    go func() {
        for i := 1; i <= numJobs; i++ {
            jobs <- Job{
                ID:   i,
                Data: fmt.Sprintf("任务数据-%d", i),
            }
        }
        close(jobs)
    }()
    
    // 收集结果
    go func() {
        workerWg.Wait()
        close(results)
    }()
    
    // 处理结果
    for result := range results {
        if result.Error != nil {
            fmt.Printf("任务 %d 失败: %v\n", result.Job.ID, result.Error)
        } else {
            fmt.Printf("任务 %d 成功: %s\n", result.Job.ID, result.Output)
        }
    }
    
    // 4. 生产者-消费者模式
    fmt.Println("\n=== 生产者-消费者模式 ===")
    
    data := make(chan int, 5) // 带缓冲的channel
    var consumerWg sync.WaitGroup
    
    // 启动消费者
    for i := 1; i <= 2; i++ {
        consumerWg.Add(1)
        go consumer(i, data, &consumerWg)
    }
    
    // 启动生产者
    go producer(data, 10)
    
    // 等待所有消费者完成
    consumerWg.Wait()
    
    // 5. 错误处理和恢复
    fmt.Println("\n=== 错误处理和恢复 ===")
    
    var errorWg sync.WaitGroup
    errors := make(chan error, 10)
    
    // 启动多个可能出错的任务
    for i := 1; i <= 5; i++ {
        errorWg.Add(1)
        go riskyTask(i, &errorWg, errors)
    }
    
    // 等待所有任务完成并关闭错误channel
    go func() {
        errorWg.Wait()
        close(errors)
    }()
    
    // 收集和处理错误
    var errorCount int
    for err := range errors {
        if err != nil {
            fmt.Printf("收到错误: %v\n", err)
            errorCount++
        }
    }
    
    fmt.Printf("总共收到 %d 个错误\n", errorCount)
    
    // 6. Goroutine泄漏检测
    fmt.Println("\n=== Goroutine泄漏检测 ===")
    
    before := runtime.NumGoroutine()
    fmt.Printf("开始前goroutine数量: %d\n", before)
    
    // 创建一些goroutine
    var leakWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        leakWg.Add(1)
        go func(id int) {
            defer leakWg.Done()
            time.Sleep(100 * time.Millisecond)
        }(i)
    }
    
    leakWg.Wait()
    
    // 强制垃圾回收
    runtime.GC()
    time.Sleep(100 * time.Millisecond)
    
    after := runtime.NumGoroutine()
    fmt.Printf("结束后goroutine数量: %d\n", after)
    
    if after > before {
        fmt.Printf("可能存在goroutine泄漏: %d\n", after-before)
    } else {
        fmt.Println("没有检测到goroutine泄漏")
    }
}

Goroutine的高级用法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
package main

import (
    "context"
    "fmt"
    "math/rand"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

// 1. 限制并发数量
type Semaphore struct {
    ch chan struct{}
}

func NewSemaphore(capacity int) *Semaphore {
    return &Semaphore{
        ch: make(chan struct{}, capacity),
    }
}

func (s *Semaphore) Acquire() {
    s.ch <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.ch
}

// 2. 工作窃取模式
type WorkStealingPool struct {
    workers []chan func()
    next    uint64
}

func NewWorkStealingPool(numWorkers int) *WorkStealingPool {
    pool := &WorkStealingPool{
        workers: make([]chan func(), numWorkers),
    }
    
    for i := 0; i < numWorkers; i++ {
        pool.workers[i] = make(chan func(), 10)
        go pool.worker(i)
    }
    
    return pool
}

func (p *WorkStealingPool) worker(id int) {
    for task := range p.workers[id] {
        task()
    }
}

func (p *WorkStealingPool) Submit(task func()) {
    // 轮询分配任务
    n := atomic.AddUint64(&p.next, 1)
    worker := p.workers[n%uint64(len(p.workers))]
    
    select {
    case worker <- task:
        // 任务成功分配
    default:
        // 如果当前worker忙,尝试其他worker
        for i := 0; i < len(p.workers); i++ {
            select {
            case p.workers[i] <- task:
                return
            default:
                continue
            }
        }
        // 如果所有worker都忙,阻塞等待
        worker <- task
    }
}

// 3. 扇入扇出模式
func fanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    
    for i := 0; i < workers; i++ {
        output := make(chan int)
        outputs[i] = output
        
        go func(out chan<- int) {
            defer close(out)
            for data := range input {
                // 处理数据
                processed := data * data
                out <- processed
            }
        }(output)
    }
    
    return outputs
}

func fanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    for _, input := range inputs {
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for data := range in {
                output <- data
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

// 4. 管道模式
func pipeline() {
    // 第一阶段:生成数据
    generate := func() <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for i := 1; i <= 10; i++ {
                out <- i
            }
        }()
        return out
    }
    
    // 第二阶段:平方
    square := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                out <- n * n
            }
        }()
        return out
    }
    
    // 第三阶段:过滤偶数
    filterEven := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                if n%2 == 0 {
                    out <- n
                }
            }
        }()
        return out
    }
    
    // 构建管道
    numbers := generate()
    squared := square(numbers)
    filtered := filterEven(squared)
    
    // 消费结果
    fmt.Println("管道结果:")
    for result := range filtered {
        fmt.Printf("%d ", result)
    }
    fmt.Println()
}

// 5. 超时和取消模式
func timeoutPattern() {
    doWork := func(ctx context.Context) <-chan string {
        result := make(chan string, 1)
        
        go func() {
            defer close(result)
            
            // 模拟工作
            select {
            case <-time.After(2 * time.Second):
                result <- "工作完成"
            case <-ctx.Done():
                return
            }
        }()
        
        return result
    }
    
    // 带超时的上下文
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    result := doWork(ctx)
    
    select {
    case res := <-result:
        fmt.Printf("结果: %s\n", res)
    case <-ctx.Done():
        fmt.Printf("操作超时: %v\n", ctx.Err())
    }
}

// 6. 竞争检测
func raceDetection() {
    var counter int64
    var wg sync.WaitGroup
    
    // 不安全的并发访问
    unsafeIncrement := func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            counter++ // 竞争条件
        }
    }
    
    // 安全的并发访问
    safeIncrement := func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            atomic.AddInt64(&counter, 1)
        }
    }
    
    fmt.Println("竞争检测演示:")
    
    // 重置计数器
    counter = 0
    
    // 启动多个goroutine进行不安全操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go unsafeIncrement()
    }
    wg.Wait()
    
    fmt.Printf("不安全操作结果: %d (期望: 10000)\n", counter)
    
    // 重置计数器
    counter = 0
    
    // 启动多个goroutine进行安全操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go safeIncrement()
    }
    wg.Wait()
    
    fmt.Printf("安全操作结果: %d (期望: 10000)\n", counter)
}

func main() {
    // 1. 限制并发数量
    fmt.Println("=== 限制并发数量 ===")
    
    sem := NewSemaphore(3) // 最多3个并发
    var wg sync.WaitGroup
    
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            sem.Acquire()
            defer sem.Release()
            
            fmt.Printf("任务 %d 开始执行\n", id)
            time.Sleep(1 * time.Second)
            fmt.Printf("任务 %d 完成\n", id)
        }(i)
    }
    
    wg.Wait()
    
    // 2. 工作窃取池
    fmt.Println("\n=== 工作窃取池 ===")
    
    pool := NewWorkStealingPool(3)
    
    for i := 1; i <= 10; i++ {
        taskID := i
        pool.Submit(func() {
            fmt.Printf("执行任务 %d\n", taskID)
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
        })
    }
    
    time.Sleep(3 * time.Second)
    
    // 3. 扇入扇出模式
    fmt.Println("\n=== 扇入扇出模式 ===")
    
    input := make(chan int)
    
    // 启动数据生成器
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()
    
    // 扇出到3个worker
    outputs := fanOut(input, 3)
    
    // 扇入合并结果
    result := fanIn(outputs...)
    
    fmt.Println("扇入扇出结果:")
    for data := range result {
        fmt.Printf("%d ", data)
    }
    fmt.Println()
    
    // 4. 管道模式
    fmt.Println("\n=== 管道模式 ===")
    pipeline()
    
    // 5. 超时和取消模式
    fmt.Println("\n=== 超时和取消模式 ===")
    timeoutPattern()
    
    // 6. 竞争检测
    fmt.Println("\n=== 竞争检测 ===")
    raceDetection()
    
    // 7. 性能监控
    fmt.Println("\n=== 性能监控 ===")
    fmt.Printf("当前goroutine数量: %d\n", runtime.NumGoroutine())
    
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    fmt.Printf("内存使用: %d KB\n", m.Alloc/1024)
    fmt.Printf("GC次数: %d\n", m.NumGC)
}

总结

  1. Goroutine基础

    • 使用go关键字启动goroutine
    • Goroutine是轻量级线程,由Go运行时管理
    • 主goroutine结束时,所有其他goroutine也会结束
  2. 同步机制

    • sync.WaitGroup - 等待一组goroutine完成
    • sync.Mutex - 互斥锁保护共享资源
    • sync/atomic - 原子操作避免竞争条件
  3. 生命周期管理

    • 使用context.Context进行取消和超时控制
    • 合理的错误处理和恢复机制
    • 避免goroutine泄漏
  4. 并发模式

    • 工作池模式 - 限制并发数量
    • 生产者-消费者模式 - 解耦生产和消费
    • 扇入扇出模式 - 并行处理和结果合并
    • 管道模式 - 数据流处理
  5. 最佳实践

    • 避免共享内存,通过通信共享
    • 合理控制goroutine数量
    • 使用channel进行goroutine间通信
    • 注意闭包变量捕获问题
    • 进行竞争检测和性能监控
updatedupdated2025-09-202025-09-20