gRPC

服务注册与发现

go-doudou支持两种服务注册与发现机制:etcdnacos。REST服务注册在注册中心的服务名称会自动加上 _rest 后缀,gRPC服务注册在注册中心的服务名称会自动加上 _grpc,以作区分。

提示

etcdnacos两种机制可以在一个服务中同时使用

GDD_SERVICE_DISCOVERY_MODE=etcd,nacos
1

Etcd

go-doudou从v2版本起内建支持使用etcd作为注册中心,实现服务注册与发现。需配置如下环境变量:

  • GDD_SERVICE_NAME: 服务名称,必须
  • GDD_SERVICE_DISCOVERY_MODE: 服务注册与发现机制名称,etcd,必须
  • GDD_ETCD_ENDPOINTS: etcd连接地址,必须
GDD_SERVICE_NAME=grpcdemo-server
GDD_SERVICE_DISCOVERY_MODE=etcd
GDD_ETCD_ENDPOINTS=localhost:2379
1
2
3

Nacos

go-doudou内建支持使用阿里开发的Nacos作为注册中心,实现服务注册与发现。需配置如下环境变量:

  • GDD_SERVICE_NAME: 服务名称,必须
  • GDD_NACOS_SERVER_ADDR: Nacos服务端地址
  • GDD_SERVICE_DISCOVERY_MODE: 服务发现机制名称
GDD_SERVICE_NAME=test-svc # Required
GDD_NACOS_SERVER_ADDR=http://localhost:8848/nacos # Required
GDD_SERVICE_DISCOVERY_MODE=nacos # Required
1
2
3

Zookeeper

go-doudou内建支持使用Zookeeper作为注册中心,实现服务注册与发现。需配置如下环境变量:

  • GDD_SERVICE_NAME: 服务名称,必须
  • GDD_SERVICE_DISCOVERY_MODE: 服务发现机制名称,必须
  • GDD_ZK_SERVERS: Nacos服务端地址,必须
GDD_SERVICE_NAME=cloud.unionj.ServiceB # Required
GDD_SERVICE_DISCOVERY_MODE=zk # Required
GDD_ZK_SERVERS=localhost:2181 # Required
GDD_ZK_DIRECTORY_PATTERN=/dubbo/%s/providers
GDD_SERVICE_GROUP=group
GDD_SERVICE_VERSION=v2.2.2
1
2
3
4
5
6

客户端负载均衡

简单轮询负载均衡 (Etcd用)

需调用 etcd.NewRRGrpcClientConn("注册在etcd中的服务名称", tlsOption) 创建 *grpc.ClientConn 实例。

func main() {
  // 程序退出前需要关闭etcd客户端
	defer etcd.CloseEtcdClient()
	conf := config.LoadFromEnv()

	tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())
  // 创建支持etcd简单轮询负载均衡机制的gRPC连接
	grpcConn := etcd.NewRRGrpcClientConn("grpcdemo-server_grpc", tlsOption)
  // 程序退出前需要关闭gRPC连接
	defer grpcConn.Close()

	svc := service.NewEnumDemo(conf, pb.NewHelloworldServiceClient(grpcConn),
		client.NewHelloworldClient(ddclient.WithClient(newClient()), ddclient.WithProvider(restProvider)))
	handler := httpsrv.NewEnumDemoHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

平滑加权轮询负载均衡 (Etcd用)

需调用 etcd.NewSWRRGrpcClientConn("注册在etcd中的服务名称", tlsOption) 创建 *grpc.ClientConn 实例。

func main() {
  // 程序退出前需要关闭etcd客户端
	defer etcd.CloseEtcdClient()
	conf := config.LoadFromEnv()

	tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())
  // 创建支持etcd平滑加权轮询负载均衡机制(SWRR)的gRPC连接
	grpcConn := etcd.NewSWRRGrpcClientConn("grpcdemo-server_grpc", tlsOption)
  // 程序退出前需要关闭gRPC连接
	defer grpcConn.Close()

	svc := service.NewEnumDemo(conf, pb.NewHelloworldServiceClient(grpcConn))
	handler := httpsrv.NewEnumDemoHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

简单轮询负载均衡 (nacos用)

调用 nacos.NewRRGrpcClientConn 方法,创建gRPC连接。

func main() {
  // 程序退出前需要关闭nacos客户端
	defer nacos.CloseNamingClient()
	conf := config.LoadFromEnv()

	tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

	// 创建支持nacos简单轮询负载均衡机制的gRPC连接
	grpcConn := nacos.NewRRGrpcClientConn(nacos.NacosConfig{
		ServiceName: "grpcdemo-server_grpc",
	}, tlsOption)
  // 程序退出前需要关闭gRPC连接
	defer grpcConn.Close()


	svc := service.NewEnumDemo(conf, pb.NewHelloworldServiceClient(grpcConn))
	handler := httpsrv.NewEnumDemoHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

加权轮询负载均衡 (nacos用)

调用 nacos.NewWRRGrpcClientConn 方法,创建gRPC连接。

func main() {
  // 程序退出前需要关闭nacos客户端
	defer nacos.CloseNamingClient()
	conf := config.LoadFromEnv()

	tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

	// 创建支持nacos加权轮询负载均衡机制的gRPC连接
	grpcConn := nacos.NewWRRGrpcClientConn(nacos.NacosConfig{
		ServiceName: "grpcdemo-server_grpc",
	}, tlsOption)
  // 程序退出前需要关闭gRPC连接
	defer grpcConn.Close()


	svc := service.NewEnumDemo(conf, pb.NewHelloworldServiceClient(grpcConn))
	handler := httpsrv.NewEnumDemoHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

简单轮询负载均衡 (zookeeper用)

调用 zk.NewRRGrpcClientConn 方法,创建gRPC连接。

func main() {
	...
  tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

	opts := []grpc_retry.CallOption{
		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
		grpc_retry.WithCodes(codes.NotFound, codes.Aborted),
	}

	dialOptions := []grpc.DialOption{
		tlsOption,
		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
			grpc_opentracing.StreamClientInterceptor(),
			grpc_retry.StreamClientInterceptor(opts...),
		)),
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
			grpc_opentracing.UnaryClientInterceptor(),
			grpc_retry.UnaryClientInterceptor(opts...),
		)),
	}

	// Set up a connection to the server.
	grpcConn := zk.NewRRGrpcClientConn(zk.ServiceConfig{
		Name:    "cloud.unionj.ServiceB_grpc",
		Group:   "",
		Version: "",
	}, dialOptions...)
	defer grpcConn.Close()

	svc := service.NewServiceA(conf, bClient, pb.NewServiceBServiceClient(grpcConn))
	...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

加权轮询负载均衡 (zookeeper用)

调用 zk.NewSWRRGrpcClientConn 方法,创建gRPC连接。

func main() {
	...
  tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

	opts := []grpc_retry.CallOption{
		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
		grpc_retry.WithCodes(codes.NotFound, codes.Aborted),
	}

	dialOptions := []grpc.DialOption{
		tlsOption,
		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
			grpc_opentracing.StreamClientInterceptor(),
			grpc_retry.StreamClientInterceptor(opts...),
		)),
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
			grpc_opentracing.UnaryClientInterceptor(),
			grpc_retry.UnaryClientInterceptor(opts...),
		)),
	}

	// Set up a connection to the server.
	grpcConn := zk.NewSWRRGrpcClientConn(zk.ServiceConfig{
		Name:    "cloud.unionj.ServiceB_grpc",
		Group:   "",
		Version: "",
	}, dialOptions...)
	defer grpcConn.Close()

	svc := service.NewServiceA(conf, bClient, pb.NewServiceBServiceClient(grpcConn))
	...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

登录鉴权

go-doudouframework/grpcx/interceptors/grpcx_auth 包里内置了登录授权相关的拦截器 grpcx_auth.UnaryServerInterceptorgrpcx_auth.StreamServerInterceptor,以及接口 grpcx_auth.Authorizer。开发者可以自定义 grpcx_auth.Authorizer 的接口实现。以下是用法示例:

接口定义

请注意接口方法定义上方的@role注解。go-doudou 的注解用法请参考官方文档的 指南->接口定义->注解->gRPC服务中的使用方法 相关章节。

package service

import "context"

//go:generate go-doudou svc http
//go:generate go-doudou svc grpc

type Annotation interface {
	// 此接口可公开访问,无需校验登录和权限
	GetGuest(ctx context.Context) (data string, err error)
	// 此接口只有登录用户有权访问
	// @role(USER,ADMIN)
	GetUser(ctx context.Context) (data string, err error)
	// 此接口只有管理员有权访问
	// @role(ADMIN)
	GetAdmin(ctx context.Context) (data string, err error)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

grpcx_auth.Authorizer接口实现

以下是基于http basic登录鉴权的自定义 grpcx_auth.Authorizer 接口实现的示例代码:

package grpc

import (
	"annotation/vo"
	"context"
	"encoding/base64"
	grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
	"github.com/unionj-cloud/go-doudou/v2/framework/grpcx/interceptors/grpcx_auth"
	"github.com/unionj-cloud/go-doudou/v2/toolkit/sliceutils"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// 确保AuthInterceptor结构体实现grpcx_auth.Authorizer接口
var _ grpcx_auth.Authorizer = (*AuthInterceptor)(nil)

// AuthInterceptor是grpcx_auth.Authorizer接口的实现
type AuthInterceptor struct {
	// 此处为了简单,采用模拟数据库用户角色表的基于内存的数据结构,
	// 但实际项目中通常会定义一个数据库连接实例作为成员变量,
	// 采用真实的数据库来查询用户表、角色表、权限表等
	userStore vo.UserStore
}

// NewAuthInterceptor是创建AuthInterceptor结构体实例的工厂方法
func NewAuthInterceptor(userStore vo.UserStore) *AuthInterceptor {
	return &AuthInterceptor{
		userStore: userStore,
	}
}

// 解析http basic token,返回用户名和密码
func parseToken(token string) (username, password string, ok bool) {
	c, err := base64.StdEncoding.DecodeString(token)
	if err != nil {
		return "", "", false
	}
	cs := string(c)
	username, password, ok = strings.Cut(cs, ":")
	if !ok {
		return "", "", false
	}
	return username, password, true
}

// Authorize方法实现了grpcx_auth.Authorizer接口
func (interceptor *AuthInterceptor) Authorize(ctx context.Context, fullMethod string) (context.Context, error) {
	method := fullMethod[strings.LastIndex(fullMethod, "/")+1:]
	// go-doudou的注解用法请参考官方文档的"指南->接口定义->注解->gRPC服务中的使用方法"章节
	// 如果gRPC方法定义没有@role注解,则表示可以公开访问,无须鉴权,直接放行
	if !MethodAnnotationStore.HasAnnotation(method, "@role") {
		return ctx, nil
	}
	// 此处依赖了第三方开源库github.com/grpc-ecosystem/go-grpc-middleware的auth包
	// 从metadata里提取http basic token
	token, err := grpc_auth.AuthFromMD(ctx, "Basic")
	if err != nil {
		return ctx, err
	}
	// 解析http basic token,返回用户名和密码
	user, pass, ok := parseToken(token)
	if !ok {
		return ctx, status.Error(codes.Unauthenticated, "Provide user name and password")
	}
	// 通过用户名和密码查到该用户的角色
	role, exists := interceptor.userStore[vo.Auth{user, pass}]
	if !exists {
		return ctx, status.Error(codes.Unauthenticated, "Provide user name and password")
	}
	// 从MethodAnnotationStore中查出可以访问该gRPC方法的角色列表
	params := MethodAnnotationStore.GetParams(method, "@role")
	// 判断该用户的角色是否包含在角色列表中,如果在,则通过了鉴权,如果不在,则拒绝访问
	if !sliceutils.StringContains(params, role.StringGetter()) {
		return ctx, status.Error(codes.PermissionDenied, "Access denied")
	}
	return ctx, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

main函数

func main() {
	conf := config.LoadFromEnv()

	svc := service.NewAnnotation(conf)

  // 模拟数据库用户角色表的基于内存的数据结构
	userStore := vo.UserStore{
		vo.Auth{
			User: "guest",
			Pass: "guest",
		}: vo.GUEST,
		vo.Auth{
			User: "user",
			Pass: "user",
		}: vo.USER,
		vo.Auth{
			User: "admin",
			Pass: "admin",
		}: vo.ADMIN,
	}

  // 创建自定义的grpcx_auth.Authorizer接口实现
	authorizer := pb.NewAuthInterceptor(userStore)

	grpcServer := grpcx.NewGrpcServer(
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			grpc_ctxtags.StreamServerInterceptor(),
			grpc_opentracing.StreamServerInterceptor(),
			grpc_prometheus.StreamServerInterceptor,
			logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
			// 将authorizer传入grpcx_auth拦截器中
			grpcx_auth.StreamServerInterceptor(authorizer),
			grpc_recovery.StreamServerInterceptor(),
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			grpc_ctxtags.UnaryServerInterceptor(),
			grpc_opentracing.UnaryServerInterceptor(),
			grpc_prometheus.UnaryServerInterceptor,
			logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
			// 将authorizer传入grpcx_auth拦截器中
			grpcx_auth.UnaryServerInterceptor(authorizer),
			grpc_recovery.UnaryServerInterceptor(),
		)),
	)
	pb.RegisterAnnotationServiceServer(grpcServer, svc)
	grpcServer.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

限流

用法

go-doudou内置了基于golang.org/x/time/rate在新窗口打开实现的令牌桶算法的内存限流器。

github.com/unionj-cloud/go-doudou/v2/framework/ratelimit/memrate包里有一个MemoryStore结构体,存储了key和Limiter实例对。Limiter实例是限流器实例,key是该限流器实例的键。

你可以往memrate.NewLimiter工厂函数传入一个可选函数memrate.WithTimer,设置当key空闲时间超过timeout以后的回调函数,比如可以从MemoryStore实例里将该key删除,以释放内存资源。

go-doudou还提供了基于 go-redis/redis_rate在新窗口打开 库封装的GCRA限流算法的redis限流器。该限流器支持跨实例的全局限流。

内存限流器示例

内存限流器基于本机内存,只支持本机限流。首先需要调用 memrate.NewMemoryStore 创建一个 MemoryStore 实例,存储要限制的key和与之对应的限流器。然后调用 grpcx_ratelimit.NewRateLimitInterceptor(grpcx_ratelimit.WithMemoryStore(mstore)) 创建一个 grpcx_ratelimit.RateLimitInterceptor 拦截器实例。然后需要自定义一个 grpcx_ratelimit.KeyGetter 接口的实现结构体,实现从 context.Context 提取key的逻辑。最后在拦截器链 中加入代码 rl.UnaryServerInterceptor(keyGetter), 即可实现限流。下面是一个对客户端ip限流的示例。

func main() {
	defer etcd.CloseEtcdClient()
	conf := config.LoadFromEnv()
	svc := service.NewHelloworld(conf)

	go func() {
		mstore := memrate.NewMemoryStore(func(_ context.Context, store *memrate.MemoryStore, key string) ratelimit.Limiter {
      // 限流器创建函数,表示创建一个每秒允许处理10次请求,峰值最多允许处理30次请求,同时空闲时间最长10秒的限流器。空闲超过10秒会从内存中删除,已释放内存空间。
      // 空闲时间至少要大于 1 / rate * burst 才有意义,也就是至少要等令牌桶重新填满恢复初始状态以后。
			return memrate.NewLimiter(10, 30, memrate.WithTimer(10*time.Second, func() {
				store.DeleteKey(key)
			}))
		})
		rl := grpcx_ratelimit.NewRateLimitInterceptor(grpcx_ratelimit.WithMemoryStore(mstore))
		keyGetter := &RateLimitKeyGetter{}
		grpcServer := grpcx.NewGrpcServer(
			grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
        // 本示例中必须加grpc_ctxtags拦截器,它会自动帮我们往上下文context.Context中加入RPC调用方的"peer.address"信息
				grpc_ctxtags.StreamServerInterceptor(),
				grpc_opentracing.StreamServerInterceptor(),
				grpc_prometheus.StreamServerInterceptor,
				logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
				rl.StreamServerInterceptor(keyGetter),
				grpc_recovery.StreamServerInterceptor(),
			)),
			grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
        // 本示例中必须加grpc_ctxtags拦截器,它会自动帮我们往上下文context.Context中加入RPC调用方的"peer.address"信息
				grpc_ctxtags.UnaryServerInterceptor(),
				grpc_opentracing.UnaryServerInterceptor(),
				grpc_prometheus.UnaryServerInterceptor,
				logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
				rl.UnaryServerInterceptor(keyGetter),
				grpc_recovery.UnaryServerInterceptor(),
			)),
		)
		pb.RegisterHelloworldServiceServer(grpcServer, svc)
		grpcServer.Run()
	}()

	handler := httpsrv.NewHelloworldHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

自定义 grpcx_ratelimit.KeyGetter 接口的实现结构体:

var _ grpcx_ratelimit.KeyGetter = (*RateLimitKeyGetter)(nil)

type RateLimitKeyGetter struct {
}

func (r *RateLimitKeyGetter) GetKey(ctx context.Context, _ string) string {
	var peerAddr string
	if value, ok := grpc_ctxtags.Extract(ctx).Values()["peer.address"]; ok {
		peerAddr = value.(string)
	}
	if stringutils.IsEmpty(peerAddr) {
		if value, ok := peer.FromContext(ctx); ok {
			peerAddr = value.Addr.String()
		}
	}
	return peerAddr[:strings.LastIndex(peerAddr, ":")]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

Redis限流器示例

Redis限流器可以用于需要多个实例同时对一个key限流的场景。key的过期时间等于根据速率计算的生成1个令牌所需的时间

func main() {
	defer etcd.CloseEtcdClient()
	conf := config.LoadFromEnv()
	svc := service.NewHelloworld(conf)

	go func() {
    rdb := redis.NewClient(&redis.Options{
			Addr: "localhost:6379",
		})
		fn := redisrate.LimitFn(func(ctx context.Context) ratelimit.Limit {
      // 限流器创建函数,表示创建一个每秒允许处理10次请求,峰值最多允许处理30次请求的限流器。
			return ratelimit.PerSecondBurst(10, 30)
		})
		rl := grpcx_ratelimit.NewRateLimitInterceptor(grpcx_ratelimit.WithRedisStore(rdb, fn))
		keyGetter := &RateLimitKeyGetter{}
		grpcServer := grpcx.NewGrpcServer(
			grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
        // 本示例中必须加grpc_ctxtags拦截器,它会自动帮我们往上下文context.Context中加入RPC调用方的"peer.address"信息
				grpc_ctxtags.StreamServerInterceptor(),
				grpc_opentracing.StreamServerInterceptor(),
				grpc_prometheus.StreamServerInterceptor,
				logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
				rl.StreamServerInterceptor(keyGetter),
				grpc_recovery.StreamServerInterceptor(),
			)),
			grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
        // 本示例中必须加grpc_ctxtags拦截器,它会自动帮我们往上下文context.Context中加入RPC调用方的"peer.address"信息
				grpc_ctxtags.UnaryServerInterceptor(),
				grpc_opentracing.UnaryServerInterceptor(),
				grpc_prometheus.UnaryServerInterceptor,
				logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
				rl.UnaryServerInterceptor(keyGetter),
				grpc_recovery.UnaryServerInterceptor(),
			)),
		)
		pb.RegisterHelloworldServiceServer(grpcServer, svc)
		grpcServer.Run()
	}()

	handler := httpsrv.NewHelloworldHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

自定义 grpcx_ratelimit.KeyGetter 接口的实现结构体示例请参考上文内存限流器示例。

重试

实现重试机制需要依赖第三方开源库 github.com/grpc-ecosystem/go-grpc-middlewareretry 模块,将重试拦截器加入 dialOptions 切片中,再将 dialOptions 作为入参放入负载均衡客户端工厂函数中创建gRPC客户端连接实例。具体用法请参考源码中的注释:https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/retry在新窗口打开

tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

opts := []grpc_retry.CallOption{
	grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
	grpc_retry.WithCodes(codes.NotFound, codes.Aborted),
}

dialOptions := []grpc.DialOption{
	tlsOption,
	grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
		grpc_retry.StreamClientInterceptor(opts...),
	)),
	grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
		grpc_retry.UnaryClientInterceptor(opts...),
	)),
}

grpcConn := nacos.NewWRRGrpcClientConn(nacos.NacosConfig{
	ServiceName: "grpcdemo-server_grpc",
}, dialOptions...)
defer grpcConn.Close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

日志

用法

go-doudougithub.com/unionj-cloud/go-doudou/v2/toolkit/zlogger包里内置了一个全局的zerolog.Logger。如果GDD_ENV环境变量不等于空字符串和dev,则会带上一些关于服务本身的元数据。

你也可以调用InitEntry函数自定义zerolog.Logger实例。

你还可以通过配置GDD_LOG_LEVEL环境变量来设置日志级别,配置GDD_LOG_FORMAT环境变量来设置日志格式是json还是text

你可以通过配置GDD_LOG_REQ_ENABLE=true来开启http请求和响应的日志打印,默认是false,即不打印。

示例

// 你可以用lumberjack这个库给服务增加日志rotate的功能
zlogger.SetOutput(io.MultiWriter(os.Stdout, &lumberjack.Logger{
			Filename:   filepath.Join(os.Getenv("LOG_PATH"), fmt.Sprintf("%s.log", "usersvc")),
		  MaxSize:    5,  // 单份日志文件最大5M,超过就会创建新的日志文件
      MaxBackups: 10, // 最多保留10份日志文件
      MaxAge:     7,  // 日志文件最长保留7天
      Compress:   true, // 是否开启日志压缩
}))
1
2
3
4
5
6
7
8

ELK技术栈

logger包支持集成ELK技术栈。

示例

version: '3.9'

services:

 elasticsearch:
   container_name: elasticsearch
   image: "docker.elastic.co/elasticsearch/elasticsearch:7.2.0"
   environment:
     - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
     - "discovery.type=single-node"
   ports:
     - "9200:9200"
   volumes:
     - ./esdata:/usr/share/elasticsearch/data
   networks:
     testing_net:
       ipv4_address: 172.28.1.9

 kibana:
   container_name: kibana
   image: "docker.elastic.co/kibana/kibana:7.2.0"
   ports:
     - "5601:5601"
   networks:
     testing_net:
       ipv4_address: 172.28.1.10

 filebeat:
   container_name: filebeat
   image: "docker.elastic.co/beats/filebeat:7.2.0"
   volumes:
     - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
     - ./log:/var/log
   networks:
     testing_net:
       ipv4_address: 172.28.1.11

networks:
  testing_net:
    ipam:
      driver: default
      config:
        - subnet: 172.28.0.0/16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

截图

elk

Jaeger调用链监控

用法

集成Jaeger调用链监控需要以下步骤:

  1. 启动Jaeger
docker run -d --name jaeger \
  -p 6831:6831/udp \
  -p 16686:16686 \
  jaegertracing/all-in-one:1.29
1
2
3
4
  1. .env文件添加两行配置
JAEGER_AGENT_HOST=localhost
JAEGER_AGENT_PORT=6831
1
2
  1. main函数里靠前的位置添加三行代码
tracer, closer := tracing.Init()
defer closer.Close()
opentracing.SetGlobalTracer(tracer)
1
2
3
  1. 服务端在调用 grpcx.NewGrpcServer 函数创建 grpcx.GrpcServer 实例时通过 grpc_opentracing.StreamServerInterceptor(),grpc_opentracing.UnaryServerInterceptor(), 两行代码加上opentracing拦截器
func main() {
	defer nacos.CloseNamingClient()
	conf := config.LoadFromEnv()

	tracer, closer := tracing.Init()
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)
	
	svc := service.NewHelloworld(conf)
	grpcServer := grpcx.NewGrpcServer(
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			grpc_ctxtags.StreamServerInterceptor(),
			grpc_opentracing.StreamServerInterceptor(),
			grpc_prometheus.StreamServerInterceptor,
			logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
			grpc_recovery.StreamServerInterceptor(),
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			grpc_ctxtags.UnaryServerInterceptor(),
			grpc_opentracing.UnaryServerInterceptor(),
			grpc_prometheus.UnaryServerInterceptor,
			logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(zlogger.Logger)),
			grpc_recovery.UnaryServerInterceptor(),
		)),
	)
	pb.RegisterHelloworldServiceServer(grpcServer, svc)
	grpcServer.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  1. 客户端也需要给grpc客户端连接实例加上opentracing拦截器,使客户端在发起gRPC请求的时候可以由opentracing实现(jaeger)将span id注入metadata,否则和服务端的调用链串不起来。
func main() {
	defer nacos.CloseNamingClient()
	conf := config.LoadFromEnv()

	tracer, closer := tracing.Init()
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)

	tlsOption := grpc.WithTransportCredentials(insecure.NewCredentials())

	dialOptions := []grpc.DialOption{
		tlsOption,
		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
			grpc_opentracing.StreamClientInterceptor(),
		)),
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
			grpc_opentracing.UnaryClientInterceptor(),
		)),
	}

	grpcConn := nacos.NewWRRGrpcClientConn(nacos.NacosConfig{
		ServiceName: "grpcdemo-server_grpc",
	}, dialOptions...)
	defer grpcConn.Close()

	restProvider := nacos.NewWRRServiceProvider("grpcdemo-server_rest")
	svc := service.NewEnumDemo(conf, pb.NewHelloworldServiceClient(grpcConn),
		client.NewHelloworldClient(ddclient.WithClient(newClient()), ddclient.WithProvider(restProvider)))
	handler := httpsrv.NewEnumDemoHandler(svc)
	srv := rest.NewRestServer()
	srv.AddRoute(httpsrv.Routes(handler)...)
	srv.Run()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

截图

jaeger3jaeger4