module kissrpc.RpcClient;

import kissrpc.RpcRequest;
import kissrpc.Unit;
import kissrpc.RpcResponse;
import kissrpc.RpcBinaryPackage;
import kissrpc.RpcClientSocket;
import kissrpc.RpcEventInterface;
import kissrpc.RpcPackageBase;
import kissrpc.RpcSocketBaseInterface;
import kissrpc.RpcEventInterface;
import kissrpc.RpcSendPackageManage;
import kissrpc.Logs;

import kiss.aio.AsynchronousChannelSelector;
import kiss.aio.ByteBuffer;

import std.parallelism;
import std.stdio;
import std.experimental.logger.core;
import std.format;

/// Signature of the handlers registered through `RpcClient.bindCallback`.
alias ReponsCallback = void delegate(RpcResponse);


/**
 * Client side of the RPC layer.
 *
 * Hands outgoing requests to an `RpcSendPackageManage`, receives package and
 * socket events through `RpcEventInterface`, and routes responses to the
 * callbacks registered per function id.
 */
class RpcClient : RpcEventInterface
{
    /**
     * Creates a client that is not yet connected; call `connect` before any
     * method that touches the socket.
     *
     * Params:
     *     socketEvent = listener notified of socket state changes
     *                   (see `socketEvent` below).
     */
    this(ClientSocketEventInterface socketEvent)
    {
        clientSocketEvent = socketEvent;
        sendPackManage = new RpcSendPackageManage(this);
        // NOTE(review): no member named defaultPoolThreads is declared in this
        // class, so this presumably resolves to the std.parallelism property
        // that sizes the global taskPool used by socketEvent() -- confirm.
        defaultPoolThreads = RPC_CLIENT_DEFAULT_THREAD_POOL;
        compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO;
    }

    /// Returns: the value of `clientSocket.getFd()`. `clientSocket` is only
    /// assigned by `connect`, so call that first.
    int getFd()
    {
        return clientSocket.getFd();
    }

    /**
     * Logs the `className.funcName` key. As written, nothing is stored; the
     * actual callback registration happens in `bindCallback`.
     */
    void bind(string className, string funcName)
    {
        string key = className ~ "." ~ funcName;
        deWritefln("rpc client bind:%s", key);
    }

    /**
     * Registers the handler invoked for responses whose function id is
     * `funcId`, replacing any handler previously stored under that id.
     */
    void bindCallback(size_t funcId, ReponsCallback callback)
    {
        rpcCallbackMap[funcId] = callback;
    }


    /**
     * Stamps a request with the next sequence number and this client's
     * socket, then queues it on the send-package manager.
     *
     * Params:
     *     req      = request to send; when its compress type is `RPCT_NO`
     *                the client-wide compress type is applied instead.
     *     protocol = not used by this method.
     *
     * Returns: whatever `sendPackManage.add(req)` returns.
     */
    bool requestRemoteCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol)
    {
        packMessageCount++;
        req.setSequence(packMessageCount);
        req.setSocket(clientSocket);

        // A request that did not choose a compression inherits the client default.
        if (req.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO)
        {
            req.setCompressType(this.compressType);
        }

        deWritefln("rpc client request remote call:%s, id:%s", req.getCallFuncName(), req.getCallFuncId);
        return sendPackManage.add(req);
    }

    /**
     * Handles a binary package received from the socket.
     *
     * The package is only processed when its sequence id can be removed from
     * the send-package manager; otherwise a warning is logged and the package
     * is dropped. For an accepted package a response object is built from the
     * package fields, its status is set from the package status code, and the
     * callback bound to its function id is invoked.
     */
    void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack)
    {
        if (!sendPackManage.remove(pack.getSequenceId))
        {
            logWarning("Accept error, client recv response failed, package is timeout!!!, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s",
                pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize);
            return;
        }

        log(format("client recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s",
            pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType));

        // Known serialization types need no handling here; anything else is
        // only reported, processing continues either way.
        switch (pack.getSerializedType)
        {
            case RPC_PACKAGE_PROTOCOL.TPP_JSON:
            case RPC_PACKAGE_PROTOCOL.TPP_XML:
            case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF:
            case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF:
            case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF:
                break;

            default:
                logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType());
        }

        auto rpcResp = new RpcRequest(socket);

        rpcResp.setSequence(pack.getSequenceId());
        rpcResp.setNonblock(pack.getNonblock());
        rpcResp.setCompressType(pack.getCompressType);
        rpcResp.bindFunc(pack.getFuncId());
        rpcResp.push(pack.getPayload());

        if (pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK)
        {
            logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, status code:%s",
                pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getStatusCode);

            rpcResp.setStatus(RESPONSE_STATUS.RS_FAILD);
        }
        else
        {
            rpcResp.setStatus(RESPONSE_STATUS.RS_OK);
            log(format("rpc server response call, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));
        }

        if (pack.getNonblock)
        {
            log(format("async call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));
        }
        else
        {
            log(format("sync call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));
        }

        dispatchCallback(rpcResp);
    }

    /**
     * Handles a send-side event for a request. Logs a warning that depends on
     * the response status (timeout, failure, or unexpected status) and then
     * invokes the callback bound to the response's function id in every case.
     */
    void rpcSendPackageEvent(RpcResponse rpcResp)
    {
        switch (rpcResp.getStatus)
        {
            case RESPONSE_STATUS.RS_TIMEOUT:
                logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
                break;

            case RESPONSE_STATUS.RS_FAILD:
                logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
                break;

            default:
                logWarning("rpc send package event is fatal!!, event type error!");
        }

        dispatchCallback(rpcResp);
    }


    /**
     * Forwards a socket status change to the matching method of the
     * `ClientSocketEventInterface` given to the constructor. Each notification
     * is wrapped in a `task` and put on `taskPool` rather than called inline.
     * An unrecognised status is logged as an error and not forwarded.
     */
    void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status, const string statusStr)
    {
        // logInfo("client socket status info:%s", statusStr);
        switch (status)
        {
            case SOCKET_STATUS.SE_CONNECTD:
                auto t = task(&clientSocketEvent.connectd, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_DISCONNECTD:
                auto t = task(&clientSocketEvent.disconnectd, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_READ_FAILED:
                auto t = task(&clientSocketEvent.readFailed, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_WRITE_FAILED:
                auto t = task(&clientSocketEvent.writeFailed, socket);
                taskPool.put(t);
                break;

            default:
                // The message needs a placeholder, otherwise statusStr is
                // passed as an argument that the format string never consumes.
                logError("client socket status is fatal!!, status:%s", statusStr);
                return;
        }
    }

    /**
     * Creates the client socket for `ip:port` on selector `sel`, with this
     * object as its event receiver. Replaces any socket created earlier.
     */
    void connect(string ip, ushort port, AsynchronousChannelSelector sel)
    {
        clientSocket = new RpcClientSocket(ip, port, sel, this);
    }

    /// Writes raw bytes through the client socket and returns its result.
    /// Requires a prior `connect`.
    bool write(byte[] data)
    {
        return clientSocket.write(data);
    }

    /// Asks the client socket to reconnect. Requires a prior `connect`.
    void reConnect()
    {
        clientSocket.reConnect();
    }

    /// Returns: `getWaitResponseNum` as reported by the send-package manager.
    ulong getWaitResponseNum()
    {
        return sendPackManage.getWaitResponseNum;
    }

    /// Sets the compress type applied to requests whose own compress type is
    /// `RPCT_NO` (see `requestRemoteCall`).
    void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type)
    {
        compressType = type;
    }

private:
    /// Invokes the callback bound to the response's function id, or logs an
    /// error when no callback is bound for that id.
    void dispatchCallback(RpcResponse rpcResp)
    {
        auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null);

        if (callback is null)
        {
            logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName);
            return;
        }

        callback(rpcResp);
    }

    RpcSendPackageManage sendPackManage;
    RPC_PACKAGE_COMPRESS_TYPE compressType;

    /// Response handlers keyed by function id.
    ReponsCallback[size_t] rpcCallbackMap;

    /// Incremented once per `requestRemoteCall`; the new value becomes the
    /// request's sequence number.
    ulong packMessageCount;

    RpcClientSocket clientSocket;
    ClientSocketEventInterface clientSocketEvent;
}