Dark Dwarf Blog background

Go 并发模式

Go 并发模式

在写连接池的时候遇到了一些并发的问题,在这里记录一下。

1. 具体情景

假设我们有一个管理 Docker 连接的连接池,我们需要检查池子是否需要补充新的容器。下面是一个最简单的实现:

func (p *Pool) maintainPool() {
  p.mu.Lock()
  currentCount := len(p.idleContainers)
  needed := p.config.MinIdle - currentCount
  p.mu.Unlock()

  if needed <= 0 {
    return
  }

  var wg sync.WaitGroup
  for range needed {
    wg.Add(1)
    go func() {
      defer wg.Done()
      ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
      defer cancel()

      container, err := p.createWarmContainer(ctx, "warmup", "warmup")
      if err != nil {
        p.logger.Error("Failed to replenish pool", "error", err)
        return
      }
      p.mu.Lock()
      p.idleContainers = append(p.idleContainers, container)
      p.mu.Unlock()
    }()
  }

  wg.Wait()
}

这个实现被下面这个 worker 调用:

func (p *Pool) worker() {
  ticker := time.NewTicker(p.config.HealthCheckInterval)
  defer ticker.Stop()
  for {
    select {
    case <-p.stopCh:
      return

    case <-ticker.C:
      p.maintainPool()
    }
  }
}

原有的实现有这样的问题:有时我们需要并发创建大量 Docker 容器,当前的实现会直接对 Docker 发送所有请求,如果 Docker 负载过大或者挂掉了,这些请求就会失败并输出很多错误日志。然后再下一次 tick,大量的并发请求又被发起,由于只隔了很短的时间,这些请求很可能又会错误、又打了很多错误日志。最终连续的错误访问和大量日志会导致系统崩溃。

解决这个问题也很简单。我们:

  1. 限制一次发送请求的数量。
  2. 如果多次请求异常,直接使用熔断器模式进行熔断,等到系统恢复到可用状态时再发起请求。

2. 限流器实现

我们使用下面的 channel 作为计数器:

sem := make(chan struct{}, 3)
sem <- struct{}{}
...
defer func() { <-sem }()

sem 满时会阻塞,从而限制 goroutine 数量。

3. 熔断器实现

在系统连续失败后,我们标记系统处于冷却期、不做任何事情:

if time.Now().Before(p.cooldownUntil) {
    return
}

冷静期通过对失败次数的计数得到:

newCount := atomic.AddInt32(&failureCount, 1)
if newCount >= 3 {
    p.cooldownUntil = time.Now().Add(1 * time.Minute)
}

4. 完整实现

func (p *Pool) maintainPool() {
  p.mu.Lock()
  if time.Now().Before(p.cooldownUntil) {
    p.mu.Unlock()
    return
  }

  needed := p.config.MinIdle - len(p.idleContainers)
  p.mu.Unlock()

  if needed <= 0 {
    return
  }

  // 限制一次最多发送 3 个请求
  sem := make(chan struct{}, 3)
  var wg sync.WaitGroup

  // 熔断器计数
  var failureCount int32 = 1

  for range needed {
    sem <- struct{}{}
    wg.Go(func() {
      defer func() { <-sem }()
      ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
      defer cancel()

      container, err := p.createWarmContainer(ctx, "warmup", "warmup")
      if err != nil {
        p.logger.Error("Failed to replenish pool", "error", err)
        newCount := atomic.AddInt32(&failureCount, 1)
        if newCount >= 3 {
          p.cooldownUntil = time.Now().Add(1 * time.Minute)
        }
        return
      }

      // 二次校验
      p.mu.Lock()
      if len(p.idleContainers) < p.config.MinIdle {
        p.idleContainers = append(p.idleContainers, container)
      }
      p.mu.Unlock()
    })
  }

  wg.Wait()
}