本文摘录了许式伟 《Go,基于连接与组合的语言》部分内容,为了便于理解,我在其后端写了个完整的示例程序帮助理解,这篇文章 一是展示go在并行编程中的伟大,也是理解和学习闭包的活的教科书

正文

让我们从Unix谈起。Go语言与Unix、C语言有着极深的渊源。Go语言的领袖们参与甚至主导了Unix和C语言的设计。Ken Thompson 甚至算得上Unix和C语言的鼻祖。Go语言亦深受Unix和C语言的设计哲学影响。

在Unix世界里,组件就是应用程序(app),每个app可大体抽象为:

  • 输入:stdin(标准输入), params(命令行参数)
  • 输出:stdout(标准输出)
  • 协议:text (data stream)

不同的应用程序(app)如何连接?答案是:管道(pipeline)。在Unix世界中大家对这样的东西已经很熟悉了:

1
app1 params1 | app2 params2

通过管道(pipeline),可以将一个应用程序的输出(stdout)转换为另一个应用程序的输入(stdin)。更为神奇的一点,是这些应用程序是并行执行的。app1每产生一段输出,立即会被app2所处理。所以管道(pipeline)称得上是最古老,同时也是极其优秀的并行设施,简单而强大。

需要注意的是,Unix世界中不同应用程序直接是松散耦合的。上游app的输出是xml还是json,下游app需要知晓,但并无任何强制的约束。同一输出,不同的下游app,对协议的理解甚至都可能并不相同。例如,上游app输出一段xml文本,对于某个下游app来说,是一颗dom树,但对linecount程序来说只是一个多行的文本,对于英文单词词频统计程序来说,是一篇英文文章。

为了方便理解,我们先尝试在Go语言中模拟整个Unix的管道(pipeline)机制。首先是应用程序(app),我们抽象为:

1
func(in io.Reader, out io.Writer, args []string)

也就是说,Unix 中的

app1 params1 | app2 params2 对应Go语言中是:

1
pipe( bind(app1, params1), bind(app2, params2) )

其中,bind 函数实现如下:

1
2
3
4
5
6
7
8
9
func bind(
    app func(in io.Reader, out io.Writer, args []string),
    args []string
) func(in io.Reader, out io.Writer) {

    return func(in io.Reader, out io.Writer) {
        app(in, out, args)
    }
}

要理解bind函数,需要先理解“闭包”。Go语言中,应用程序以一个闭包的形式体现。如果你熟悉函数式编程,不难发现,这个bind函数其实就是所谓的柯里化(currying)。

pipe函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func pipe(
    app1 func(in io.Reader, out io.Writer),
    app2 func(in io.Reader, out io.Writer)
) func(in io.Reader, out io.Writer) {

    return func(in io.Reader, out io.Writer) {
        pr, pw := io.Pipe()
        defer pw.Close()
        go func() {
            defer pr.Close()
            app2(pr, out)
        }()
        app1(in, pw)
    }
}

要理解pipe函数,除了“闭包”外,需要知晓defer关键字和goroutine(go关键字)。defer语句会在函数退出时执行(无论是否发生了异常),通常用于资源的清理操作(比如关闭文件句柄等)。有了defer语句,Go语言中的错误处理代码显得非常优雅。在一个正常的函数调用前加上go关键字,就会使得该函数在新的goroutine中并行执行。理解了这些背景,这个pipe函数不难理解,无非是:先创建一个管道,让app1读入数据(in),并向管道的写入端(pw)输出,启动一个新goroutine,让app2从管道的读入端读取数据,并将处理结果输出(out)。这样得到的app就是app1和app2的组合了。

你甚至可以对多个app进行组合:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func pipe(apps ...func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {

    if len(apps) == 0 { return nil }
    app := apps[0]
    for i := 1; i < len(apps); i++ {
        app1, app2 := app, apps[i]
        app = func(in io.Reader, out io.Writer) {
            pr, pw := io.Pipe()
            defer pw.Close()
            go func() {
                defer pr.Close()
                app2(pr, out)
            }()
            app1(in, pw)
        }
    }
    return app
}

我们举个比较实际的例子,假设我们有2个应用程序tar(打包)、gzip(压缩):

1
2
func tar(io.Reader, out io.Writer, files []string)
func gzip(in io.Reader, out io.Writer)

那么打包并压缩的代码是:

1
pipe( bind(tar, files), gzip )(nil, out)

通过对管道(pipeline)的模拟我们可以看出,Go语言对并行支持是非常强大的,这主要得益于Go的轻量级进程(goroutine)。

实例程序,帮助理解管道:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
	"io"
	"os"
	"bufio"
	"bytes"
	"fmt"
	"strconv"
)

//bind函数主要是用来为pipe函数整合用的,通过将闭包将函数签名变成pipe所需的样子
//返回一个函数闭包,将一个函数字面量app和字符串slice 传入其中
func bind(app func(in io.Reader, out io.Writer, args []string), args []string) func(in io.Reader, out io.Writer) {
	return func(in io.Reader, out io.Writer) {
		app(in, out, args)
	}
}

//将两个函数插入到管道的中间,调用者只需调用pipe返回的函数字面量,并传入管道的首尾两端,即可实现管道
//返回一个新的函数闭包
func pipe(app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {
	return func(in io.Reader, out io.Writer) {
		pr, pw := io.Pipe()
		defer pw.Close()
		go func() {
			defer pr.Close()
			app2(pr, out)
		}()
		app1(in, pw)
	}
}

//读取args slice的每个字符串,将其作为文件名,读取文件,并在文件的每一行首部加上行号,写入到out中
//此处in没有使用到,主要是为了保证管道定义的一致性
func app1(in io.Reader, out io.Writer, args []string) {
	for _, v := range args {
		//fmt.Println(v)
		file, err := os.Open(v)
		if err != nil {
			continue
		}
		defer file.Close()
		buf := bufio.NewReader(file)
		for i:=1; ;i++{
			line, err := buf.ReadBytes('\n')
			if err != nil {
				break
			}
			linenum := strconv.Itoa(i)
			nline := []byte(linenum + " ")
			nline = append(nline, line...)
			out.Write(nline)
		}
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//app2 主要是将字节流转化为大写,中文可能会有点问题,不过主要是演示用,重在理解思想
//read from in, convert byte to Upper ,write the result to out
func app2(in io.Reader, out io.Writer) {
	rd := bufio.NewReader(in)
	p := make([]byte, 10)
	for {
		n, _ := rd.Read(p)
		if n == 0 {
			break
		}
		t := bytes.ToUpper(p[:n])
		out.Write(t)
	}
}

func main() {
	args := os.Args[1:]
	for _, v := range args {
		fmt.Println(v)
	}
    p := pipe(bind(app1, args), app2)
	p(os.Stdin, os.Stdout)
}

参考

文章原文-golang 并发设计模式(二)–管道模式1 Go,基于连接与组合的语言(上)