golang中channel+error来做异步错误处理有多香

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

golang中channel+error来做异步错误处理有多香

文大侠666   2023-02-03 我要评论

官方推荐golang中错误处理当做值处理, 既然是值那就可以在channel中传输,本文带你看看golang中channel+error来做异步错误处理有多香,看完本文还会觉得golang的错误处理相比java try catch一点优势都没有吗?

场景

如下,一次任务起多个协程异步处理任务,比如同时做服务/redis/mysql/kafka初始化,当某一个协程出现错误(初始化失败)时,程序是停止还是继续呢?如何记录错误?如何控制优雅的退出全部工作协程呢?

image.png

为了解决类似的问题,常见如下三种解决方案:

1.中断退出并记录日志

如下,最简单粗暴的方式就是,一旦协程中发生错误,记录日志立即退出,外层如果想取到错误可以通过共享全局变量,单个协程无法控制所有协程动作。

// 出错中断协程,打印日志,退出

func TestSimpleExit(t *testing.T) {
	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()

		//do something
		if err := doSomething(); err != nil {
			t.Logf("Error when call doSomething:%v\n", err)
			return
		}
	}()

	wg.Wait()
}

2.监控error,可选记录日志或退出

前面讲过,既然error是值,那就可以在channel中传输,可以单独开一个channel,所有协程错误都发送到这个通道。

	// 数据处理流程
	dataFunc := func(ctx context.Context, dataChan chan int, errChan chan error) {
		defer wg.Done()

		for {
			select {
			case v, ok := <-dataChan:
				if !ok {
					log.Println("Receive data channel close msg!")
					return
				}

				if err := doSomething2(v); err != nil {
					errChan <- err
					continue
				}

				// do ...

			case <-ctx.Done():
				log.Println("Receive exit msg!")
				return
			}
		}
	}

	wg.Add(1)
	go dataFunc(ctx, dataChan, errChan)

	wg.Add(1)
	go dataFunc(ctx, dataChan, errChan)

监控错误error通道,统一记录和退出,一旦检测到错误可以通过ctx通知所有协程退出,这里可以灵活控制监控到错误时的错误处理策略(是否记录日志/是否退出等),error通道可以同步或异步处理
整体流程如下

image.png

异步监控error

	// 错误处理流程,error处理通道异步等待
	wg.Add(1)
	go func(errChan chan error) {
		defer wg.Done()

		for {
			select {
			case v, ok := <-errChan:
				if !ok {
					log.Println("Receice err channel close msg!")
					return
				}

				// 收到错误时,可选择记录日志或退出
				if v != nil {
					t.Logf("Error when call doSomething:%v\n", v)
					cancel() // 通知全部退出
					return
				}

			case <-ctx.Done():
				log.Println("Receive exit msg!")
				return
			}
		}

	}(errChan)

	dataChan <- 1
	wg.Wait()

同步监控error

	// 错误处理流程,error处理通道同步等待
	for {
		select {
		case v, ok := <-errChan:
			if !ok {
				log.Println("Receice err channel close msg!")
				goto EXIT
			}

			// 收到错误时,可选择记录日志或退出
			if v != nil {
				t.Logf("Error when call doSomething:%v\n", v)
				cancel()
				goto EXIT
			}

		case <-ctx.Done():
			log.Println("Receive exit msg!")
			goto EXIT
		}
	}

EXIT:
	wg.Wait()

3.官方库errgroup

考虑到error输出到通道后统一处理是golang常用手段,官方也针对封装了一个error处理包,errgroup顾名思义,多个协程的error被当做一个组,一旦某个协程出错所有协程都退出,只输出第一个error

func TestSimpleChannel5(t *testing.T) {
	eg, ctx := errgroup.WithContext(context.Background())

	dataChan := make(chan int)
	defer close(dataChan)

	// 数据处理流程
	dataFunc := func() error {
		for {
			select {
			case v, ok := <-dataChan:
				if !ok {
					log.Println("Receive data channel close msg!")
					return nil
				}

				if err := doSomething2(v); err != nil {
					return err
				}

				// do ...

			// 增加ctx通知完成
			case <-ctx.Done():
				log.Println("Receive exit msg!")
				return nil
			}
		}
	}

	eg.Go(dataFunc)
	eg.Go(dataFunc)
	eg.Go(dataFunc)

	dataChan <- 1

	// 错误处理流程,任何一个协程出现error,则会调用ctx对应cancel函数,所有相关协程都会退出
	if err := eg.Wait(); err != nil {
		fmt.Printf("Something is wrong->%v\n", err)
	}
}

类似上一小节,可以看到errgroup就是结合waitgroup cancel和channel通道封装的

4.监控error,全部日志合并后输出

同样是上述场景,有时候我们的需求是返回所有的错误(不是第一个错误)。

  • 比如在服务启动时,对 redis、kafka、mysql 等各种资源初始化场景,可以把所有相关资源初始化的错误都返回,展示给用户统一排查。
  • 另一种场景就是在 web 请求中,校验请求参数时,返回所有参数的校验错误给客户端的场景。

这种需求,一般考虑使用多错误管理(hashicorp/go-multierror库),如下一个简答同步任务演示多错误管理,所有返回的错误可以通过Append归并成一个错误,实际上是通过error wrap的方式合并起来的,因此也可以使用Is/As判断嵌套error。

// 多路协程error合并,用于多路check场景
func TestSimpleChannel3(t *testing.T) {

	// 同步执行多个任务,返回error合并
	var err = func() error {
		var result error

		if err := doSomething(); err != nil {
			result = multierror.Append(result, err)
		}

		if err := doSomething2(nil); err != nil {
			result = multierror.Append(result, err)
		}

		return result
	}()

	// 打印输出
	if err != nil {
		fmt.Printf("%v\n", err)
	}

	// 获取错误列表
	if err != nil {
		if merr, ok := err.(*multierror.Error); ok {
			fmt.Printf("%v\n", merr.Errors)
		}
	}

	// 判断是否为某种类型
	if err != nil && errors.Is(err, Error1) {
		fmt.Println("Errors contain error 1")
	}

	// 判断是否其中一个error能够转换成指定error
	var e MyError
	if err != nil && errors.As(err, &e) {
		fmt.Println("One Error can be convert to nyerror")
	}
}

那么,在起多个异步任务时,就可以如下处理,返回的多个error通过channel消费合并展示。

func TestSimpleChannel4(t *testing.T) {
	wg := sync.WaitGroup{}

	taskNum := 10
	errChan := make(chan error, taskNum)

	// 异步执行多个任务
	step := func(stepNum int, errChan chan error) {
		defer wg.Done()
		errChan <- fmt.Errorf("step %d error", stepNum)
	}

	for i := 0; i < taskNum; i++ {
		wg.Add(1)
		go step(i, errChan)
	}

	// 等待任务完成
	go func() {
		wg.Wait()
		close(errChan)
	}()

	// err通道阻塞等待,可能的所有错误合并
	var result *multierror.Error
	for err := range errChan {
		result = multierror.Append(result, err)
	}

	// 出现一个错误时,选择记录日志或退出
	if len(result.Errors) != 0 {
		log.Println(result.Errors)
	}
}

参考文献

演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/error_group

errgroup源码解析 https://zhuanlan.zhihu.com/p/416054707
errgroup使用参考 https://zhuanlan.zhihu.com/p/338999914

go-multierror使用参考 https://zhuanlan.zhihu.com/p/581030231

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们