原文:http://tianyalong.icu/content.html?id=19
Protobuf
syntax = "proto3";
package main;
message String {
string value = 1;
}
syntax
语句表示采用proto3
的语法package
指令指明当前是main包,也可以针对不同的语言定制对应的包路径和名称message
关键字定义一个新的String
类型,在最终生成的Go语言代码中对应一个String
结构体。String
类型中只有一个字符串类型的value
成员,该成员编码时用1
编号代替名字。
protoc –go_out=. hello.proto生成go代码
- ProtoMessage
方法表示这是一个实现了proto.Message
接口的方
法
- 为每个成员生成了一个Get
方法GetValue
protoc –go_out=plugins=grpc:. hello.proto生成gpc代码,如果不添加--go_out=plugins=grpc
,只会对message生成代码。
protobuf生成rpc代码
protobuf只能生成message结构体的代码,只有借助插件才能够生成接口相关方法,protobuf借助plugin接口加载插件,共四个函数:
- Name() string
返回插件名称
- Init(g *Generator)
对插件进行初始化
- Generate(file *FileDescriptor)
生成主题代码
- GenerateImports(file *FileDescriptor)
导入包代码,即import()相关代码
其中GenerateImports
方法调用自定义的genImportCode
函数生成导入代码。Generate
方法调用自定义的genServiceCode
方法生成每个服务的代码。
genServiceCode
中就是保存每个函数的名字以及输入输出参数的名字,然后套用自定义的字符串模板
,将函数名字嵌入其中生成对应的代码。
客户端rpc原理
go中内置的rpc调用,通过Client.Call
进行同步阻塞调用,Client.Call
中通过Client.Go
方法实现一次异步调用,Client.GO
中通过调用Client.send
将Call完整的发送到RPC框架,Client.send
方法调用是线程安全的,所以多个goroutine可以同时向一个RPC连接发送请求。通过Client.Done
管道接收RPC完成或失败的结果,参数和相应结果存储在call.Args
和call.Reply
中。
简单实现
- 服务端
type HelloServiceImpl struct{}
func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, error) {
reply := &String{Value: "hello:" + args.GetValue()}
return reply, nil
}
func main() {
grpcServer := grpc.NewServer()
RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
grpcServer.Serve(lis)
}
- 客户端
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &String{Value: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
基于RPC实现Watch功能
通过在一个goroutine中调用watch函数进行监控。
例如:一个修改服务端的配置方法set。先开一个goroutine启动监听方法watch,之后调用set方法修改,当有改动就调用方法向管道传参,watch感知到修改后就返回修改后的内容,否侧返回错误信息。
反向RPC
有时候rpc服务端是在内网中,外网无法直接连接,可以通过服务端主动连接到外网,然后客户端将TCP连接转换为rpc.Client,之后客户端调用服务器端方法。
gRPC流
protoc --go_out=plugins=grpc:. hello.proto
通过在参数和返回值中增加stream
来实现双向流
service HelloService {
rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
生成代码
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (
*String, error,
)
Channel(ctx context.Context, opts ...grpc.CallOption) (
HelloService_ChannelClient, error,
)
}
type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
服务端用for
循环不断地接收客户端的请求并处理响应,客户端将Send
和Recv
方法放到不同的goroutine中,通过for
循环不断传输和接收响应。
基于gRPC实现发布订阅
在服务器端管理订阅的channel以及发布功能,客户端a调用服务端发布函数,将消息在服务端所有channel中发送。客户端b订阅消息,会调用订阅方法创建一个channel添加到服务器中,并通过for循环一直接收服务器端最新的消息。
证书认证
生成服务端和客户端证书, /CN=server.grpc.io
表示服务器的名字为 server.grpc.io
$ openssl genrsa -out server.key 2048
$ openssl req -new -x509 -days 3650 \
-subj "/C=GB/L=China/O=grpc-server/CN=server.grpc.io" \
-key server.key -out server.crt
$ openssl genrsa -out client.key 2048
$ openssl req -new -x509 -days 3650 \
-subj "/C=GB/L=China/O=grpc-client/CN=client.grpc.io" \
-key client.key -out client.crt
- 服务端使用证书
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
server := grpc.NewServer(grpc.Creds(creds))
- 客户端使用证书
func main() {
creds, err := credentials.NewClientTLSFromFile(
"server.crt", "server.grpc.io",
)
if err != nil {
log.Fatal(err)
}
conn, err := grpc.Dial("localhost:5000",
grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
...
}
根证书
为了避免证书的传递过程中被篡改,可以通过一个安全可靠的根证书分别对服务器和客户端的证书进行签名。这样客户端或服务器在收到对方的证书后可以通过根证书进行验证证书的有效性。
- 生成根证书
openssl req -new -x509 -days 3650 -subj "/C=GB/L=China/O=gobook/CN=github.com" -key ca.key -out ca.crt
- 重新对服务器端证书进行签名
openssl x509 -req -sha256 -CA ca.crt -CAkey ca.key -CAcreateserial -days 3650 -in server.csr -out server.crt
签名的过程中引入了一个新的以.csr为后缀名的文件,它表示证书签名请求文件。在证书签名完成之后可以删除.csr文件。
- 重新实现客户端
func main() {
certificate, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
log.Fatal(err)
}
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatal("failed to append ca certs")
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{certificate},
ServerName: tlsServerName, // NOTE: this is required!
RootCAs: certPool,
})
conn, err := grpc.Dial(
"localhost:5000", grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
}
在新的客户端代码中,我们不再直接依赖服务器端证书文件。在credentials.NewTLS函数调用中,客户端通过引入一个CA根证书和服务器的名字来实现对服务器进行验证。客户端在链接服务器时会首先请求服务器的证书,然后使用CA根证书对收到的服务器端证书进行验证。
- 如果客户端的证书也采用CA根证书签名的话,服务器端也可以对客户端进行证书认证。我们用CA根证书对客户端证书签名:
$ openssl req -new \
-subj "/C=GB/L=China/O=client/CN=client.io" \
-key client.key \
-out client.csr
$ openssl x509 -req -sha256 \
-CA ca.crt -CAkey ca.key -CAcreateserial -days 3650 \
-in client.csr \
-out client.crt
因为引入了CA根证书签名,在启动服务端时同样要配置根证书:
func main() {
certificate, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatal("failed to append certs")
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{certificate},
ClientAuth: tls.RequireAndVerifyClientCert, // NOTE: this isoptional!
ClientCAs: certPool,
})
server := grpc.NewServer(grpc.Creds(creds))
...
}
Token认证
我们可以创建一个Authentication类型,实现PerRPCCredentials
接口,用于实现用户名和密码的认证在GetRequestMetadata
方法中返回认证需要的必要信息。RequireTransportSecurity
方法表示是否要求底层使用安全链接。
type Authentication struct {
User string
Password string
}
func (a *Authentication) GetRequestMetadata(context.Context, ...string) (map[string]string, error,) {
return map[string]string{"user": a.User, "password": a.Password}, nil
}
func (a *Authentication) RequireTransportSecurity() bool {
return false //建议开启,为了代码演示简单,没有开
}
- 客户端实现
func main() {
auth := Authentication{
Login: "gopher",
Password: "password",
}
conn, err := grpc.Dial("localhost"+port, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
}
- 服务端实现
type grpcServer struct{ auth *Authentication }
func (p *grpcServer) SomeMethod(ctx context.Context, in *HelloRequest,) (*HelloReply, error) {
if err := p.auth.Auth(ctx); err != nil {
return nil, err
}
return &HelloReply{Message: "Hello " + in.Name}, nil
}
func (a *Authentication) Auth(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var appid string
var appkey string
if val, ok := md["login"]; ok {
appid = val[0]
}
if val, ok := md["password"]; ok {
appkey = val[0]
}
if appid != a.Login || appkey != a.Password {
return grpc.Errorf(codes.Unauthenticated, "invalid token")
}
return nil
}
详细地认证工作主要在Authentication.Auth
方法中完成。首先通过
metadata.FromIncomingContext
从ctx
上下文中获取元信息,然后取出相应的认证信息进行认证。如果认证失败,则返回一个codes.Unauthenticated
类型地错误。
截取器
gRPC中的grpc.UnaryInterceptor
和grpc.StreamInterceptor
分别对普通方法和流方法提供了截取器的支持。类似go中的中间件,在函数执行前可以先执行一些日志或者调用一些Token验证函数,或是函数执行后做一些校验工作。需要实现filter函数:
func filter(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,) (resp interface{}, err error) {
log.Println("fileter:", info)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
return handler(ctx, req)
}
ctx
和req
参数就是每个普通的RPC方法的前两个参数。info
参数表示当前是对应的那个gRPC方法。handler
参数对应当前的gRPC方法函数实体。
使用filter截取器函数,只需要在启动gRPC服务时作为参数输入即可:
server := grpc.NewServer(grpc.UnaryInterceptor(filter))
不过grpc内置的截取器一个服务只能使用一个,即只能对一个服务的一个函数使用。
可以使用开源的grpc-ecosystem
项目中的go-grpc-middleware
包已经基于gRPC对截取器实现了链式截取器的支持。
import "github.com/grpc-ecosystem/go-grpc-middleware"
myServer := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
filter1, filter2, ...
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
filter1, filter2, ...
)),
)
gRPC和Web服务共存
gRPC构建在HTTP/2协议之上,因此我们可以将gRPC服务和普通的Web服务架设在同一个端口之上。
- 没有启动TLS的HTTP2
func main() {
mux := http.NewServeMux()
h2Handler := h2c.NewHandler(mux, &http2.Server{})
server = &http.Server{Addr: ":3999", Handler: h2Handler}
server.ListenAndServe()
}
- 普通https服务
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintln(w, "hello")
})
http.ListenAndServeTLS(port, "server.crt", "server.key",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
return
}),
)
}
- 带证书gRPC服务
func main() {
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
grpcServer := grpc.NewServer(grpc.Creds(creds))
...
}
- 同时支持Web和gRPC协议的路由处理函数
func main() {
http.ListenAndServeTLS(port, "server.crt", "server.key",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor != 2 {
mux.ServeHTTP(w, r)
return
}
if strings.Contains(r.Header.Get("Content-Type"), "application/grpc",) {
grpcServer.ServeHTTP(w, r) // gRPC Server
return
}
mux.ServeHTTP(w, r)
return
}),
)
}