欢迎回来!
2.请求流接口
(客户端可以源源不断的给服务端传参数,服务端会源源不断的接受服务端的参数,最后在客户端完成请求的时候,服务端返回一个结果)
在.proto文件中新加一个方法,这个方法的参数被 stream 关键字修饰
rpc methodRequestStream(stream Request) returns (Result) {}
然后用maven,清理一下缓存,重新编译一下
2.1.服务端
重新编译之后,实现刚刚新加的方法
@Override public StreamObservermethodRequestStream(StreamObserver responseObserver) { return new StreamObserver () { @Override public void onNext(Request request) { System.out.print("收到了请求 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onCompleted(); } }; }
(友情提示,如果 StreamObserver 的的泛型是Result 我们就叫 返回流观察者,如果是 Request 就叫请求流观察者,这样好描述一些)
这个和普通的有点不一样,直接返回了一个 请求流观察者 的接口实现,而且方法的参数还是一个 返回流观察者 ,好像搞反了一样,至于为什么,一会在客户端那里 统一说
2.2.客户端
请求流式异步调用,普通的是同步调用,我们在普通的方法里创建的实例 也是同步的,所以我们要在 JavaGrpcClient 中新加一个 异步调用的方法,添加一个异步的实例
publicResult runAsync(Functional functional) { TestServiceGrpc.TestServiceStub testServiceStub = TestServiceGrpc.newStub(channel); return functional.run(testServiceStub); }
TestServiceGrpc.newStub 返回的是一个异步的实例
再加一个测试
@Test public void contextLoads2() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserverresponseObserver = new StreamObserver () { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
这里我们实现了一个 返回流观察者
StreamObserverresponseObserver = new StreamObserver () { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } };
调用方法的时候,将我们实现的 返回流观察者 传进去,返回给我们一个 请求流观察者
StreamObserverresult = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
其实这里返回的 请求流观察者 就是服务端那里返回给我们的内个实现,服务端那里 返回流观察者 是我们实现的 传给他的
由于是异步调用,最后暂停一下,要不测试跑完,程序结束 开没开始就结束了
try { Thread.sleep(600000);}catch (Exception ex){}
运行起来看结果
服务端的打印
客户端的打印
这里我们发送了三次参数过去
result.onNext(request);result.onNext(request);result.onNext(request);
就相当于 服务端 那边返回的 请求流观察者 被调用了 三次 ,所以就打印了三句话
发送完参数结束请求
result.onCompleted();
服务端那里的结束请求中调用了一次我们传给他的 返回流观察者 中的 onNext 方法
所以客户端就打印了一次
这里会有人问 这里不能返回 多个吗
不能,虽然 这两个观察者 看上去一样 都是 StreamObserver 接口,但是,这个方法只是请求流调用,在grpc的内部 最后返回的时候 只返回第一个指定的返回只,不管返回了多少个,在客户端那边只会收到 第一个返回的结果
3.响应流接口
(和请求流接口完全相反,请求流是异步,响应流是同步,请求流是接受多个请求返回一个结果,响应流是接受一个请求返回多个结果)
我们在.proto文件中再增加一个方法,这回这个方法的返回值被 stream 关键字修饰
rpc methodResultStream(Request) returns (stream Result){}
清缓存,重新编译
3.1.服务端
实现刚刚新加的方法
@Override public void methodResultStream(Request request, StreamObserverresponseObserver) { System.out.print("收到了请求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result); responseObserver.onCompleted(); }
这里跟普通的差不多,只是我们返回了三次结果
responseObserver.onNext(result);responseObserver.onNext(result);try { Thread.sleep(2000);}catch (Exception ex){}responseObserver.onNext(result);
3.2.客户端
没啥好加的了,直接上测试
@Test public void contextLoads3() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); Iteratorresult = javaGrpcClient.run(o -> o.methodResultStream(request)); result.forEachRemaining(o -> { System.out.print("返回了结果 \n"); }); System.out.print("结束 \n"); }
返回流请求是同步的,所以要调同步的方法,返回了一个迭代器
Iteratorresult = javaGrpcClient.run(o -> o.methodResultStream(request));
迭代器中有服务端的所有返回结果
result.forEachRemaining(o ->{ System.out.print("返回了结果 \n");});
运行结果
服务端结果
客户端结果
由于是同步调用,在forEach中会等待服务端的每一个返回结果
4.双向流接口
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
歇会,抽根烟!
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在.proto文件中再加一个方法
rpc methodDoubleStream(stream Request) returns (stream Result){}
实现她
双向流的服务端和请求流的没啥区别,只是在接收到请求的时候没有立刻结束请求
@Override public StreamObservermethodDoubleStream(StreamObserver responseObserver) { return new StreamObserver () { @Override public void onNext(Request value) { System.out.print("收到了请求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
客户端也没啥区别
@Test public void contextLoads4() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserverresponseObserver = new StreamObserver () { @Override public void onNext(Result result) { System.out.print("返回了结果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
双向流也是异步的,所以要等待
try { Thread.sleep(600000);}catch (Exception ex){}
服务端结果
客户端结果
完结!撒花!