博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java grpc 简单易懂 ---2
阅读量:4969 次
发布时间:2019-06-12

本文共 7292 字,大约阅读时间需要 24 分钟。

 

 

 

 

 

欢迎回来!

2.请求流接口

(客户端可以源源不断的给服务端传参数,服务端会源源不断的接受服务端的参数,最后在客户端完成请求的时候,服务端返回一个结果)

 

在.proto文件中新加一个方法,这个方法的参数被 stream 关键字修饰

rpc methodRequestStream(stream Request) returns (Result) {}

  

然后用maven,清理一下缓存,重新编译一下

 

2.1.服务端

 重新编译之后,实现刚刚新加的方法

@Override    public StreamObserver
methodRequestStream(StreamObserver
responseObserver) { return new StreamObserver
() { @Override public void onNext(Request request) { System.out.print("收到了请求 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onCompleted(); } }; }

  

(友情提示,如果 StreamObserver  的的泛型是Result 我们就叫 返回流观察者,如果是 Request 就叫请求流观察者,这样好描述一些)

这个和普通的有点不一样,直接返回了一个 请求流观察者 的接口实现,而且方法的参数还是一个 返回流观察者 ,好像搞反了一样,至于为什么,一会在客户端那里 统一说

 

2.2.客户端

请求流式异步调用,普通的是同步调用,我们在普通的方法里创建的实例 也是同步的,所以我们要在 JavaGrpcClient 中新加一个 异步调用的方法,添加一个异步的实例

public 
Result runAsync(Functional
functional) { TestServiceGrpc.TestServiceStub testServiceStub = TestServiceGrpc.newStub(channel); return functional.run(testServiceStub); }

TestServiceGrpc.newStub 返回的是一个异步的实例

 

再加一个测试

 

@Test    public void contextLoads2() {        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();        StreamObserver
responseObserver = new StreamObserver
() { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver
result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }

  

这里我们实现了一个 返回流观察者 

StreamObserver
responseObserver = new StreamObserver
() { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } };

  

调用方法的时候,将我们实现的 返回流观察者 传进去,返回给我们一个 请求流观察者

StreamObserver
result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));

  

其实这里返回的 请求流观察者 就是服务端那里返回给我们的内个实现,服务端那里 返回流观察者 是我们实现的 传给他的

 

由于是异步调用,最后暂停一下,要不测试跑完,程序结束 开没开始就结束了

try {    Thread.sleep(600000);}catch (Exception ex){}

  

 

运行起来看结果

服务端的打印

 

客户端的打印

 

这里我们发送了三次参数过去

result.onNext(request);result.onNext(request);result.onNext(request);

  

就相当于 服务端 那边返回的 请求流观察者 被调用了 三次 ,所以就打印了三句话

 

发送完参数结束请求

result.onCompleted();

  

服务端那里的结束请求中调用了一次我们传给他的 返回流观察者 中的 onNext 方法

所以客户端就打印了一次

 

这里会有人问 这里不能返回 多个吗

不能,虽然 这两个观察者 看上去一样 都是 StreamObserver 接口,但是,这个方法只是请求流调用,在grpc的内部 最后返回的时候 只返回第一个指定的返回只,不管返回了多少个,在客户端那边只会收到 第一个返回的结果

 

 

3.响应流接口

(和请求流接口完全相反,请求流是异步,响应流是同步,请求流是接受多个请求返回一个结果,响应流是接受一个请求返回多个结果)

 

 我们在.proto文件中再增加一个方法,这回这个方法的返回值被 stream 关键字修饰

rpc methodResultStream(Request) returns (stream Result){}

  

清缓存,重新编译

3.1.服务端

 实现刚刚新加的方法

@Override    public void methodResultStream(Request request, StreamObserver
responseObserver) { System.out.print("收到了请求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result); responseObserver.onCompleted(); }

  

 

这里跟普通的差不多,只是我们返回了三次结果

responseObserver.onNext(result);responseObserver.onNext(result);try {    Thread.sleep(2000);}catch (Exception ex){}responseObserver.onNext(result);

  

 

3.2.客户端

没啥好加的了,直接上测试

@Test    public void contextLoads3() {        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();        Iterator
result = javaGrpcClient.run(o -> o.methodResultStream(request)); result.forEachRemaining(o -> { System.out.print("返回了结果 \n"); }); System.out.print("结束 \n"); }

  

 

返回流请求是同步的,所以要调同步的方法,返回了一个迭代器

Iterator
result = javaGrpcClient.run(o -> o.methodResultStream(request));

  

迭代器中有服务端的所有返回结果

result.forEachRemaining(o ->{    System.out.print("返回了结果 \n");});

  

运行结果

服务端结果

 

客户端结果

由于是同步调用,在forEach中会等待服务端的每一个返回结果

 

 

4.双向流接口

 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

歇会,抽根烟!

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

在.proto文件中再加一个方法

rpc methodDoubleStream(stream Request) returns (stream Result){}

  

实现

 

双向流的服务端和请求流的没啥区别,只是在接收到请求的时候没有立刻结束请求

@Override    public StreamObserver
methodDoubleStream(StreamObserver
responseObserver) { return new StreamObserver
() { @Override public void onNext(Request value) { System.out.print("收到了请求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }

  

客户端也没啥区别

@Test    public void contextLoads4() {        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();        StreamObserver
responseObserver = new StreamObserver
() { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver
result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }

  

双向流也是异步的,所以要等待

try {    Thread.sleep(600000);}catch (Exception ex){}

  

 服务端结果

 

客户端结果

 

完结!撒花!

转载于:https://www.cnblogs.com/gutousu/p/9970288.html

你可能感兴趣的文章
02号团队-团队任务3:每日立会(2018-12-05)
查看>>
SQLite移植手记1
查看>>
js05-DOM对象二
查看>>
mariadb BINLOG_FORMAT = STATEMENT 异常
查看>>
C3P0 WARN: Establishing SSL connection without server's identity verification is not recommended
查看>>
iPhone在日本最牛,在中国输得最慘
查看>>
动态方法决议 和 消息转发
查看>>
js 基础拓展
查看>>
C#生成随机数
查看>>
Android应用程序与SurfaceFlinger服务的连接过程分析
查看>>
Java回顾之多线程
查看>>
机电行业如何进行信息化建设
查看>>
9、总线
查看>>
Git 笔记 - section 1
查看>>
2018 Multi-University Training Contest 10 - Count
查看>>
HDU6203 ping ping ping
查看>>
《人人都是产品经理》书籍目录
查看>>
如何在git bash中运行mysql
查看>>
OO第三阶段总结
查看>>
构建之法阅读笔记02
查看>>