Go语言的并发模型是其最大的优势之一。通过goroutine和channel,我们可以轻松构建高并发的应用程序。本文将分享在实际项目中使用goroutine和channel的最佳实践。
Goroutine基础
创建goroutine
package main
import (
"fmt"
"time"
)
func printNumbers() {
for i := 1; i <= 5; i++ {
fmt.Printf("Number: %d
", i)
time.Sleep(250 * time.Millisecond)
}
}
func main() {
// 启动goroutine
go printNumbers()
// 主goroutine继续执行
for i := 1; i <= 3; i++ {
fmt.Printf("Main: %d
", i)
time.Sleep(500 * time.Millisecond)
}
// 等待goroutine完成
time.Sleep(2 * time.Second)
}等待goroutine完成
使用sync.WaitGroup等待多个goroutine完成:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成后递减计数器
fmt.Printf("Worker %d starting
", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done
", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数器
go worker(i, &wg)
}
wg.Wait() // 等待所有goroutine完成
fmt.Println("All workers completed")
}Channel高级用法
Buffered Channel
package main
import "fmt"
func main() {
// 创建缓冲大小为3的channel
ch := make(chan int, 3)
// 发送数据(不会阻塞,因为缓冲区未满)
ch <- 1
ch <- 2
ch <- 3
// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}Select语句
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
case <-time.After(3 * time.Second):
fmt.Println("Timeout")
return
}
}
}实际项目案例
1. Web服务器并发处理
package main
import (
"fmt"
"net/http"
"sync"
)
type RequestCounter struct {
mu sync.Mutex
count int
}
func (rc *RequestCounter) Increment() {
rc.mu.Lock()
rc.count++
rc.mu.Unlock()
}
func (rc *RequestCounter) GetCount() int {
rc.mu.Lock()
defer rc.mu.Unlock()
return rc.count
}
func main() {
counter := &RequestCounter{}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
counter.Increment()
fmt.Fprintf(w, "Hello, World! Total requests: %d", counter.GetCount())
})
fmt.Println("Server starting on :8080")
http.ListenAndServe(":8080", nil)
}2. 并发数据下载器
package main
import (
"fmt"
"io"
"net/http"
"sync"
)
func downloadFile(url string, wg *sync.WaitGroup, results chan<- string) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
results <- fmt.Sprintf("Failed to download %s: %v", url, err)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
results <- fmt.Sprintf("Failed to read response from %s: %v", url, err)
return
}
results <- fmt.Sprintf("Downloaded %s (%d bytes)", url, len(body))
}
func main() {
urls := []string{
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
}
var wg sync.WaitGroup
results := make(chan string, len(urls))
for _, url := range urls {
wg.Add(1)
go downloadFile(url, &wg, results)
}
wg.Wait()
close(results)
fmt.Println("Download results:")
for result := range results {
fmt.Println(result)
}
}3. 任务队列处理器
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Name string
}
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing task %d: %s
", id, task.ID, task.Name)
time.Sleep(500 * time.Millisecond) // 模拟处理时间
fmt.Printf("Worker %d completed task %d
", id, task.ID)
}
fmt.Printf("Worker %d shutting down
", id)
}
func main() {
numWorkers := 3
numTasks := 10
tasks := make(chan Task, numTasks)
var wg sync.WaitGroup
// 启动worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
// 发送任务
for i := 1; i <= numTasks; i++ {
tasks <- Task{ID: i, Name: fmt.Sprintf("Task-%d", i)}
}
close(tasks)
wg.Wait()
fmt.Println("All tasks completed")
}最佳实践
1. 避免goroutine泄漏
// 不好的写法
func processData(data []string) {
for _, item := range data {
go func(item string) {
// 处理数据
fmt.Println(item)
}(item)
}
// goroutine可能泄漏
}
// 好的写法
func processDataSafely(data []string) {
var wg sync.WaitGroup
for _, item := range data {
wg.Add(1)
go func(item string) {
defer wg.Done()
fmt.Println(item)
}(item)
}
wg.Wait()
}2. 使用context控制goroutine生命周期
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: Shutting down
", id)
return
default:
fmt.Printf("Worker %d: Working...
", id)
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go worker(ctx, 1)
go worker(ctx, 2)
time.Sleep(6 * time.Second)
}3. 合理使用channel缓冲
// 根据场景选择合适的缓冲区大小
func processItems(items []Item) {
// 小批量处理:缓冲大小为10
taskCh := make(chan Item, 10)
// 流式处理:无缓冲
resultCh := make(chan Result)
// 批量结果收集:缓冲大小为结果数量
batchCh := make(chan Batch, len(items)/10)
}性能调优
1. 减少锁竞争
// 不好的写法:频繁锁操作
type Counter struct {
mu sync.Mutex
count int
}
func (c *Counter) Increment() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 好的写法:批量操作
func (c *Counter) AddBatch(n int) {
c.mu.Lock()
c.count += n
c.mu.Unlock()
}2. 使用sync.Pool减少内存分配
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
func getBuffer() []byte {
return bufferPool.Get().([]byte)
}
func putBuffer(buf []byte) {
bufferPool.Put(buf)
}常见问题与解决方案
1. Deadlock预防
// 避免死锁:保证锁的获取顺序一致
func transfer(a, b *Account, amount int) {
// 总是先锁ID小的账户
first, second := a, b
if a.ID > b.ID {
first, second = b, a
}
first.mu.Lock()
defer first.mu.Unlock()
second.mu.Lock()
defer second.mu.Unlock()
// 执行转账逻辑
}2. 资源限制
// 使用带缓冲的channel限制并发数
type Limiter struct {
tokens chan struct{}
}
func NewLimiter(n int) *Limiter {
return &Limiter{
tokens: make(chan struct{}, n),
}
}
func (l *Limiter) Acquire() {
l.tokens <- struct{}{}
}
func (l *Limiter) Release() {
<-l.tokens
}
func (l *Limiter) Do(f func()) {
l.Acquire()
defer l.Release()
f()
}总结
Go的并发编程虽然简单易用,但要写出高效、安全的并发代码仍需注意很多细节。通过合理使用goroutine、channel和sync包提供的工具,我们可以构建出高性能的并发应用程序。
记住以下原则:
- 明确goroutine的生命周期
- 合理使用channel进行通信
- 避免共享内存,通过通信共享内存
- 使用context管理goroutine
- 注意资源限制和性能调优
通过实践这些最佳实践,你将能够更好地利用Go的并发特性。
