← Back to blog

Go云原生与微服务学习三

云原生Go微服务

Go云原生与微服务学习三

metadata

metadata是指在处理RPC请求和响应过程中需要但又不属于具体业务的信息,比如身份验证等详细信息,metadata采用的是key-value的键值对形式,其中键是string类型,值通常是[]string类型。有点类似于我们的HTTP headers中的键值对,通常来说metadata可以包含认证token 请求标识等等信息

元数据对grpc本身是不可见的,也就是不定义在.proto文件中,我们通常是直接在应用代码程序中或者中间件中使用metadata

在Go语言中 metadata类型定义如下

type MD map[string][]string [code] func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (pb.HelloResponse, error) { defer func() { trailer := metadata.Pairs( “timestamp”, strconv.Itoa(int(time.Now().Unix())), ) grpc.SetTrailer(ctx, trailer) }() //在执行业务逻辑之前 首先要check metadata中是否包含token md, ok := metadata.FromIncomingContext(ctx) fmt.Printf(“md:%#v,ok:#%v”, md, ok) if !ok { return nil, status.Error(codes.Unauthenticated, “无效请求”) } //if vl,ok:=md[“token”];ok { // if len(vl) > 0 && vl[0]== “app-test-chengxisheng” //} vl := md.Get(“token”) if len(vl) < 1 || vl[0] != “app-test-chengxisheng” { return nil, status.Error(codes.Unauthenticated, “无效token”) } reply := “hello” + in.GetName() //发送数据前先发送header header := metadata.New(map[string]string{ “location”: “Beijing”, }) grpc.SendHeader(ctx, header) return &pb.HelloResponse{Reply: reply}, nil } [/code] [code] func main() { //解析命令行参数 flag.Parse() //连接server 带加密连接 conn, err := grpc.Dial(“127.0.0.1:8972”, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(“grpc.Dial failed,err:%v”, err) return } defer conn.Close() //创建客户端 c := proto.NewGreeterClient(conn) //使用context进行控制,传入background和超时时间一秒钟 ctx, cancel := context.WithTimeout(context.Background(), time.Second1) defer cancel()

	md := metadata.Pairs(
		"token", "app-test-chengxisheng",
	)
	//返回的是一个新的ctx
	ctx = metadata.NewOutgoingContext(ctx, md)
	//接受metadata
	//在发送调用的时候就得声明变量
	var header, trailer metadata.MD
	resp, err := c.SayHello(ctx, &proto.HelloRequest{Name: *name}, grpc.Header(&header), grpc.Trailer(&trailer))
	if err != nil {
		log.Printf("c.SayHello failed, err:%v", err)
		return
	}
	//获取header 和 trailer
	log.Printf("header:%v", header)
	// 拿到RPC响应
	log.Printf("resp:%v", resp.GetReply())
	log.Printf("trailer:%v", trailer)
	//callLotsOfReplies(c)
	//runLotsOfGreeting(c)
	//runBidiHello(c)
}

[/code]

error code

gRPC类似于HTTP定义了一套响应的状态码,由google.golang.org/grpc/codes定义,本质上每个Code都是一个uint32

Code含义
OK0请求成功
Canceled1操作已取消
Unknown2未知错误。如果从另一个地址空间接收到的状态值属 于在该地址空间中未知的错误空间,则可以返回此错误的示例。 没有返回足够的错误信息的API引发的错误也可能会转换为此错误
InvalidArgument3表示客户端指定的参数无效。 请注意,这与 FailedPrecondition 不同。 它表示无论系统状态如何都有问题的参数(例如,格式错误的文件名)。
DeadlineExceeded4表示操作在完成之前已过期。对于改变系统状态的操作,即使操作成功完成,也可能会返回此错误。 例如,来自服务器的成功响应可能已延迟足够长的时间以使截止日期到期。
NotFound5表示未找到某些请求的实体(例如,文件或目录)。
AlreadyExists6创建实体的尝试失败,因为实体已经存在。
PermissionDenied7表示调用者没有权限执行指定的操作。 它不能用于拒绝由耗尽某些资源引起的(使用 ResourceExhausted )。 如果无法识别调用者,也不能使用它(使用 Unauthenticated )。
ResourceExhausted8表示某些资源已耗尽,可能是每个用户的配额,或者整个文件系统空间不足
FailedPrecondition9指示操作被拒绝,因为系统未处于操作执行所需的状态。 例如,要删除的目录可能是非空的,rmdir 操作应用于非目录等。
Aborted10表示操作被中止,通常是由于并发问题,如排序器检查失败、事务中止等。
OutOfRange11表示尝试超出有效范围的操作。
Unimplemented12表示此服务中未实施或不支持/启用操作。
Internal13意味着底层系统预期的一些不变量已被破坏。 如果你看到这个错误,则说明问题很严重。
Unavailable14表示服务当前不可用。这很可能是暂时的情况,可以通过回退重试来纠正。 请注意,重试非幂等操作并不总是安全的。
DataLoss15表示不可恢复的数据丢失或损坏
Unauthenticated16表示请求没有用于操作的有效身份验证凭据
_maxCode17-

我们可以通过我们

我们可以通过status来实现自定义的错误声明 [code] package main

import (
	"context"
	"fmt"
	"hello_server/pb"
	"net"
	"sync"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// grpc server

type server struct {
	pb.UnimplementedGreeterServer
	mu    sync.Mutex     // count的并发锁
	count map[string]int // 记录每个name的请求次数
}

// SayHello 是我们需要实现的方法
// 这个方法是我们对外提供的服务
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.count[in.Name]++ // 记录用户的请求次数
	// 超过1次就返回错误
	if s.count[in.Name] > 1 {
		st := status.New(codes.ResourceExhausted, "Request limit exceeded.")
		ds, err := st.WithDetails(
			&errdetails.QuotaFailure{
				Violations: []*errdetails.QuotaFailure_Violation{{
					Subject:     fmt.Sprintf("name:%s", in.Name),
					Description: "限制每个name调用一次",
				}},
			},
		)
		if err != nil {
			return nil, st.Err()
		}
		return nil, ds.Err()
	}
	// 正常返回响应
	reply := "hello " + in.GetName()
	return &pb.HelloResponse{Reply: reply}, nil
}

func main() {
	// 启动服务
	l, err := net.Listen("tcp", ":8972")
	if err != nil {
		fmt.Printf("failed to listen, err:%v\n", err)
		return
	}
	s := grpc.NewServer() // 创建grpc服务
	// 注册服务,注意初始化count
	pb.RegisterGreeterServer(s, &server{count: make(map[string]int)})
	// 启动服务
	err = s.Serve(l)
	if err != nil {
		fmt.Printf("failed to serve,err:%v\n", err)
		return
	}
}

[/code]


[code] package main

import ("context"
	"flag"
	"fmt"
	"google.golang.org/grpc/status"
	"hello_client/pb"
	"log"
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// grpc 客户端
// 调用server端的 SayHello 方法

var name = flag.String("name", "七米", "通过-name告诉server你是谁")

func main() {
	flag.Parse() // 解析命令行参数

	// 连接server
	conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("grpc.Dial failed,err:%v", err)
		return
	}
	defer conn.Close()
	// 创建客户端
	c := pb.NewGreeterClient(conn) // 使用生成的Go代码
	// 调用RPC方法
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		s := status.Convert(err)        // 将err转为status
		for _, d := range s.Details() { // 获取details
			switch info := d.(type) {
			case *errdetails.QuotaFailure:
				fmt.Printf("Quota failure: %s\n", info)
			default:
				fmt.Printf("Unexpected type: %s\n", info)
			}
		}
		fmt.Printf("c.SayHello failed, err:%v\n", err)
		return
	}
	// 拿到了RPC响应
	log.Printf("resp:%v\n", resp.GetReply())
}

[/code]

拦截器

gRPC为拦截器提供了一些简单的API,拦截器可以拦截每个RPC调用的执行,用户可以使用拦截器进行日志记录、身份验证/授权、指标收集以及其他可以跨RPC共享的功能

在gRPC中,拦截器根据拦截的RPC调用类型可以分为一元拦截器和流式拦截器两种 [code] // unaryInterceptor 客户端一元拦截器 func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts …grpc.CallOption) error { var credsConfigured bool // 看请求是不是带了token for _, o := range opts { _, ok := o.(grpc.PerRPCCredsCallOption) if ok { credsConfigured = true break } } // 没带 塞一个进去 if !credsConfigured { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: “some-secret-token”, }))) } start := time.Now() err := invoker(ctx, method, req, reply, cc, opts…) end := time.Now() fmt.Printf(“RPC: %s, start time: %s, end time: %s, err: %v\n”, method, start.Format(“Basic”), end.Format(time.RFC3339), err) return err } [/code] [code] // 需要自定义类型 type wrappedStream struct { grpc.ClientStream }

// 重写对应的方法 为拦截器的日志添加更详细的定制化内容 
func (w *wrappedStream) RecvMsg(m interface{}) error {
	logger("Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	logger("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	var credsConfigured bool
	for _, o := range opts {
		_, ok := o.(*grpc.PerRPCCredsCallOption)
		if ok {
			credsConfigured = true
			break
		}
	}
	if !credsConfigured {
		opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
			AccessToken: "some-secret-token",
		})))
	}
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return newWrappedStream(s), nil
}

[/code]

[code] // unaryInterceptor 服务端一元拦截器 func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // authentication (token verification) md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Errorf(codes.InvalidArgument, “missing metadata”) } // 验证token是否有效 if !valid(md[“authorization”]) { return nil, status.Errorf(codes.Unauthenticated, “invalid token”) } m, err := handler(ctx, req) if err != nil { fmt.Printf(“RPC failed with error %v\n”, err) } return m, err } [/code] [code] // streamInterceptor 服务端流拦截器 func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { // authentication (token verification) md, ok := metadata.FromIncomingContext(ss.Context()) if !ok { return status.Errorf(codes.InvalidArgument, “missing metadata”) } if !valid(md[“authorization”]) { return status.Errorf(codes.Unauthenticated, “invalid token”) }

	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		fmt.Printf("RPC failed with error %v\n", err)
	}
	return err
}

[/code]

gRPC-Gateway

gRPC-Gateway是一个protoc插件,它读取gRPC服务定义的