Go 并发模式
在写连接池的时候遇到了一些并发的问题,在这里记录一下。
1. 具体情景
假设我们有一个管理 Docker 连接的连接池,我们需要检查池子是否需要补充新的容器。下面是一个最简单的实现:
func (p *Pool) maintainPool() {
p.mu.Lock()
currentCount := len(p.idleContainers)
needed := p.config.MinIdle - currentCount
p.mu.Unlock()
if needed <= 0 {
return
}
var wg sync.WaitGroup
for range needed {
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
container, err := p.createWarmContainer(ctx, "warmup", "warmup")
if err != nil {
p.logger.Error("Failed to replenish pool", "error", err)
return
}
p.mu.Lock()
p.idleContainers = append(p.idleContainers, container)
p.mu.Unlock()
}()
}
wg.Wait()
}
这个实现被下面这个 worker 调用:
func (p *Pool) worker() {
ticker := time.NewTicker(p.config.HealthCheckInterval)
defer ticker.Stop()
for {
select {
case <-p.stopCh:
return
case <-ticker.C:
p.maintainPool()
}
}
}
原有的实现有这样的问题:有时我们需要并发创建大量 Docker 容器,当前的实现会直接对 Docker 发送所有请求,如果 Docker 负载过大或者挂掉了,这些请求就会失败并输出很多错误日志。然后再下一次 tick,大量的并发请求又被发起,由于只隔了很短的时间,这些请求很可能又会错误、又打了很多错误日志。最终连续的错误访问和大量日志会导致系统崩溃。
解决这个问题也很简单。我们:
- 限制一次发送请求的数量。
- 如果多次请求异常,直接使用熔断器模式进行熔断,等到系统恢复到可用状态时再发起请求。
2. 限流器实现
我们使用下面的 channel 作为计数器:
sem := make(chan struct{}, 3)
sem <- struct{}{}
...
defer func() { <-sem }()
sem 满时会阻塞,从而限制 goroutine 数量。
3. 熔断器实现
在系统连续失败后,我们标记系统处于冷却期、不做任何事情:
if time.Now().Before(p.cooldownUntil) {
return
}
冷静期通过对失败次数的计数得到:
newCount := atomic.AddInt32(&failureCount, 1)
if newCount >= 3 {
p.cooldownUntil = time.Now().Add(1 * time.Minute)
}
4. 完整实现
func (p *Pool) maintainPool() {
p.mu.Lock()
if time.Now().Before(p.cooldownUntil) {
p.mu.Unlock()
return
}
needed := p.config.MinIdle - len(p.idleContainers)
p.mu.Unlock()
if needed <= 0 {
return
}
// 限制一次最多发送 3 个请求
sem := make(chan struct{}, 3)
var wg sync.WaitGroup
// 熔断器计数
var failureCount int32 = 1
for range needed {
sem <- struct{}{}
wg.Go(func() {
defer func() { <-sem }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
container, err := p.createWarmContainer(ctx, "warmup", "warmup")
if err != nil {
p.logger.Error("Failed to replenish pool", "error", err)
newCount := atomic.AddInt32(&failureCount, 1)
if newCount >= 3 {
p.cooldownUntil = time.Now().Add(1 * time.Minute)
}
return
}
// 二次校验
p.mu.Lock()
if len(p.idleContainers) < p.config.MinIdle {
p.idleContainers = append(p.idleContainers, container)
}
p.mu.Unlock()
})
}
wg.Wait()
}