一次slice bug



一次slice bug

bug源:今天重构前同事的遗留代码,在review过程中,有段代码看起来有些别扭,有几分线程不安全的味道
bug描述:多个协程往同一个slice里追加操作需要开发者保证线程安全
bug修复: 详见代码


bug源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 简化版bug复现
var arr []string
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
str := getStr()
arr = append(arr, str)
}()
}
wg.Wait()
calculate(arr)


疑问:slice的append操作是否是线程安全的?

结论:append操作并不能保证线程安全

我们看一看下面的两个例子(例子转自studygolang 仅为了证明结论):

  • 例1:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    package main

    import (
    "sync"
    "testing"
    )

    func TestAppend(t *testing.T) {
    x := []string{"start"}

    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
    defer wg.Done()
    y := append(x, "hello", "world")
    t.Log(cap(y), len(y))
    }()
    go func() {
    defer wg.Done()
    z := append(x, "goodbye", "bob")
    t.Log(cap(z), len(z))
    }()
    wg.Wait()
    }
  • 例2:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package main

    import (
    "testing"
    "sync"
    )

    func TestAppend(t *testing.T) {
    //以给这个名为 x 的 slice 在创建是预留一些容量.这是较例1唯一改动的地方
    x := make([]string, 0, 6)

    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
    defer wg.Done()
    y := append(x, "hello", "world")
    t.Log(len(y))
    }()
    go func() {
    defer wg.Done()
    z := append(x, "goodbye", "bob")
    t.Log(len(z))
    }()
    wg.Wait()
    }
    如果我们执行这个测试时带上 -race,发现出现了DATA RACE

    解释为什么测试失败

理解为什么这个失败会发生,请看看这个旧例子的 x 的内存布局

x 没有足够的容量进行修改

Go 语言发现没有足够的内存空间来存储 hello,world 和 goodbye, bob,

于是分配的新的内存给 yz

数据竞争不会在多进程读取内存时发生,x 没有被修改。

这里没有冲突,也就没有竞争。

zy 获取新的内存空间

在新的代码里,事情不一样了

x 有更多的容量

在这里,go 注意到有足够的内存存放 hello, world
另一个协程也发现有足够的空间存放 goodbye, bob
这个竞争的发生是因为这两个协程都尝试往同一个内存空间写入,
谁也不知道谁是赢家。

这是 Go 语言的一个特性而非 bug ,append 不会强制每一次调用它都申请新的内存。
它允许用户在循环内进行 append 操作时不会破坏垃圾回收机制。
缺点是你必须清楚知道在多个协程对 slice 的操作。

疑问:slice的append操作如何保证线程安全?

最简单的解决方法是不使用共享状态的第一个变量来进行 append 。
相反,根据你的需要来 make 一个新的 slice ,
使用这个新的 slice 作为 append 的第一个变量。
下面是失败的测试示例的修正版,这里的替代方法是使用 copy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

package main

import (
"sync"
"testing"
)

func TestAppend(t *testing.T) {
x := make([]string, 0, 6)
x = append(x, "start")

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
y := make([]string, 0, len(x)+2)
y = append(y, x...)
y = append(y, "hello", "world")
t.Log(cap(y), len(y), y[0])
}()
go func() {
defer wg.Done()
z := make([]string, 0, len(x)+2)
z = append(z, x...)
z = append(z, "goodbye", "bob")
t.Log(cap(z), len(z), z[0])
}()
wg.Wait()
}

疑问:文章开头的问题该如何解决呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 简化版bug复现
var arr []string
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
str := getStr()
arr = append(arr, str)
}()
}
wg.Wait()
calculate(arr)

解决方案:引入channel机制,将消费者生产者解耦

  • 解决方案1:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
       ch := make(chan string, n)
    var arr []string
    go func() {
    for i := range ch {
    arr = append(arr, i)
    }
    }()
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
    go func() {
    defer wg.Done()
    str := getStr()
    ch <- str
    }()
    }
    wg.Wait()
    close(ch)
    calculate(arr)

    问题解决了吗?
    slice的append操作的线程安全有保障了,但是又引入了新问题:

    如何保证传递给calculate()arr取尽了ch呢?

  • 终极方案

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
       ch := make(chan string, n)
    var arr []string
    var wg0 sync.WaitGroup
    go func() {
    wg0.Add(1)
    for i := range ch {
    arr = append(arr, i)
    }
    wg0.Done()
    }()
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
    go func() {
    defer wg.Done()
    str := getStr()
    ch <- str
    }()
    }
    wg.Wait()
    close(ch)
    wg0.Wait()
    calculate(arr)

    wg保证str都发送到了ch
    wg0保证ch都被取出放到了arr