StormDRPC流程解读

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

StormDRPC流程解读

stillcoolme   2020-03-20 我要评论
Storm 的编程模型是一个有向无环图,模型角度决定了 Storm 的 Spout 接收到外部系统的请求,将请求数据分发给下游的 bolt 进行处理后,spout 并不能得到 bolt 的处理结果并将结果返回给外部请求。所以应用场景中 Storm 对外部系统的调用都是采用回调的方式: 1. 接收外部系统的请求,将请求得到的数据发到消息队列中,就立马响应给外部系统。 2. 然后 Storm 实时平台去消息队列中拉取数据并进行分布式并行处理,再将运算完的结果存入第三方存储介质(外部系统直接通过读取该介质获取结果)或者调用外部系统的接口将处理的结果推送出去(以回调的方式实现伪同步请求)。 但假如有一个需求:项目要接入各大银行的系统中,通过要求对方提供一个回调接口来实现同步是不可能的。必须依靠自己去实现同步请求响应,外部系统将消息发往storm实时平台,然后外部系统会阻塞,等待storm实时平台处理完后将结果返回给外部系统。这就要用到 DRPC了。 ### Strom DRPC DRPC设计目的是为了**充分利用Storm的计算能力实现高密度的并行实时计算**。通过一个 DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。 一个 DRPC请求过程:客户端程序向 DRPC Server 发送要执行的函数名称和该函数的参数。DRPC Server 将函数调用放到队列中,并用一个惟一的id标记,具备 DRPC功能的拓扑会使用一个 DRPCSpout 拉取 。Topo 计算好结果后会由一个名为 ReturnResults 的 bolt 去 连接 DRPC Server 给出对应函数调用id的结果,然后 DRPC Serve 根据 ID 找到等待中的客户端,为等待中的客户端消除阻塞,并发送结果给客户端。具体工作流程如下图所示: ![img](https://storm.apache.org/releases/current/imageshttps://img.qb5200.com/download-x/drpc-workflow.png) 从一个客户端的角度来看,一个分布式RPC调用就像是一个常规的RPC调用。只需要传输服务名和请求参数即可。 实际就是个同步的、向远程系统发送socket请求并得到远程系统处理的结果的分布式调用而已。 ### DRPC请求流程 由上面的架构图可以发现,DRPC Server 相当于一个第三方服务: 1. 负责接收外部系统的请求,将外部请求的参数存入一个先进先出的队列中,阻塞等待 Storm Topo 处理的结果。 2. Storm Topo 的 spout 中创建 socket 去连接中转程序,spout 不断拿队列中请求参数来处理。 3. spout获取到请求参数后,将参数传给下游的bolt去计算,下游的最后一层bolt计算完也创建socke去连接中转程序并将结果发送给中转程序。 4. 中转程序中阻塞的地方轮询得到结果后,就结束轮询响应给外部系统了。  DRPC Server 要同时协调三个不同的程序的请求,通过源码可知其通过定义 Thrift 接口完成了进程间的通信,下面来详解每个过程。 #### 0. Thrift接口 由于 Strom 的 drpc 是通过 thrift 框架 进行 rpc调用的,所以先查看 strom.thrift。有两个 thrift 接口: DistributedRPC 和 DistributedRPCInvocations 。 * DistributedRPC:定义 DRPC客户端 和 DRPC Server端 之间的调用方法 execute(),暴露给 业务客户端使用; * DistributedRPCInvocations:定义 DRPC Server端 和 服务端逻辑处理Topo 之间的 拉取请求参数 以及 返回结果的 方法; ```java service DistributedRPC { // 请求 drpc 方法 string execute(1: string functionName, 2: string funcArgs) throws (1: DRPCExecutionException e, 2: AuthorizationException aze); } service DistributedRPCInvocations { // 返回 业务topo 处理结果给 DRPCServer void result(1: string id, 2: string result) throws (1: AuthorizationException aze); // 业务topo 拉取 DRPCServer从客户端接收到的请求 DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze); void failRequest(1: string id) throws (1: AuthorizationException aze); void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze); } struct DRPCRequest { 1: required string func_args; 2: required string request_id; } ``` 需要查看对两个 thrift 接口的具体实现逻辑,只要查看接口的实现类即可,DRPC Server 中的具体实现类是 **DRPCThrift**,它同时实现了两个接口中的方法,即处理 DRPC客户端的请求,又处理 DRPC业务Topo拉请求的请求。 #### 1. execute( ) 接收客户端请求 进行 DRPC调用的第一步是 客户端调用 execute(name, args) ,DRPC Server 的 execute( ) 会对该请求做如下处理: 1. 将请求封装成 BlockingOutstandingRequest req,然后 req.getResult() 使用 req 内部的一个Semaphore 实现 acquire() 请求阻塞,直到 DRPC Server 端接到 业务topo 的结果才 release。 2. 将请求添加到队列中,等待 业务 topo进行拉取消费。 ```java public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface { // 构造方法注入 private final DRPC drpc; //请求队列 ,将请求排队给业务topo消费,Waiting to be fetched ConcurrentHashMap> _queues = new ConcurrentHashMap<>(); //结果map ,用来接收结果返回给客户端,Waiting to be returned _requests = new ConcurrentHashMap<>(); @Override public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { return drpc.executeBlocking(functionName, funcArgs); } } public class DRPC implements AutoCloseable { public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { String id = nextId(); T req = BlockingOutstandingRequest .FACTORY .mkRequest(functionName, new DRPCRequest(funcArgs, id)); _requests.put(id, req); ConcurrentLinkedQueue q = getQueue(functionName); q.add(req); try { return req.getResult(); } finally { cleanup(req.getRequest().get_request_id()); } } } ``` #### 2. DRPCSpout 拉取请求 DRPCSpout 作为 thrift客户端 通过调用 fetchRequest() 拉取请求,这里需要转换一下思维,DRPCThrift 依然作为 thrift 服务端,所以 DRPCThrift 要实现两个接口。 因此 DRPCSpout 使用的是 DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface,在 nextTuple() 中不断调用 client.fetchRequest(function); 得到 DRPC客户端 的请求来处理。 #### 3. 业务Topo链路 后面的流程就进入我们编写的业务 Topo 中了,通过 LinearDRPCTopologyBuilder 的`builder.createRemoteTopology()` 来构建线性的`drpc topo`,该`topo`的链路为:`spout -> PrepareRequest Bolt-> 用户bolt1 -> 用户bolt2 -> JoinResult Bolt -> ReturnResults Bolt` 其中 JoinResult Bolt ,用两个 Map 分别记录 PrepareRequest 收到的请求 Id,最后一个业务Topo处理后的请求 Id,这两个Id是一样的,当两个Id都在 Map中时就认为该 DRPC请求完成,则继续发送给 ReturnResults Bolt 。 最后 ReturnResults Bolt 通过调用 client.result(id, result); 用于返回 Topo 处理结果,在 DRPC 类中 returnResult() 的具体实现: 1. 从 _requests Map 拿出对应的请求,然后将 result 注入进去,同时 _sem.release(); 将信号量释放,去掉对请求的阻塞; 2. 在 DRPC 中的 req.getResult(); 将不再被阻塞,立刻将 DRPC Server 请求返回给客户端。 ```java # DRPC类 public void returnResult(String id, String result) { OutstandingRequest req = _requests.get(id); if (req != null) { req.returnResult(result); } } } # BlockingOutstandingRequest 类 public void returnResult(String result) { _result = result; _sem.release(); } ``` ### 需要注意的问题 #### 相关类职责 * DRPCServer:DRPC Server上帝类,启动用于执行 DRPC 请求的 ThriftServer 端实例,会启动两个 ThriftServer * handlerServer 用于接收 DRPC客户端的请求; * invokeServer 用于接收 求经过Topo进行业务处理后的 result,然后返回给 handlerServer; * ThriftServer:对 Thrift 框架 服务端 启动、停止 操作的封装; * DRPCThrift:DRPC Server 的业务类,包装 DRPC类的调用; * DRPC:DRPC Server 的业务类 #### thrift客户端到服务端调用链路问题 阅读源码的过程中对 DRPCSput 的 `client.fetchRequest(function);`链路不清楚,想看它的服务端业务是怎么实现的,点进去看到的是 DRPCInvocationsClient 的 fetchRequest() ,这里`c.fetchRequest(func);`居然是直接又用 thrift客户端调 fetchRequest()?看: ```java public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { // 构造方法 client.set(new DistributedRPCInvocations.Client(protocol)); // # DRPCInvocationsClient @Override public DRPCRequest fetchRequest(String func) { DistributedRPCInvocations.Client c = client.get(); if (c == null) { throw new TException("Client is not connected..."); } // 这里是真正的客户端请求,DistributedRPCInvocations.Client 是 thrift 抽象的客户端 return c.fetchRequest(func); } } ``` 小伙子,你会以为`fetchRequest(func)`是重复的 thrift客户端调用,说明你对 `DRPCInvocationsClient` 类不熟,对设计模式也不熟啊!!首先,`DRPCInvocationsClient` 和`DistributedRPCInvocations.Client`一样,都实现` DistributedRPCInvocations.Iface`,你就根据仅有的`thrift`知识,以为实现了`DistributedRPCInvocations.Iface`接口的**都是要写服务端业务逻辑的**;其实这里`DRPCInvocationsClient`只是用了**静态代理模式**,对 `DistributedRPCInvocations.Client`的代理而已,对各方法多了异常处理啊,真正的客户端请求确实是`c.fetchRequest(func);` 既然这样,那就看还有什么类是实现了` DistributedRPCInvocations.Iface`接口的,就是`c.fetchRequest(func);`对应的服务端相应逻辑,就在`DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface`,同时实现了两个 drpc 的接口进行全部方法实现,具体逻辑在`DRPCThrift`的成员变量`DRPC`类中! ```java class DRPC { // DRPCSpout 中调用的 fetchRequest,实际调用的是这里。 public DRPCRequest fetchRequest(String functionName) throws AuthorizationException { meterFetchRequestCalls.mark(); checkAuthorizationNoLog("fetchRequest", functionName); ConcurrentLinkedQueue q = getQueue(functionName); OutstandingRequest req = q.poll(); if (req != null) { //Only log accesses that fetched something logAccess("fetchRequest", functionName); req.fetched(); DRPCRequest ret = req.getRequest(); return ret; } return NOTHING_REQUEST; } } ``` ### 参考 * [Storm同步调用之DRPC模型探讨](http://www.cnblogs.com/intsmaze/p/7602242.html) * [Thrift 服务端 IO通信模型](https://www.cnblogs.com/zaizhoumo/p/8184923.html ) * [Thrift 服务端 IO通信模型2](https://segmentfault.com/a/1190000016250234 )

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们