gRPC
四种通信方式
- 简单rpc 一元rpc (Unary RPC)
- 服务端流式RPC (Server Streaming RPC) 错误的认识:服务端返回的是一个List。实际上是 不同的时刻,返回多个结果,长链接才可以达到这种效果
- 客户端流式RPC (Client Streaming RPC)
- 双向流RPC (Bi-directional Stream RPC)
一元RPC:
开发过程中,主要采用就是一元RPC的这种通信方式
blockingStub,FutureStub
服务端流式:
blockingStub,Stub
客户端流式:
Stub
双端流式:
Stub
gRPC代理方式:
1 2 3 4 5 6 7
| 1. BlockingStub 阻塞 通信方式 2. Stub 异步 通过监听处理的 3. FutureStub 同步 异步 NettyFuture 1. FutureStub只能应用 一元RPC
|
proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| syntax = "proto3"; option java_multiple_files = false; option java_package = "com.suns"; option java_outer_classname = "MeProto";
message MeHelloRequest1{ string name = 1; int32 age = 2; }
message MeHelloResponse1{ string result = 1; int32 code = 2; }
message MeHelloRequest2{ repeated string name = 1; int32 age = 2; }
message MeHelloResponse2{ repeated string result = 1; int32 code = 2; }
service MeHelloService{ rpc meUnary(MeHelloRequest1) returns (MeHelloResponse1){}; rpc serverStream(MeHelloRequest1) returns (stream MeHelloResponse1){}; rpc clientStream(stream MeHelloRequest1) returns (MeHelloResponse1){}; rpc bothStream(stream MeHelloRequest1) returns (stream MeHelloResponse1){}; rpc repeatedunary(MeHelloRequest2) returns (MeHelloResponse2) {}; }
|
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| package com.suns.service;
import com.google.protobuf.ProtocolStringList; import com.suns.MeHelloServiceGrpc; import com.suns.MeProto; import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit;
public class MeHelloServiceImpl extends MeHelloServiceGrpc.MeHelloServiceImplBase {
@Override public void meUnary(MeProto.MeHelloRequest1 request, StreamObserver<MeProto.MeHelloResponse1> responseObserver) { String name = request.getName(); int age = request.getAge(); System.out.println("name = " + name); System.out.println("age = " + age); MeProto.MeHelloResponse1 helloSuccess = MeProto.MeHelloResponse1.newBuilder().setResult("hello success").setCode(22).build();
try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } responseObserver.onNext(helloSuccess);
responseObserver.onCompleted(); }
@Override public void serverStream(MeProto.MeHelloRequest1 request, StreamObserver<MeProto.MeHelloResponse1> responseObserver) { int age = request.getAge(); String name = request.getName(); System.out.println("age = " + age); System.out.println("name = " + name); for (int i = 0; i < 10; i++) { if (i > 5){ throw new RuntimeException("ddddddddd"); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
} MeProto.MeHelloResponse1.Builder builder = MeProto.MeHelloResponse1.newBuilder(); builder.setResult("server stream"); builder.setCode(i); MeProto.MeHelloResponse1 build = builder.build(); responseObserver.onNext(build); System.out.println("==================" + i + LocalDateTime.now());
} responseObserver.onCompleted(); }
@Override public StreamObserver<MeProto.MeHelloRequest1> clientStream(StreamObserver<MeProto.MeHelloResponse1> responseObserver) { List<String> allNames = new ArrayList<>(); return new StreamObserver<MeProto.MeHelloRequest1>() { @Override public void onNext(MeProto.MeHelloRequest1 meHelloRequest1) { allNames.add(meHelloRequest1.getName()); System.out.println(meHelloRequest1.getName()); }
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() { System.out.println("MeHelloServiceImpl.onCompleted"); allNames.forEach(System.out::println);
MeProto.MeHelloResponse1.Builder builder1 = MeProto.MeHelloResponse1.newBuilder(); builder1.setResult("all is over"); builder1.setCode(66); responseObserver.onNext(builder1.build()); responseObserver.onCompleted(); } }; }
@Override public StreamObserver<MeProto.MeHelloRequest1> bothStream(StreamObserver<MeProto.MeHelloResponse1> responseObserver) { return new StreamObserver<MeProto.MeHelloRequest1>() { @Override public void onNext(MeProto.MeHelloRequest1 meHelloRequest1) { System.out.println("MeHelloServiceImpl.onNext"); MeProto.MeHelloResponse1.Builder builder = MeProto.MeHelloResponse1.newBuilder(); builder.setResult("message:" + meHelloRequest1.getName()); responseObserver.onNext(builder.build()); }
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() { System.out.println("MeHelloServiceImpl.onCompleted"); responseObserver.onCompleted(); }
}; }
@Override public void repeatedunary(MeProto.MeHelloRequest2 request, StreamObserver<MeProto.MeHelloResponse2> responseObserver) { ProtocolStringList nameList = request.getNameList(); nameList.forEach(System.out::println); ArrayList<String> values = new ArrayList<>(); values.add("a"); values.add("b"); MeProto.MeHelloResponse2 build = MeProto.MeHelloResponse2.newBuilder().addAllResult(values).build(); responseObserver.onNext(build); responseObserver.onCompleted(); } }
|
一元访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
public class MeClient_1_unary { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try { MeHelloServiceGrpc.MeHelloServiceBlockingStub newBlockingStub = MeHelloServiceGrpc.newBlockingStub(managedChannel); MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi").setAge(223).build();
MeProto.MeHelloResponse1 meHelloResponse1 = newBlockingStub.meUnary(xixi); String result = meHelloResponse1.getResult(); int code = meHelloResponse1.getCode(); System.out.println("result = " + result); System.out.println("code = " + code); TimeUnit.SECONDS.sleep(5); } catch (Exception e) { throw new RuntimeException(e); } finally { managedChannel.shutdown(); }
} }
|
服务器流式响应-阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import java.time.LocalDateTime; import java.util.Iterator;
public class MeClient_2_serverStream { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try { MeHelloServiceGrpc.MeHelloServiceBlockingStub newBlockingStub = MeHelloServiceGrpc.newBlockingStub(managedChannel); MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi").setAge(223).build();
Iterator<MeProto.MeHelloResponse1> meHelloResponse1Iterator = newBlockingStub.serverStream(xixi); while (meHelloResponse1Iterator.hasNext()) { MeProto.MeHelloResponse1 next = meHelloResponse1Iterator.next(); System.out.println(next.getCode()); System.out.println(next.getResult()); System.out.println("=================="+ LocalDateTime.now()); } } catch (Exception e) { throw new RuntimeException(e); } finally { managedChannel.shutdown(); }
} }
|
服务器流式响应-异步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime; import java.util.concurrent.TimeUnit;
public class MeClient_3_serverStream_Stub { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try { MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel); MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi").setAge(223).build(); serviceStub.serverStream(xixi, new StreamObserver<MeProto.MeHelloResponse1>() { @Override public void onNext(MeProto.MeHelloResponse1 next) { System.out.println(next.getCode()); System.out.println(next.getResult()); System.out.println("==================" + LocalDateTime.now()); }
@Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); }
@Override public void onCompleted() { System.out.println("结束时间 = " + LocalDateTime.now()); } }); managedChannel.awaitTermination(12, TimeUnit.SECONDS); } catch (Exception e) { throw new RuntimeException(e); } finally { managedChannel.shutdown(); }
} }
|
客户端流式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
public class MeClient_4_clientStream { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try { MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel);
StreamObserver<MeProto.MeHelloRequest1> observer = serviceStub.clientStream(new StreamObserver<MeProto.MeHelloResponse1>() { @Override public void onNext(MeProto.MeHelloResponse1 meHelloResponse1) { System.out.println("MeHelloClient_4_clientStream.onNext"); System.out.println("meHelloResponse1.getResult() = " + meHelloResponse1.getResult()); }
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() { System.out.println("MeHelloClient_4_clientStream.onCompleted"); } }); for (int i = 0; i < 10; i++) { MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi" + i).setAge(223).build(); TimeUnit.SECONDS.sleep(1); observer.onNext(xixi); } observer.onCompleted(); TimeUnit.SECONDS.sleep(20); } catch (Exception e) { throw new RuntimeException(e); } finally { managedChannel.shutdown(); }
} }
|
双端流式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
public class MeClient_5_bothStream { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try { MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel);
StreamObserver<MeProto.MeHelloRequest1> observer = serviceStub.bothStream(new StreamObserver<MeProto.MeHelloResponse1>() { @Override public void onNext(MeProto.MeHelloResponse1 meHelloResponse1) { System.out.println("MeHelloClient_5_clientStream.onNext"); System.out.println("meHelloResponse1.getResult() = " + meHelloResponse1.getResult()); }
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() { System.out.println("MeHelloClient_5_clientStream.onCompleted"); } }); for (int i = 0; i < 10; i++) { MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi" + i).setAge(223).build(); TimeUnit.SECONDS.sleep(1); observer.onNext(xixi); } observer.onCompleted(); TimeUnit.SECONDS.sleep(20); } catch (Exception e) { throw new RuntimeException(e); } finally { managedChannel.shutdown(); }
} }
|
一元将来时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.suns;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
public class MeClient_6_unary_future { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); try {
MeHelloServiceGrpc.MeHelloServiceFutureStub serviceStub = MeHelloServiceGrpc.newFutureStub(managedChannel);
ListenableFuture<MeProto.MeHelloResponse1> future = serviceStub.meUnary(MeProto.MeHelloRequest1.newBuilder().setName("future").build());
Futures.addCallback(future, new FutureCallback<MeProto.MeHelloResponse1>() { @Override public void onSuccess(MeProto.MeHelloResponse1 meHelloResponse1) { System.out.println("MeClient_6_Future.onSuccess"); System.out.println(meHelloResponse1.getResult()); }
@Override public void onFailure(Throwable throwable) { System.out.println("MeClient_6_Future.onSuccess"); System.out.println(throwable.getMessage()); } }, Executors.newCachedThreadPool());
System.out.println("后续的操作....");
managedChannel.awaitTermination(40, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { managedChannel.shutdown(); } } }
|
一元重复键
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.suns;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import java.util.Arrays; import java.util.concurrent.TimeUnit;
public class MeClient_7_unary_repeated { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); try {
MeHelloServiceGrpc.MeHelloServiceBlockingStub serviceStub = MeHelloServiceGrpc.newBlockingStub(managedChannel);
MeProto.MeHelloRequest2.Builder future1 = MeProto.MeHelloRequest2.newBuilder(); future1.addAllName(Arrays.asList("c", "d")); MeProto.MeHelloResponse2 repeatedunary = serviceStub.repeatedunary(future1.build());
repeatedunary.getResultList().forEach(System.out::println);
managedChannel.awaitTermination(40, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { managedChannel.shutdown(); } } }
|
参考链接:
手把手教大家在 gRPC 中使用 JWT 完成身份校验
https://blog.csdn.net/u012702547/article/details/129163995