【在主画面加入捷径】
       
【选择语系】
繁中 简中

[Golang] 程序设计教学:撰写共时性 (Concurrency) 程序

【赞助商连结】

    由于 CPU 的时脉已经到物理上限,现在的硬件都往多核心、多 CPU 发展。同样地,单一的大型服务器相当昂贵,而且扩充量有限,使用多台主机组成的丛集 (cluster) 则相对易于扩充。然而,若程序代码没有使用共时性 (concurrency) 的特性来撰写,则无法真正发挥平行处理 (parallel computing) 所带来的效能提升。

    Go 主要的特色之一,就在于其对共时性程序的支援;大部分程序语言以函式库来支援共时性程序,但 Go 将其内建在语法中。Go 的并时性程序有两种,一种是以 CSP (communicating sequential processes) 模型的并时性程序,一种是传统的多线程 (multi-thread) 程序。由于 Go 将 CSP 模型内建在语法中,通常建议使用这些内建功能来写共时性程序。

    goroutine 是轻量级线程 (lightweight thread)

    大部分的程序语言,像是 C++ 或 Java 等,以线程 (thread) 做为并进程式的单位。Go 程序以 goroutine 做为并行执行的程序代码区块,goroutine 类似于线程,但更轻量,一次启动数百甚至数千个以上的 goroutine 也不会占用太多内存。要使用 goroutine,在函式前加上 go 关键字即可。以下为使用 goroutine 的实例:

    package main
     
    import (
        "log"
        "os"
        "sync"
    )
     
    func main() {
        // A goroutine-safe console printer.
        logger := log.New(os.Stdout, "", 0)
     
        // Sync between goroutines.
        var wg sync.WaitGroup
     
        // Add goroutine 1.
        wg.Add(1)
        go func() {
            defer wg.Done()
            logger.Println("Print from goroutine 1")
        }()
     
        // Add goroutine 2.
        wg.Add(1)
        go func() {
            defer wg.Done()
            logger.Println("Print from goroutine 2")
        }()
     
        // Add goroutine 3.
        wg.Add(1)
        go func() {
            defer wg.Done()
            logger.Println("Print from goroutine 3")
        }()
     
        logger.Println("Print from main")
     
        // Wait all goroutines.
        wg.Wait()
    }

    在本例中,我们建立了三个 goroutine。由于 goroutine 和主程序并时执行,若我们没有使用 WaitGroup 将程序同步化,本程序在主程序结束时即提早结束,因此,我们声明 wg 变量,在程序尾端等待所有 goroutine 执行结束。

    如果读者多执行几次本程序,会发现每次印出字串的顺序不同。并时性程序和传统的循序式程序的思维不太一样,执行并时性程序时无法保证程序运行的先后顺序,需注意。

    利用 channel 在 goroutine 间传递数据

    上述的 goroutine 内的数据是各自独立的,而 Go 用 channel 在不同并进程式间传递数据。如下例:

    package main
     
    import "fmt"
     
    func main() {
        // Create a channel
        message := make(chan string)
     
        // Init a goroutine.
        go func() {
            // Send some data into the channel.
            message <- "Hello from channel"
        }()
     
        // Receive the data from the channel.
        msg := <-message
        fmt.Println(msg)
    }

    由于信道在传输时,会阻塞 (blocking) 程序的行进,在此处,我们不需要另外设置 WaitGroup。

    设置固定大小的 buffered channel

    前述的 channel 是无缓冲的。我们也可以设置有缓冲的 (buffered) channel,buffered channel 有固定的大小,这样就不需等待其他的 goroutine,可以直接传送数据。

    package main
     
    import (
        "log"
        "os"
        "sync"
    )
     
    func main() {
        // A goroutine-safe console printer.
        logger := log.New(os.Stdout, "", 0)
     
        // Sync among all goroutines.
        var wg sync.WaitGroup
     
        // Make a buffered channel.
        ch := make(chan int, 10)
     
        for i := 1; i <= 10; i++ {
            ch <- i
            wg.Add(1)
            go func() {
                defer wg.Done()
                logger.Println("Print from goroutine ", <-ch)
            }()
        }
     
        logger.Println("Print from main")
        wg.Wait()
    }

    指定 channel 的方向

    我们在设置 channel 时,可指定其方向,如下例:

    package main
     
    import "fmt"
     
    func ping(pings chan<- string, msg string) {
        pings <- msg
    }
     
    func pong(pings <-chan string, pongs chan<- string) {
        msg := <-pings
        pongs <- msg
    }
     
    func main() {
        pings := make(chan string, 1)
        pongs := make(chan string, 1)
        ping(pings, "passed message")
        pong(pings, pongs)
        fmt.Println(<-pongs)
    }

    关闭 channel

    若不用 channel 时,可用 close函数将 channel 关闭,如下例:

    package main
     
    import "fmt"
     
    func main() {
        ch := make(chan int, 4)
        ch <- 2
        ch <- 4
        close(ch)
        // ch <- 6 // panic, send on closed channel
     
        fmt.Println(<-ch)
        fmt.Println(<-ch)
        fmt.Println(<-ch) // closed, returns zero value for element
    }
    

    使用 select 叙述在多个 channel 间做选择

    透过 select,我们可以在多个 channel 中做选择。如下例:

    package main
     
    import "time"
    import "fmt"
     
    func main() {
        // For our example we'll select across two channels.
        c1 := make(chan string)
        c2 := make(chan string)
     
        // Each channel will receive a value after some amount
        // of time, to simulate e.g. blocking RPC operations
        // executing in concurrent goroutines.
        go func() {
            time.Sleep(time.Second * 1)
            c1 <- "one"
        }()
        go func() {
            time.Sleep(time.Second * 1)
            c2 <- "two"
        }()
     
        // We'll use `select` to await both of these values
        // simultaneously, printing each one as it arrives.
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
    }
    

    利用 channel 撰写 generator

    利用 channel 可以撰写共时执行的 generator,如下例:

    package main
     
    import (
        "fmt"
        "strings"
    )
     
    func main() {
        data := []string{
            "The yellow fish swims slowly in the water",
            "The brown dog barks loudly after a drink from its water bowl",
            "The dark bird of prey lands on a small tree after hunting for fish",
        }
     
        histogram := make(map[string]int)
        wordsCh := make(chan string)
     
        go func() {
            defer close(wordsCh)
     
            for _, line := range data {
                words := strings.Split(line, " ")
     
                for _, word := range words {
                    word = strings.ToLower(word)
                    wordsCh <- word
                }
            }
        }()
     
        for {
            word, opened := <- wordsCh
            if !opened {
                break
            }
            histogram[word]++
        }
     
        for k, v := range histogram {
            fmt.Println(fmt.Sprintf("%s\t(%d)", k, v))
        }
    }
    

    利用 mutex 将共时性程序同步化

    除了前述的 goroutine 和 channel 外,Go 也提供较传统的 Mutex。在共时性程序中,mutex 会将某一段程序暂时锁住,避免两个共时程序竞争同一块数据。以下范例节录自一个假想的向量类:

    package vector
     
    import (
        "sync"
    )
     
    type IVector interface {
        Len() int
        GetAt(int) float64
        SetAt(int, float64)
    }
     
    type Vector struct {
        sync.RWMutex
        vec []float64
    }
     
    func New(args ...float64) IVector {
        v := new(Vector)
        v.vec = make([]float64, len(args))
     
        for i, e := range args {
            v.SetAt(i, e)
     
        }
     
        return v
    }
     
    // The length of the vector
    func (v *Vector) Len() int {
        return len(v.vec)
    }
     
    // Getter
    func (v *Vector) GetAt(i int) float64 {
        if i < 0 || i >= v.Len() {
            panic("Index out of range")
        }
     
        return v.vec[i]
    }
     
    // Setter
    func (v *Vector) SetAt(i int, data float64) {
        if i < 0 || i >= v.Len() {
            panic("Index out of range")
        }
     
        v.Lock()
        v.vec[i] = data
        v.Unlock()
    }
     
    / Vector algebra delegating to function object.
    // This method delegates vector algebra to function object set by users, making
    // it faster then these methods relying on reflection.
    func Apply(v1 IVector, v2 IVector, f func(float64, float64) float64) IVector {
        _len := v1.Len()
     
        if !(_len == v2.Len()) {
            panic("Unequal vector size")
        }
     
        out := WithSize(_len)
     
        var wg sync.WaitGroup
     
        for i := 0; i < _len; i++ {
            wg.Add(1)
     
            go func(v1 IVector, v2 IVector, out IVector, f func(float64, float64) float64, i int) {
                defer wg.Done()
     
                out.SetAt(i, f(v1.GetAt(i), v2.GetAt(i)))
            }(v1, v2, out, f, i)
        }
     
        wg.Wait()
     
        return out
    }
    

    当我们要将数据存入内部的 float64 切片时,透过 Mutex 将切片暂时锁住,避免多个程序同时存取切片。在此情形外,数据都是各自独立的,所以,可以开启多个 goroutine 进行并行运算。

    【赞助商连结】