分类 Go语言 下的文章

Go并发编程实战:goroutine与channel最佳实践

Go语言的并发模型是其最大的优势之一。通过goroutine和channel,我们可以轻松构建高并发的应用程序。本文将分享在实际项目中使用goroutine和channel的最佳实践。

Goroutine基础

创建goroutine

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Printf("Number: %d
", i)
        time.Sleep(250 * time.Millisecond)
    }
}

func main() {
    // 启动goroutine
    go printNumbers()
    
    // 主goroutine继续执行
    for i := 1; i <= 3; i++ {
        fmt.Printf("Main: %d
", i)
        time.Sleep(500 * time.Millisecond)
    }
    
    // 等待goroutine完成
    time.Sleep(2 * time.Second)
}

等待goroutine完成

使用sync.WaitGroup等待多个goroutine完成:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成后递减计数器
    
    fmt.Printf("Worker %d starting
", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done
", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加计数器
        go worker(i, &wg)
    }
    
    wg.Wait() // 等待所有goroutine完成
    fmt.Println("All workers completed")
}

Channel高级用法

Buffered Channel

package main

import "fmt"

func main() {
    // 创建缓冲大小为3的channel
    ch := make(chan int, 3)
    
    // 发送数据(不会阻塞,因为缓冲区未满)
    ch <- 1
    ch <- 2
    ch <- 3
    
    // 接收数据
    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

Select语句

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received:", msg2)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout")
            return
        }
    }
}

实际项目案例

1. Web服务器并发处理

package main

import (
    "fmt"
    "net/http"
    "sync"
)

type RequestCounter struct {
    mu    sync.Mutex
    count int
}

func (rc *RequestCounter) Increment() {
    rc.mu.Lock()
    rc.count++
    rc.mu.Unlock()
}

func (rc *RequestCounter) GetCount() int {
    rc.mu.Lock()
    defer rc.mu.Unlock()
    return rc.count
}

func main() {
    counter := &RequestCounter{}
    
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        counter.Increment()
        fmt.Fprintf(w, "Hello, World! Total requests: %d", counter.GetCount())
    })
    
    fmt.Println("Server starting on :8080")
    http.ListenAndServe(":8080", nil)
}

2. 并发数据下载器

package main

import (
    "fmt"
    "io"
    "net/http"
    "sync"
)

func downloadFile(url string, wg *sync.WaitGroup, results chan<- string) {
    defer wg.Done()
    
    resp, err := http.Get(url)
    if err != nil {
        results <- fmt.Sprintf("Failed to download %s: %v", url, err)
        return
    }
    defer resp.Body.Close()
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        results <- fmt.Sprintf("Failed to read response from %s: %v", url, err)
        return
    }
    
    results <- fmt.Sprintf("Downloaded %s (%d bytes)", url, len(body))
}

func main() {
    urls := []string{
        "https://example.com/file1.txt",
        "https://example.com/file2.txt",
        "https://example.com/file3.txt",
    }
    
    var wg sync.WaitGroup
    results := make(chan string, len(urls))
    
    for _, url := range urls {
        wg.Add(1)
        go downloadFile(url, &wg, results)
    }
    
    wg.Wait()
    close(results)
    
    fmt.Println("Download results:")
    for result := range results {
        fmt.Println(result)
    }
}

3. 任务队列处理器

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID   int
    Name string
}

func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for task := range tasks {
        fmt.Printf("Worker %d processing task %d: %s
", id, task.ID, task.Name)
        time.Sleep(500 * time.Millisecond) // 模拟处理时间
        fmt.Printf("Worker %d completed task %d
", id, task.ID)
    }
    fmt.Printf("Worker %d shutting down
", id)
}

func main() {
    numWorkers := 3
    numTasks := 10
    
    tasks := make(chan Task, numTasks)
    var wg sync.WaitGroup
    
    // 启动worker
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, tasks, &wg)
    }
    
    // 发送任务
    for i := 1; i <= numTasks; i++ {
        tasks <- Task{ID: i, Name: fmt.Sprintf("Task-%d", i)}
    }
    close(tasks)
    
    wg.Wait()
    fmt.Println("All tasks completed")
}

最佳实践

1. 避免goroutine泄漏

// 不好的写法
func processData(data []string) {
    for _, item := range data {
        go func(item string) {
            // 处理数据
            fmt.Println(item)
        }(item)
    }
    // goroutine可能泄漏
}

// 好的写法
func processDataSafely(data []string) {
    var wg sync.WaitGroup
    for _, item := range data {
        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            fmt.Println(item)
        }(item)
    }
    wg.Wait()
}

2. 使用context控制goroutine生命周期

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: Shutting down
", id)
            return
        default:
            fmt.Printf("Worker %d: Working...
", id)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    go worker(ctx, 1)
    go worker(ctx, 2)
    
    time.Sleep(6 * time.Second)
}

3. 合理使用channel缓冲

// 根据场景选择合适的缓冲区大小
func processItems(items []Item) {
    // 小批量处理:缓冲大小为10
    taskCh := make(chan Item, 10)
    
    // 流式处理:无缓冲
    resultCh := make(chan Result)
    
    // 批量结果收集:缓冲大小为结果数量
    batchCh := make(chan Batch, len(items)/10)
}

性能调优

1. 减少锁竞争

// 不好的写法:频繁锁操作
type Counter struct {
    mu sync.Mutex
    count int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// 好的写法:批量操作
func (c *Counter) AddBatch(n int) {
    c.mu.Lock()
    c.count += n
    c.mu.Unlock()
}

2. 使用sync.Pool减少内存分配

var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 1024)
    },
}

func getBuffer() []byte {
    return bufferPool.Get().([]byte)
}

func putBuffer(buf []byte) {
    bufferPool.Put(buf)
}

常见问题与解决方案

1. Deadlock预防

// 避免死锁:保证锁的获取顺序一致
func transfer(a, b *Account, amount int) {
    // 总是先锁ID小的账户
    first, second := a, b
    if a.ID > b.ID {
        first, second = b, a
    }
    
    first.mu.Lock()
    defer first.mu.Unlock()
    
    second.mu.Lock()
    defer second.mu.Unlock()
    
    // 执行转账逻辑
}

2. 资源限制

// 使用带缓冲的channel限制并发数
type Limiter struct {
    tokens chan struct{}
}

func NewLimiter(n int) *Limiter {
    return &Limiter{
        tokens: make(chan struct{}, n),
    }
}

func (l *Limiter) Acquire() {
    l.tokens <- struct{}{}
}

func (l *Limiter) Release() {
    <-l.tokens
}

func (l *Limiter) Do(f func()) {
    l.Acquire()
    defer l.Release()
    f()
}

总结

Go的并发编程虽然简单易用,但要写出高效、安全的并发代码仍需注意很多细节。通过合理使用goroutine、channel和sync包提供的工具,我们可以构建出高性能的并发应用程序。

记住以下原则:

  1. 明确goroutine的生命周期
  2. 合理使用channel进行通信
  3. 避免共享内存,通过通信共享内存
  4. 使用context管理goroutine
  5. 注意资源限制和性能调优

通过实践这些最佳实践,你将能够更好地利用Go的并发特性。