小言_互联网的博客

GO grpc

211人阅读  评论(0)

视频地址:https://www.bilibili.com/video/BV1eE411T7GC?p=18

  • 建立服务端 go_grpc(工程名,刚开始忘记加server)

创建相应的目录

1.terminal运行

go get google.golang.org/grpc 

2.编写message.proto&&enum.proto

坑1 可能import会出现问题,就算在goland上报错只要能顺利执行下一步的指令就可以忽略红色报错

message


  
  1. syntax = "proto3";
  2. import "enums.proto";
  3. import "google/protobuf/timestamp.proto";
  4. option go_package = "./pb";
  5. package pb;
  6. message Employee {
  7. int32 id = 1;
  8. int32 number = 2;
  9. string firstName = 3;
  10. string lastName = 4;
  11. MonthSalary monthSalary = 6;
  12. EmployeeStatus status = 7;
  13. google.protobuf.Timestamp lastModfied = 8;
  14. reserved 5;
  15. reserved "salary";
  16. }
  17. message MonthSalary {
  18. float basic = 1;
  19. float bonus = 2;
  20. }
  21. message GetByNoRequest {
  22. int32 number = 1;
  23. }
  24. message EmployeeResponse {
  25. Employee employee = 1;
  26. }
  27. message GetAllRequest { }
  28. message AddPhotoRequest {
  29. bytes data = 1;
  30. }
  31. message AddPhotoResponse {
  32. bool isOk = 1;
  33. }
  34. message EmployeeRequest {
  35. Employee employee = 1;
  36. }
  37. service EmployeeService {
  38. rpc GetByNo(GetByNoRequest) returns (EmployeeResponse);
  39. rpc GetAll(GetAllRequest) returns (stream EmployeeResponse);
  40. rpc AddPhoto(stream AddPhotoRequest) returns (AddPhotoResponse);
  41. rpc Save(EmployeeRequest) returns (EmployeeResponse);
  42. rpc SaveAll(stream EmployeeRequest) returns (stream EmployeeResponse);
  43. }

enum


  
  1. syntax = "proto3";
  2. option go_package = "./pb";
  3. package pb;
  4. enum EmployeeStatus {
  5. NORMAL = 0;
  6. ON_VACATION = 1;
  7. RESIGNED = 2;
  8. RETIRED = 3;
  9. }

 

3.运行指令生成两个go proto文件//

注意一定要使用plugin命令,不然无法生成对应的service的proto

注意proto中的

option go_package = "./pb";
protoc ./*.proto --go_out=plugins=grpc:../

运行完上述指令之后会自动在pb文件下创立pb.go

4.生成证书认证

巨坑,见下面的客户端

server 代码


  
  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "go_grpc/pb"
  6. "golang.org/x/net/context"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/credentials"
  9. "google.golang.org/grpc/metadata"
  10. "io"
  11. "log"
  12. "net"
  13. )
  14. const port = ":5001"
  15. func main(){
  16. listen, err := net.Listen( "tcp",port)
  17. if err != nil {
  18. log.Fatalln(err.Error())
  19. }
  20. //创建creds证书
  21. creds,err := credentials.NewServerTLSFromFile( "server.pem", "server.key")
  22. if err != nil {
  23. log.Fatalln(err.Error())
  24. }
  25. //通过grpc传递creds证书
  26. options := []grpc.ServerOption{grpc.Creds(creds)}
  27. //创建server
  28. server := grpc.NewServer(options...)
  29. pb.RegisterEmployeeServiceServer(server, new(employeeService))
  30. log.Println( "gRPC Server started ..." + port)
  31. //开启server监听listen端口号
  32. server.Serve(listen)
  33. }
  34. type employeeService struct{}
  35. //GetByNo:通过员工编号找到员工
  36. //一元消息传递
  37. func (s *employeeService) GetByNo (ctx context.Context,
  38. req *pb.GetByNoRequest) (*pb.EmployeeResponse, error){
  39. for _, e := range employees {
  40. if req.Number == e.Number {
  41. return &pb.EmployeeResponse{
  42. Employee: &e,
  43. }, nil
  44. }
  45. }
  46. return nil,errors.New( "Employee not found")
  47. }
  48. //二元消息传递
  49. //服务端会将数据以streaming的形式传回
  50. func (s *employeeService)GetAll(req *pb.GetAllRequest,
  51. stream pb.EmployeeService_GetAllServer) error {
  52. for _,e := range employees {
  53. //stream.send会将数据一块块的传给客户端
  54. stream.Send(&pb.EmployeeResponse{
  55. Employee: &e,
  56. })
  57. }
  58. return nil
  59. }
  60. //client 以stream的形式传输图片给服务端
  61. func (s *employeeService)AddPhoto(stream pb.EmployeeService_AddPhotoServer) error {
  62. md, ok := metadata.FromIncomingContext(stream.Context())
  63. if ok {
  64. //通过metadata获取并输出employee的number
  65. fmt.Printf( "Employee: %s\n",md[ "number"][ 0])
  66. }
  67. img := [] byte{}
  68. for {
  69. data,err := stream.Recv()
  70. if err == io.EOF {
  71. //输出文件大小
  72. fmt.Printf( "File Size: %d\n", len(img))
  73. //告诉客户端已经成功接收
  74. return stream.SendAndClose(&pb.AddPhotoResponse{
  75. IsOk: true,
  76. })
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. //输出每次接收的一小块的大小
  82. fmt.Printf( "File received: %d\n", len(data.Data))
  83. img = append(img,data.Data...)
  84. }
  85. }
  86. func (s *employeeService)Save(context.Context,
  87. *pb.EmployeeRequest) (*pb.EmployeeResponse, error){
  88. return nil, nil
  89. }
  90. //双向传送stream
  91. func (s *employeeService)SaveAll(
  92. stream pb.EmployeeService_SaveAllServer) error {
  93. for {
  94. empReq, err := stream.Recv()
  95. if err == io.EOF {
  96. break
  97. }
  98. if err != nil {
  99. return err
  100. }
  101. employees = append(employees,*empReq.Employee)
  102. stream.Send(&pb.EmployeeResponse{
  103. Employee: empReq.Employee,
  104. })
  105. }
  106. for _,emp := range employees {
  107. fmt.Println(emp)
  108. }
  109. return nil
  110. }

 

 

 

 

  • 建立客户端 go_grpc_client

创建相应的目录文件,并存放proto

1.terminal运行

go get google.golang.org/grpc 

2.运行指令生成两个go proto文件

 protoc ./*.proto --go_out=plugins=grpc:../

 

这样客户端和服务器端的证书会相互匹配

3.编写main函数


  
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go_grpc_client/pb"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/credentials"
  8. "log"
  9. )
  10. const port = ":5001"
  11. func main() {
  12. creds, err := credentials.NewClientTLSFromFile( "cert.pem", "")
  13. if err != nil {
  14. log.Fatal(err.Error())
  15. }
  16. //接着设置options
  17. options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
  18. conn, err := grpc.Dial( "localhost" + port,options ...)
  19. if err != nil {
  20. log.Fatal(err.Error())
  21. }
  22. defer conn.Close()
  23. client := pb.NewEmployeeServiceClient(conn)
  24. fmt.Println( "Client Server started...")
  25. getByNo(client)
  26. }
  27. func getByNo(client pb.EmployeeServiceClient) {
  28. res, err := client.GetByNo(context.Background(),&pb.GetByNoRequest{Number: 1994})
  29. if err != nil {
  30. log.Fatal(err.Error())
  31. }
  32. fmt.Println(res.Employee)
  33. }

go run main.go 报错


  
  1. code = Unavailable desc = connection error: desc = "transport: authentication handshake failed: x509: certificate relies on legacy Common Name field, use SANs or temporarily enable Common Name matching with GODEBUG=x509ignoreCN= 0 "
  2. Exiting.
  • 运行报错 GRPC X509 Common Name field, use SANs or temporarily enable Common Name matching

解决:问题原因GO1.15版本以上已经把Common Name这个东西砍了!解决办法:

转载自:https://blog.csdn.net/weixin_40280629/article/details/113563351

其中需要注意的是:alt_names 下的DNS.1=www.xxx.com(这个地方自己设置) 还有下面的ip也设置一下

然后将服务端生成的server.pem文件拷贝到客户端,修改代码段:


  
  1. //第二个参数就是刚才在alt_names中设置的DNS
  2. creds, err := credentials.NewClientTLSFromFile( "server.pem", "www.test.com")

服务端修改代码段:

creds,err := credentials.NewServerTLSFromFile("server.pem","server.key")

开始运行,通过

成功获取了服务端的数据,自此,简单的grpc一元消息传递已经完成

其实这里可以去补一下http协议中的ssl和tls

  • grpc消息传输类型

grpc的消息传输类型有4种:

  1. 第一种是一元的消息,就是简单的请求响应
  2. 第二种是server streaming(流),server会把数据streaming回给client(streaming是把数据分成块传输)
  3. 第三种是client streaming,也就是client会把数据streamin给server
  4. 第四种是双向的streaming

 

  • server streaming

server 代码


  
  1. //二元消息传递
  2. //服务端会将数据以streaming的形式传回
  3. func (s *employeeService)GetAll(req *pb.GetAllRequest,
  4. stream pb.EmployeeService_GetAllServer) error {
  5. for _,e := range employees {
  6. //stream.send会将数据一块块的传给客户端
  7. stream.Send(&pb.EmployeeResponse{
  8. Employee: &e,
  9. })
  10. }
  11. return nil
  12. }

client 代码


  
  1. func getAll(client pb.EmployeeServiceClient) {
  2. stream, err := client.GetAll(context.Background(),&pb.GetAllRequest{})
  3. if err != nil {
  4. log.Fatal(err.Error())
  5. }
  6. for {
  7. res,err := stream.Recv()
  8. //如果服务端数据发送结束,则为EOF
  9. if err == io.EOF {
  10. break
  11. }
  12. if err != nil {
  13. log.Fatal(err.Error())
  14. }
  15. fmt.Println(res.Employee)
  16. }
  17. }
  • client streaming

采用stream的形式向服务器传送图片

client 代码


  
  1. func addPhoto (client pb.EmployeeServiceClient) {
  2. imgFile, err := os.Open( "WechatIMG3.jpeg")
  3. if err != nil {
  4. log.Fatal(err.Error())
  5. }
  6. defer imgFile.Close()
  7. //metadata相当于报文的header,我们只需要把用户number放在header传输一次就可以了
  8. md := metadata.New( map[ string] string{ "number": "1994"})
  9. context := context.Background()
  10. context = metadata.NewOutgoingContext(context,md)
  11. stream, err := client.AddPhoto(context)
  12. if err != nil {
  13. fmt.Println(err)
  14. log.Fatal(err.Error())
  15. }
  16. //循环分块传输数据
  17. for {
  18. chunk := make([] byte, 128* 1024)
  19. chunkSize, err := imgFile.Read(chunk)
  20. if err == io.EOF {
  21. break
  22. }
  23. if err != nil {
  24. log.Fatal(err.Error())
  25. }
  26. if chunkSize < len(chunk) {
  27. chunk = chunk[:chunkSize]
  28. }
  29. //开始分块发送数据
  30. stream.Send(&pb.AddPhotoRequest{Data: chunk})
  31. }
  32. //closeandrec会向客户端发送一个信号EOF,等待服务端发回一个响应
  33. res,err := stream.CloseAndRecv()
  34. if err != nil {
  35. log.Fatal(err.Error())
  36. }
  37. fmt.Println(res.IsOk)
  38. }

server 代码


  
  1. //client 以stream的形式传输图片给服务端
  2. func (s *employeeService)AddPhoto(stream pb.EmployeeService_AddPhotoServer) error {
  3. md, ok := metadata.FromIncomingContext(stream.Context())
  4. if ok {
  5. //通过metadata获取并输出employee的number
  6. fmt.Printf( "Employee: %s\n",md[ "number"][ 0])
  7. }
  8. img := [] byte{}
  9. for {
  10. data,err := stream.Recv()
  11. if err == io.EOF {
  12. //输出文件大小
  13. fmt.Printf( "File Size: %d\n", len(img))
  14. //告诉客户端已经成功接收
  15. return stream.SendAndClose(&pb.AddPhotoResponse{
  16. IsOk: true,
  17. })
  18. if err != nil {
  19. return err
  20. }
  21. }
  22. //输出每次接收的一小块的大小
  23. fmt.Printf( "File received: %d\n", len(data.Data))
  24. img = append(img,data.Data...)
  25. }
  26. }

client 代码


  
  1. //client 以stream的形式传输图片给服务端
  2. func (s *employeeService)AddPhoto(stream pb.EmployeeService_AddPhotoServer) error {
  3. md, ok := metadata.FromIncomingContext(stream.Context())
  4. if ok {
  5. //通过metadata获取并输出employee的number
  6. fmt.Printf( "Employee: %s\n",md[ "number"][ 0])
  7. }
  8. img := [] byte{}
  9. for {
  10. data,err := stream.Recv()
  11. if err == io.EOF {
  12. //输出文件大小
  13. fmt.Printf( "File Size: %d\n", len(img))
  14. //告诉客户端已经成功接收
  15. return stream.SendAndClose(&pb.AddPhotoResponse{
  16. IsOk: true,
  17. })
  18. if err != nil {
  19. return err
  20. }
  21. }
  22. //输出每次接收的一小块的大小
  23. fmt.Printf( "File received: %d\n", len(data.Data))
  24. img = append(img,data.Data...)
  25. }
  26. }

 

  • 双向stream

client 代码


  
  1. func saveAll(client pb.EmployeeServiceClient) {
  2. employees := []pb.Employee{
  3. pb.Employee{
  4. Id: 200,
  5. Number: 201,
  6. FirstName: "xx",
  7. LastName: "xx1",
  8. MonthSalary: &pb.MonthSalary{
  9. Basic: 200,
  10. Bonus: 125.5,
  11. },
  12. Status: pb.EmployeeStatus_NORMAL,
  13. LastModfied: &timestamppb.Timestamp{
  14. Seconds: time.Now().Unix(),
  15. },
  16. },pb.Employee{
  17. Id: 300,
  18. Number: 301,
  19. FirstName: "asd",
  20. LastName: "wefewf",
  21. MonthSalary: &pb.MonthSalary{
  22. Basic: 300,
  23. Bonus: 5.5,
  24. },
  25. Status: pb.EmployeeStatus_NORMAL,
  26. LastModfied: &timestamppb.Timestamp{
  27. Seconds: time.Now().Unix(),
  28. },
  29. },pb.Employee{
  30. Id: 400,
  31. Number: 401,
  32. FirstName: "www",
  33. LastName: "wwwwq",
  34. MonthSalary: &pb.MonthSalary{
  35. Basic: 4566,
  36. Bonus: 100,
  37. },
  38. Status: pb.EmployeeStatus_NORMAL,
  39. LastModfied: &timestamppb.Timestamp{
  40. Seconds: time.Now().Unix(),
  41. },
  42. },
  43. }
  44. stream,err := client.SaveAll(context.Background())
  45. if err != nil {
  46. log.Fatal(err.Error())
  47. }
  48. //我们不知道什么时候服务器会把数据发回,我们不能在这阻塞,采用goroutine
  49. finshChannel := make( chan struct{})
  50. go func() {
  51. for {
  52. res,err := stream.Recv()
  53. if err == io.EOF {
  54. finshChannel <- struct{}{}
  55. break
  56. }
  57. if err != nil {
  58. log.Fatal(err.Error())
  59. }
  60. fmt.Println(res.Employee)
  61. }
  62. }()
  63. for _,e := range employees {
  64. err := stream.Send(&pb.EmployeeRequest{
  65. Employee: &e,
  66. })
  67. if err != nil {
  68. log.Fatal(err.Error())
  69. }
  70. }
  71. stream.CloseSend()
  72. <-finshChannel
  73. }

 

server 代码


  
  1. //双向传送stream
  2. func (s *employeeService)SaveAll(
  3. stream pb.EmployeeService_SaveAllServer) error {
  4. for {
  5. empReq, err := stream.Recv()
  6. if err == io.EOF {
  7. break
  8. }
  9. if err != nil {
  10. return err
  11. }
  12. employees = append(employees,*empReq.Employee)
  13. stream.Send(&pb.EmployeeResponse{
  14. Employee: empReq.Employee,
  15. })
  16. }
  17. for _,emp := range employees {
  18. fmt.Println(emp)
  19. }
  20. return nil
  21. }

 

client


  
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go_grpc_client/pb"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/credentials"
  8. "google.golang.org/grpc/metadata"
  9. "google.golang.org/protobuf/types/known/timestamppb"
  10. "io"
  11. "log"
  12. "os"
  13. "time"
  14. )
  15. const port = ":5001"
  16. func main() {
  17. creds, err := credentials.NewClientTLSFromFile( "server.pem", "www.test.com")
  18. if err != nil {
  19. log.Fatal(err.Error())
  20. }
  21. //接着设置options
  22. options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
  23. conn, err := grpc.Dial( "localhost" + port,options ...)
  24. if err != nil {
  25. log.Fatal(err.Error())
  26. }
  27. defer conn.Close()
  28. client := pb.NewEmployeeServiceClient(conn)
  29. fmt.Println( "Client Server started...")
  30. //getByNo(client)
  31. //getAll(client)
  32. //addPhoto(client)
  33. saveAll(client)
  34. }
  35. func saveAll(client pb.EmployeeServiceClient) {
  36. employees := []pb.Employee{
  37. pb.Employee{
  38. Id: 200,
  39. Number: 201,
  40. FirstName: "xx",
  41. LastName: "xx1",
  42. MonthSalary: &pb.MonthSalary{
  43. Basic: 200,
  44. Bonus: 125.5,
  45. },
  46. Status: pb.EmployeeStatus_NORMAL,
  47. LastModfied: &timestamppb.Timestamp{
  48. Seconds: time.Now().Unix(),
  49. },
  50. },pb.Employee{
  51. Id: 300,
  52. Number: 301,
  53. FirstName: "asd",
  54. LastName: "wefewf",
  55. MonthSalary: &pb.MonthSalary{
  56. Basic: 300,
  57. Bonus: 5.5,
  58. },
  59. Status: pb.EmployeeStatus_NORMAL,
  60. LastModfied: &timestamppb.Timestamp{
  61. Seconds: time.Now().Unix(),
  62. },
  63. },pb.Employee{
  64. Id: 400,
  65. Number: 401,
  66. FirstName: "www",
  67. LastName: "wwwwq",
  68. MonthSalary: &pb.MonthSalary{
  69. Basic: 4566,
  70. Bonus: 100,
  71. },
  72. Status: pb.EmployeeStatus_NORMAL,
  73. LastModfied: &timestamppb.Timestamp{
  74. Seconds: time.Now().Unix(),
  75. },
  76. },
  77. }
  78. stream,err := client.SaveAll(context.Background())
  79. if err != nil {
  80. log.Fatal(err.Error())
  81. }
  82. //我们不知道什么时候服务器会把数据发回,我们不能在这阻塞,采用goroutine
  83. finshChannel := make( chan struct{})
  84. go func() {
  85. for {
  86. res,err := stream.Recv()
  87. if err == io.EOF {
  88. finshChannel <- struct{}{}
  89. break
  90. }
  91. if err != nil {
  92. log.Fatal(err.Error())
  93. }
  94. fmt.Println(res.Employee)
  95. }
  96. }()
  97. for _,e := range employees {
  98. err := stream.Send(&pb.EmployeeRequest{
  99. Employee: &e,
  100. })
  101. if err != nil {
  102. log.Fatal(err.Error())
  103. }
  104. }
  105. stream.CloseSend()
  106. <-finshChannel
  107. }
  108. func addPhoto (client pb.EmployeeServiceClient) {
  109. imgFile, err := os.Open( "WechatIMG3.jpeg")
  110. if err != nil {
  111. log.Fatal(err.Error())
  112. }
  113. defer imgFile.Close()
  114. //metadata相当于报文的header,我们只需要把用户number放在header传输一次就可以了
  115. md := metadata.New( map[ string] string{ "number": "1994"})
  116. context := context.Background()
  117. context = metadata.NewOutgoingContext(context,md)
  118. stream, err := client.AddPhoto(context)
  119. if err != nil {
  120. fmt.Println(err)
  121. log.Fatal(err.Error())
  122. }
  123. //循环分块传输数据
  124. for {
  125. chunk := make([] byte, 128* 1024)
  126. chunkSize, err := imgFile.Read(chunk)
  127. if err == io.EOF {
  128. break
  129. }
  130. if err != nil {
  131. log.Fatal(err.Error())
  132. }
  133. if chunkSize < len(chunk) {
  134. chunk = chunk[:chunkSize]
  135. }
  136. //开始分块发送数据
  137. stream.Send(&pb.AddPhotoRequest{Data: chunk})
  138. }
  139. //closeandrec会向客户端发送一个信号EOF,等待服务端发回一个响应
  140. res,err := stream.CloseAndRecv()
  141. if err != nil {
  142. log.Fatal(err.Error())
  143. }
  144. fmt.Println(res.IsOk)
  145. }
  146. func getAll(client pb.EmployeeServiceClient) {
  147. stream, err := client.GetAll(context.Background(),&pb.GetAllRequest{})
  148. if err != nil {
  149. log.Fatal(err.Error())
  150. }
  151. for {
  152. res,err := stream.Recv()
  153. //如果服务端数据发送结束,则为EOF
  154. if err == io.EOF {
  155. break
  156. }
  157. if err != nil {
  158. log.Fatal(err.Error())
  159. }
  160. fmt.Println(res.Employee)
  161. }
  162. }
  163. func getByNo(client pb.EmployeeServiceClient) {
  164. res, err := client.GetByNo(context.Background(),&pb.GetByNoRequest{Number: 1994})
  165. if err != nil {
  166. log.Fatal(err.Error())
  167. }
  168. fmt.Println(res.Employee)
  169. }

 

至此完结散花!!!

 


  
  1. 1. go get -u github.com/golang/protobuf/protoc-gen- go
  2. 2. go get -u google.golang.org/grpc
  3. protoc ./person.proto --go_out=./
  4. 将当前目录下的person.roto转译成 go语言放在当前目录
  5. 注意在person.proto中写上
  6. option go_package= "./;firstpb";
  7. 左边表示转译之后存放的位置,右边表示 go语言的包名
  8. protoc ./message.proto --go_out=plugins=grpc:../
  9. 用plugins工具编译service函数
  10. openssl req -x509 -newkey rsa: 4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj '/CN=localhost'
  11. 设置证书,设置key.pem和cert.pem

 

 

 

 

 

 

 

 

 

 


转载:https://blog.csdn.net/leekerian/article/details/115654055
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场