gRPC

gRPC

四种通信方式

  1. 简单rpc 一元rpc (Unary RPC)
  2. 服务端流式RPC (Server Streaming RPC)。常见的错误认识:服务端一次性返回一个List。实际上是服务端在不同的时刻陆续返回多个结果,只有长连接才可以达到这种效果
  3. 客户端流式RPC (Client Streaming RPC)
  4. 双向流RPC (Bi-directional Stream RPC)

一元RPC:

开发过程中,主要采用的就是一元RPC这种通信方式

blockingStub,FutureStub

服务端流式:

blockingStub,Stub

客户端流式:

Stub

双向流式:

Stub

gRPC代理方式:

1. BlockingStub
   阻塞式(同步)通信方式
2. Stub
   异步通信方式,通过监听器(StreamObserver)回调处理响应
3. FutureStub
   既可以同步(future.get())也可以异步(注册回调)获取结果;返回的是 Guava 的 ListenableFuture,用法类似 Netty 的 Future
   - FutureStub 只能应用于一元RPC

proto 文件定义

// Demo schema: two request/response message pairs and a service that exposes
// one method per gRPC call style, plus a unary call that uses repeated fields.
syntax = "proto3";

// Generated message classes are nested inside the single outer class
// com.suns.MeProto instead of being emitted as separate top-level files.
option java_multiple_files = false;
option java_package = "com.suns";
option java_outer_classname = "MeProto";

// Request with a single name and an age.
message MeHelloRequest1 {
  string name = 1;
  int32 age = 2;
}

// Response with a single result string and a numeric code.
message MeHelloResponse1 {
  string result = 1;
  int32 code = 2;
}

// Request whose name field may hold any number of values.
message MeHelloRequest2 {
  repeated string name = 1;
  int32 age = 2;
}

// Response whose result field may hold any number of values.
message MeHelloResponse2 {
  repeated string result = 1;
  int32 code = 2;
}

service MeHelloService {
  // Unary: one request, one response.
  rpc meUnary(MeHelloRequest1) returns (MeHelloResponse1);
  // Server streaming: one request, a stream of responses.
  rpc serverStream(MeHelloRequest1) returns (stream MeHelloResponse1);
  // Client streaming: a stream of requests, one response.
  rpc clientStream(stream MeHelloRequest1) returns (MeHelloResponse1);
  // Bidirectional streaming: both sides send a stream of messages.
  rpc bothStream(stream MeHelloRequest1) returns (stream MeHelloResponse1);
  // Unary call whose request and response carry repeated fields.
  rpc repeatedunary(MeHelloRequest2) returns (MeHelloResponse2);
}

Server(服务端实现)

package com.suns.service;

import com.google.protobuf.ProtocolStringList;
import com.suns.MeHelloServiceGrpc;
import com.suns.MeProto;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


/**
 * Server-side implementation of {@code MeHelloService}.
 *
 * <p>Provides one handler per gRPC call style (unary, server streaming, client streaming,
 * bidirectional streaming) plus a unary handler whose messages use repeated fields.
 * Failures are reported to the caller through {@link StreamObserver#onError(Throwable)}
 * rather than by throwing out of the handler.
 */
public class MeHelloServiceImpl extends MeHelloServiceGrpc.MeHelloServiceImplBase {

    /**
     * Unary call: prints the request fields, waits 20 seconds, then sends one fixed response.
     *
     * @param request          the incoming request
     * @param responseObserver receives the single response, or an error if the wait is interrupted
     */
    @Override
    public void meUnary(MeProto.MeHelloRequest1 request, StreamObserver<MeProto.MeHelloResponse1> responseObserver) {
        String name = request.getName();
        int age = request.getAge();
        System.out.println("name = " + name);
        System.out.println("age = " + age);
        MeProto.MeHelloResponse1 helloSuccess = MeProto.MeHelloResponse1.newBuilder()
                .setResult("hello success")
                .setCode(22)
                .build();

        try {
            // Deliberately slow, so that clients can observe blocking vs. future-based calls.
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread and end the call explicitly.
            Thread.currentThread().interrupt();
            responseObserver.onError(e);
            return;
        }
        responseObserver.onNext(helloSuccess);
        responseObserver.onCompleted();
    }

    /**
     * Server-streaming call: sends one response per second with codes 0 to 5, then ends the
     * call with an error to demonstrate a failure in the middle of a stream.
     *
     * @param request          the incoming request
     * @param responseObserver receives the streamed responses and the terminating error
     */
    @Override
    public void serverStream(MeProto.MeHelloRequest1 request, StreamObserver<MeProto.MeHelloResponse1> responseObserver) {
        int age = request.getAge();
        String name = request.getName();
        System.out.println("age = " + age);
        System.out.println("name = " + name);
        for (int i = 0; i < 10; i++) {
            if (i > 5) {
                // Simulated failure: terminate the call through the observer instead of
                // letting an exception escape from the handler.
                responseObserver.onError(new RuntimeException("ddddddddd"));
                return;
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                responseObserver.onError(e);
                return;
            }
            MeProto.MeHelloResponse1 response = MeProto.MeHelloResponse1.newBuilder()
                    .setResult("server stream")
                    .setCode(i)
                    .build();
            responseObserver.onNext(response);
            System.out.println("==================" + i + LocalDateTime.now());
        }
        // Not reached while the simulated failure above is in place.
        responseObserver.onCompleted();
    }

    /**
     * Client-streaming call: collects the name of every request and answers once, after the
     * client has finished sending.
     *
     * @param responseObserver receives the single summary response
     * @return the observer that consumes the client's request stream
     */
    @Override
    public StreamObserver<MeProto.MeHelloRequest1> clientStream(StreamObserver<MeProto.MeHelloResponse1> responseObserver) {
        List<String> allNames = new ArrayList<>();
        return new StreamObserver<MeProto.MeHelloRequest1>() {
            @Override
            public void onNext(MeProto.MeHelloRequest1 meHelloRequest1) {
                allNames.add(meHelloRequest1.getName());
                System.out.println(meHelloRequest1.getName());
            }

            @Override
            public void onError(Throwable throwable) {
                // The request stream failed; report it instead of dropping it silently.
                System.err.println("MeHelloServiceImpl.clientStream onError: " + throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("MeHelloServiceImpl.onCompleted");
                allNames.forEach(System.out::println);
                // Respond only now: every client message has been received and processed.
                MeProto.MeHelloResponse1 summary = MeProto.MeHelloResponse1.newBuilder()
                        .setResult("all is over")
                        .setCode(66)
                        .build();
                responseObserver.onNext(summary);
                responseObserver.onCompleted();
            }
        };
    }

    /**
     * Bidirectional-streaming call: answers every incoming request immediately with a message
     * that echoes its name, and completes the response stream when the client completes.
     *
     * @param responseObserver receives one response per request
     * @return the observer that consumes the client's request stream
     */
    @Override
    public StreamObserver<MeProto.MeHelloRequest1> bothStream(StreamObserver<MeProto.MeHelloResponse1> responseObserver) {
        return new StreamObserver<MeProto.MeHelloRequest1>() {
            @Override
            public void onNext(MeProto.MeHelloRequest1 meHelloRequest1) {
                System.out.println("MeHelloServiceImpl.onNext");
                MeProto.MeHelloResponse1 reply = MeProto.MeHelloResponse1.newBuilder()
                        .setResult("message:" + meHelloRequest1.getName())
                        .build();
                responseObserver.onNext(reply);
            }

            @Override
            public void onError(Throwable throwable) {
                // The request stream failed; report it instead of dropping it silently.
                System.err.println("MeHelloServiceImpl.bothStream onError: " + throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("MeHelloServiceImpl.onCompleted");
                responseObserver.onCompleted();
            }
        };
    }

    /**
     * Unary call with repeated fields: prints every name in the request and replies with the
     * fixed result list {@code ["a", "b"]}.
     *
     * @param request          the incoming request holding zero or more names
     * @param responseObserver receives the single response
     */
    @Override
    public void repeatedunary(MeProto.MeHelloRequest2 request, StreamObserver<MeProto.MeHelloResponse2> responseObserver) {
        ProtocolStringList nameList = request.getNameList();
        nameList.forEach(System.out::println);
        List<String> values = new ArrayList<>();
        values.add("a");
        values.add("b");
        MeProto.MeHelloResponse2 response = MeProto.MeHelloResponse2.newBuilder()
                .addAllResult(values)
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

一元访问(BlockingStub)

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.concurrent.TimeUnit;


/**
 * Client demo for the unary call using the blocking stub: sends one request to
 * localhost:9000 and prints the result and code of the single response.
 */
public class MeClient_1_unary {

    /**
     * Runs the demo; any failure is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();

        try {
            MeHelloServiceGrpc.MeHelloServiceBlockingStub stub = MeHelloServiceGrpc.newBlockingStub(channel);
            MeProto.MeHelloRequest1 request = MeProto.MeHelloRequest1.newBuilder()
                    .setName("xixi")
                    .setAge(223)
                    .build();

            // Blocks until the server has produced its response.
            MeProto.MeHelloResponse1 reply = stub.meUnary(request);
            System.out.println("result = " + reply.getResult());
            System.out.println("code = " + reply.getCode());
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            channel.shutdown();
        }
    }
}

服务器流式响应-阻塞(BlockingStub)

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.time.LocalDateTime;
import java.util.Iterator;


/**
 * Client demo for the server-streaming call using the blocking stub: the responses are
 * consumed through an {@link Iterator} as they arrive.
 */
public class MeClient_2_serverStream {

    /**
     * Runs the demo; any failure is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 9000)
                .usePlaintext()
                .build();

        try {
            MeHelloServiceGrpc.MeHelloServiceBlockingStub stub = MeHelloServiceGrpc.newBlockingStub(channel);
            MeProto.MeHelloRequest1 request = MeProto.MeHelloRequest1.newBuilder()
                    .setName("xixi")
                    .setAge(223)
                    .build();

            Iterator<MeProto.MeHelloResponse1> replies = stub.serverStream(request);
            // Print each response together with the time at which it was received.
            replies.forEachRemaining(reply -> {
                System.out.println(reply.getCode());
                System.out.println(reply.getResult());
                System.out.println("==================" + LocalDateTime.now());
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            channel.shutdown();
        }
    }
}

服务器流式响应-异步(Stub)

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;


/**
 * Client demo for the server-streaming call using the asynchronous stub: responses are
 * delivered to a {@link StreamObserver} callback while the main thread waits for the
 * stream to terminate.
 */
public class MeClient_3_serverStream_Stub {

    /**
     * Starts the streaming call and waits, for at most 12 seconds, until the stream ends
     * with either {@code onCompleted} or {@code onError}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        // Released by the callback once the stream has terminated, so the main thread
        // waits for the actual end of the call instead of sleeping a fixed time.
        java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

        try {
            MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel);
            MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi").setAge(223).build();
            serviceStub.serverStream(xixi, new StreamObserver<MeProto.MeHelloResponse1>() {
                @Override
                public void onNext(MeProto.MeHelloResponse1 next) {
                    System.out.println(next.getCode());
                    System.out.println(next.getResult());
                    System.out.println("==================" + LocalDateTime.now());
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println(throwable.getMessage());
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("结束时间 = " + LocalDateTime.now());
                    finished.countDown();
                }
            });
            if (!finished.await(12, TimeUnit.SECONDS)) {
                System.err.println("serverStream did not finish within 12 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

客户端流式(Stub)

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;


/**
 * Client demo for the client-streaming call: sends ten requests, one per second, through
 * the asynchronous stub and then waits for the server's single response.
 */
public class MeClient_4_clientStream {

    /**
     * Streams the requests and waits, for at most 20 seconds after the last one, until the
     * call ends with either {@code onCompleted} or {@code onError}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        // Released by the response callback once the call has terminated.
        java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

        try {
            MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel);

            StreamObserver<MeProto.MeHelloRequest1> observer = serviceStub.clientStream(new StreamObserver<MeProto.MeHelloResponse1>() {
                @Override
                public void onNext(MeProto.MeHelloResponse1 meHelloResponse1) {
                    System.out.println("MeHelloClient_4_clientStream.onNext");
                    System.out.println("meHelloResponse1.getResult() = " + meHelloResponse1.getResult());
                }

                @Override
                public void onError(Throwable throwable) {
                    // Report the failure instead of dropping it silently.
                    System.err.println("MeHelloClient_4_clientStream.onError: " + throwable);
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("MeHelloClient_4_clientStream.onCompleted");
                    finished.countDown();
                }
            });
            for (int i = 0; i < 10; i++) {
                MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi" + i).setAge(223).build();
                TimeUnit.SECONDS.sleep(1);
                observer.onNext(xixi);
            }
            // Tell the server that no more requests will follow; it answers after this.
            observer.onCompleted();
            if (!finished.await(20, TimeUnit.SECONDS)) {
                System.err.println("clientStream did not finish within 20 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

双向流式(Stub)

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;


/**
 * Client demo for the bidirectional-streaming call: sends ten requests, one per second,
 * through the asynchronous stub while responses arrive on a callback.
 */
public class MeClient_5_bothStream {

    /**
     * Streams the requests and waits, for at most 20 seconds after the last one, until the
     * response stream ends with either {@code onCompleted} or {@code onError}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        // Released by the response callback once the call has terminated.
        java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

        try {
            MeHelloServiceGrpc.MeHelloServiceStub serviceStub = MeHelloServiceGrpc.newStub(managedChannel);

            StreamObserver<MeProto.MeHelloRequest1> observer = serviceStub.bothStream(new StreamObserver<MeProto.MeHelloResponse1>() {
                @Override
                public void onNext(MeProto.MeHelloResponse1 meHelloResponse1) {
                    System.out.println("MeHelloClient_5_clientStream.onNext");
                    System.out.println("meHelloResponse1.getResult() = " + meHelloResponse1.getResult());
                }

                @Override
                public void onError(Throwable throwable) {
                    // Report the failure instead of dropping it silently.
                    System.err.println("MeHelloClient_5_clientStream.onError: " + throwable);
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("MeHelloClient_5_clientStream.onCompleted");
                    finished.countDown();
                }
            });
            for (int i = 0; i < 10; i++) {
                MeProto.MeHelloRequest1 xixi = MeProto.MeHelloRequest1.newBuilder().setName("xixi" + i).setAge(223).build();
                TimeUnit.SECONDS.sleep(1);
                observer.onNext(xixi);
            }
            // Close the request side; the server completes its side in response.
            observer.onCompleted();
            if (!finished.await(20, TimeUnit.SECONDS)) {
                System.err.println("bothStream did not finish within 20 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

一元RPC - FutureStub(异步获取结果)

package com.suns;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Client demo for the unary call using the future stub: the call returns a
 * {@link ListenableFuture} immediately and the response is handled in a callback.
 */
public class MeClient_6_unary_future {

    /**
     * Issues the call, continues with other work, and waits for at most 40 seconds until the
     * callback has handled the outcome. Failures are printed, not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        // Kept in a variable so the pool can be shut down; otherwise its worker threads
        // would outlive the demo.
        java.util.concurrent.ExecutorService callbackExecutor = Executors.newCachedThreadPool();
        // Released by the callback once the outcome of the call has been handled.
        java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);
        try {
            MeHelloServiceGrpc.MeHelloServiceFutureStub serviceStub = MeHelloServiceGrpc.newFutureStub(managedChannel);

            ListenableFuture<MeProto.MeHelloResponse1> future = serviceStub.meUnary(MeProto.MeHelloRequest1.newBuilder().setName("future").build());

            // Alternative 1 - synchronous: block until the response is available.
            // MeProto.MeHelloResponse1 meHelloResponse1 = future.get();
            // System.out.println(meHelloResponse1.getResult());

            // Alternative 2 - plain listener: only signals that the response has arrived,
            // without giving access to the response itself.
            // future.addListener(() -> System.out.println("异步的rpc响应 回来了...."), Executors.newCachedThreadPool());

            // Callback with access to the response (or the failure).
            Futures.addCallback(future, new FutureCallback<MeProto.MeHelloResponse1>() {
                @Override
                public void onSuccess(MeProto.MeHelloResponse1 meHelloResponse1) {
                    System.out.println("MeClient_6_Future.onSuccess");
                    System.out.println(meHelloResponse1.getResult());
                    finished.countDown();
                }

                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("MeClient_6_Future.onFailure");
                    System.out.println(throwable.getMessage());
                    finished.countDown();
                }
            }, callbackExecutor);

            // Printed right away: the call above does not block the main thread.
            System.out.println("后续的操作....");

            if (!finished.await(40, TimeUnit.SECONDS)) {
                System.err.println("meUnary did not finish within 40 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            callbackExecutor.shutdown();
            managedChannel.shutdown();
        }
    }
}

一元RPC - repeated 字段

package com.suns;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * Client demo for the unary call with repeated fields: sends the names "c" and "d" with the
 * blocking stub and prints every entry of the returned result list.
 */
public class MeClient_7_unary_repeated {

    /**
     * Runs the demo. Failures are printed, not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            MeHelloServiceGrpc.MeHelloServiceBlockingStub serviceStub = MeHelloServiceGrpc.newBlockingStub(managedChannel);

            MeProto.MeHelloRequest2 request = MeProto.MeHelloRequest2.newBuilder()
                    .addAllName(Arrays.asList("c", "d"))
                    .build();
            // Blocking call: the complete response is available when this returns.
            MeProto.MeHelloResponse2 response = serviceStub.repeatedunary(request);

            response.getResultList().forEach(System.out::println);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Request shutdown first; waiting for termination only makes sense afterwards.
            managedChannel.shutdown();
            try {
                managedChannel.awaitTermination(40, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

参考链接:

手把手教大家在 gRPC 中使用 JWT 完成身份校验

https://blog.csdn.net/u012702547/article/details/129163995