Go云原生与微服务学习三
Go云原生与微服务学习三
metadata
metadata是指在处理RPC请求和响应过程中需要但又不属于具体业务的信息,比如身份验证等详细信息,metadata采用的是key-value的键值对形式,其中键是string类型,值通常是[]string类型。有点类似于我们的HTTP headers中的键值对,通常来说metadata可以包含认证token 请求标识等等信息
元数据对grpc本身是不可见的,也就是不定义在.proto文件中,我们通常是直接在应用代码程序中或者中间件中使用metadata
在Go语言中 metadata类型定义如下
type MD map[string][]string
[code]
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (pb.HelloResponse, error) {
defer func() {
trailer := metadata.Pairs(
“timestamp”, strconv.Itoa(int(time.Now().Unix())),
)
grpc.SetTrailer(ctx, trailer)
}()
//在执行业务逻辑之前 首先要check metadata中是否包含token
md, ok := metadata.FromIncomingContext(ctx)
fmt.Printf(“md:%#v,ok:#%v”, md, ok)
if !ok {
return nil, status.Error(codes.Unauthenticated, “无效请求”)
}
//if vl,ok:=md[“token”];ok {
// if len(vl) > 0 && vl[0]== “app-test-chengxisheng”
//}
vl := md.Get(“token”)
if len(vl) < 1 || vl[0] != “app-test-chengxisheng” {
return nil, status.Error(codes.Unauthenticated, “无效token”)
}
reply := “hello” + in.GetName()
//发送数据前先发送header
header := metadata.New(map[string]string{
“location”: “Beijing”,
})
grpc.SendHeader(ctx, header)
return &pb.HelloResponse{Reply: reply}, nil
}
[/code]
[code]
func main() {
//解析命令行参数
flag.Parse()
//连接server 带加密连接
conn, err := grpc.Dial(“127.0.0.1:8972”, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf(“grpc.Dial failed,err:%v”, err)
return
}
defer conn.Close()
//创建客户端
c := proto.NewGreeterClient(conn)
//使用context进行控制,传入background和超时时间一秒钟
ctx, cancel := context.WithTimeout(context.Background(), time.Second1)
defer cancel()
md := metadata.Pairs(
"token", "app-test-chengxisheng",
)
//返回的是一个新的ctx
ctx = metadata.NewOutgoingContext(ctx, md)
//接受metadata
//在发送调用的时候就得声明变量
var header, trailer metadata.MD
resp, err := c.SayHello(ctx, &proto.HelloRequest{Name: *name}, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
log.Printf("c.SayHello failed, err:%v", err)
return
}
//获取header 和 trailer
log.Printf("header:%v", header)
// 拿到RPC响应
log.Printf("resp:%v", resp.GetReply())
log.Printf("trailer:%v", trailer)
//callLotsOfReplies(c)
//runLotsOfGreeting(c)
//runBidiHello(c)
}
[/code]
error code
gRPC类似于HTTP定义了一套响应的状态码,由google.golang.org/grpc/codes定义,本质上每个Code都是一个uint32
| Code | 值 | 含义 |
|---|---|---|
| OK | 0 | 请求成功 |
| Canceled | 1 | 操作已取消 |
| Unknown | 2 | 未知错误。如果从另一个地址空间接收到的状态值属 于在该地址空间中未知的错误空间,则可以返回此错误的示例。 没有返回足够的错误信息的API引发的错误也可能会转换为此错误 |
| InvalidArgument | 3 | 表示客户端指定的参数无效。 请注意,这与 FailedPrecondition 不同。 它表示无论系统状态如何都有问题的参数(例如,格式错误的文件名)。 |
| DeadlineExceeded | 4 | 表示操作在完成之前已过期。对于改变系统状态的操作,即使操作成功完成,也可能会返回此错误。 例如,来自服务器的成功响应可能已延迟足够长的时间以使截止日期到期。 |
| NotFound | 5 | 表示未找到某些请求的实体(例如,文件或目录)。 |
| AlreadyExists | 6 | 创建实体的尝试失败,因为实体已经存在。 |
| PermissionDenied | 7 | 表示调用者没有权限执行指定的操作。 它不能用于拒绝由耗尽某些资源引起的(使用 ResourceExhausted )。 如果无法识别调用者,也不能使用它(使用 Unauthenticated )。 |
| ResourceExhausted | 8 | 表示某些资源已耗尽,可能是每个用户的配额,或者整个文件系统空间不足 |
| FailedPrecondition | 9 | 指示操作被拒绝,因为系统未处于操作执行所需的状态。 例如,要删除的目录可能是非空的,rmdir 操作应用于非目录等。 |
| Aborted | 10 | 表示操作被中止,通常是由于并发问题,如排序器检查失败、事务中止等。 |
| OutOfRange | 11 | 表示尝试超出有效范围的操作。 |
| Unimplemented | 12 | 表示此服务中未实施或不支持/启用操作。 |
| Internal | 13 | 意味着底层系统预期的一些不变量已被破坏。 如果你看到这个错误,则说明问题很严重。 |
| Unavailable | 14 | 表示服务当前不可用。这很可能是暂时的情况,可以通过回退重试来纠正。 请注意,重试非幂等操作并不总是安全的。 |
| DataLoss | 15 | 表示不可恢复的数据丢失或损坏 |
| Unauthenticated | 16 | 表示请求没有用于操作的有效身份验证凭据 |
| _maxCode | 17 | - |
我们可以通过我们
我们可以通过status来实现自定义的错误声明
[code]
package main
import (
"context"
"fmt"
"hello_server/pb"
"net"
"sync"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// grpc server
type server struct {
pb.UnimplementedGreeterServer
mu sync.Mutex // count的并发锁
count map[string]int // 记录每个name的请求次数
}
// SayHello 是我们需要实现的方法
// 这个方法是我们对外提供的服务
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.count[in.Name]++ // 记录用户的请求次数
// 超过1次就返回错误
if s.count[in.Name] > 1 {
st := status.New(codes.ResourceExhausted, "Request limit exceeded.")
ds, err := st.WithDetails(
&errdetails.QuotaFailure{
Violations: []*errdetails.QuotaFailure_Violation{{
Subject: fmt.Sprintf("name:%s", in.Name),
Description: "限制每个name调用一次",
}},
},
)
if err != nil {
return nil, st.Err()
}
return nil, ds.Err()
}
// 正常返回响应
reply := "hello " + in.GetName()
return &pb.HelloResponse{Reply: reply}, nil
}
func main() {
// 启动服务
l, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen, err:%v\n", err)
return
}
s := grpc.NewServer() // 创建grpc服务
// 注册服务,注意初始化count
pb.RegisterGreeterServer(s, &server{count: make(map[string]int)})
// 启动服务
err = s.Serve(l)
if err != nil {
fmt.Printf("failed to serve,err:%v\n", err)
return
}
}
[/code]
[code] package main
import ("context"
"flag"
"fmt"
"google.golang.org/grpc/status"
"hello_client/pb"
"log"
"time"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// grpc 客户端
// 调用server端的 SayHello 方法
var name = flag.String("name", "七米", "通过-name告诉server你是谁")
func main() {
flag.Parse() // 解析命令行参数
// 连接server
conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("grpc.Dial failed,err:%v", err)
return
}
defer conn.Close()
// 创建客户端
c := pb.NewGreeterClient(conn) // 使用生成的Go代码
// 调用RPC方法
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
s := status.Convert(err) // 将err转为status
for _, d := range s.Details() { // 获取details
switch info := d.(type) {
case *errdetails.QuotaFailure:
fmt.Printf("Quota failure: %s\n", info)
default:
fmt.Printf("Unexpected type: %s\n", info)
}
}
fmt.Printf("c.SayHello failed, err:%v\n", err)
return
}
// 拿到了RPC响应
log.Printf("resp:%v\n", resp.GetReply())
}
[/code]
拦截器
gRPC为拦截器提供了一些简单的API,拦截器可以拦截每个RPC调用的执行,用户可以使用拦截器进行日志记录、身份验证/授权、指标收集以及其他可以跨RPC共享的功能
在gRPC中,拦截器根据拦截的RPC调用类型可以分为一元拦截器和流式拦截器两种 [code] // unaryInterceptor 客户端一元拦截器 func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts …grpc.CallOption) error { var credsConfigured bool // 看请求是不是带了token for _, o := range opts { _, ok := o.(grpc.PerRPCCredsCallOption) if ok { credsConfigured = true break } } // 没带 塞一个进去 if !credsConfigured { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: “some-secret-token”, }))) } start := time.Now() err := invoker(ctx, method, req, reply, cc, opts…) end := time.Now() fmt.Printf(“RPC: %s, start time: %s, end time: %s, err: %v\n”, method, start.Format(“Basic”), end.Format(time.RFC3339), err) return err } [/code] [code] // 需要自定义类型 type wrappedStream struct { grpc.ClientStream }
// 重写对应的方法 为拦截器的日志添加更详细的定制化内容
func (w *wrappedStream) RecvMsg(m interface{}) error {
logger("Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
logger("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ClientStream.SendMsg(m)
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
return &wrappedStream{s}
}
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var credsConfigured bool
for _, o := range opts {
_, ok := o.(*grpc.PerRPCCredsCallOption)
if ok {
credsConfigured = true
break
}
}
if !credsConfigured {
opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
AccessToken: "some-secret-token",
})))
}
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(s), nil
}
[/code]
[code] // unaryInterceptor 服务端一元拦截器 func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // authentication (token verification) md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Errorf(codes.InvalidArgument, “missing metadata”) } // 验证token是否有效 if !valid(md[“authorization”]) { return nil, status.Errorf(codes.Unauthenticated, “invalid token”) } m, err := handler(ctx, req) if err != nil { fmt.Printf(“RPC failed with error %v\n”, err) } return m, err } [/code] [code] // streamInterceptor 服务端流拦截器 func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { // authentication (token verification) md, ok := metadata.FromIncomingContext(ss.Context()) if !ok { return status.Errorf(codes.InvalidArgument, “missing metadata”) } if !valid(md[“authorization”]) { return status.Errorf(codes.Unauthenticated, “invalid token”) }
err := handler(srv, newWrappedStream(ss))
if err != nil {
fmt.Printf("RPC failed with error %v\n", err)
}
return err
}
[/code]
gRPC-Gateway
gRPC-Gateway是一个protoc插件,它读取gRPC服务定义的