module kissrpc.RpcClient;

import kissrpc.RpcRequest;
import kissrpc.Unit;
import kissrpc.RpcResponse;
import kissrpc.RpcBinaryPackage;
import kissrpc.RpcClientSocket;
import kissrpc.RpcEventInterface;
import kissrpc.RpcPackageBase;
import kissrpc.RpcSocketBaseInterface;
import kissrpc.RpcEventInterface;
import kissrpc.RpcSendPackageManage;
import kissrpc.Logs;

import kiss.aio.AsynchronousChannelSelector;
import kiss.aio.ByteBuffer;

import std.parallelism;
import std.stdio;
import std.experimental.logger.core;


/// Signature of the handlers stored per function id and invoked with the
/// response object built for that function.
alias ReponsCallback = void delegate(RpcResponse);


/**
 * Client side of the RPC layer.
 *
 * Hands outgoing requests to an `RpcSendPackageManage`, turns received binary
 * packages into response objects, dispatches them to the callback registered
 * for the function id, and forwards socket status changes to the
 * `ClientSocketEventInterface` supplied at construction.
 */
class RpcClient : RpcEventInterface
{
    /**
     * Params:
     *   socketEvent = receiver of connect / disconnect / read-failed /
     *                 write-failed notifications (see `socketEvent`).
     */
    this(ClientSocketEventInterface socketEvent)
    {
        clientSocketEvent = socketEvent;
        sendPackManage = new RpcSendPackageManage(this);
        // NOTE(review): `defaultPoolThreads` is not declared in this class;
        // presumably this is the std.parallelism property that sizes the
        // global `taskPool` used by `socketEvent` -- confirm.
        defaultPoolThreads = RPC_CLIENT_DEFAULT_THREAD_POOL;
        compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO;
    }

    /**
     * Logs the "className.funcName" key. No state is recorded here; callbacks
     * are registered through `bindCallback`.
     */
    void bind(string className, string funcName)
    {
        string key = className ~ "." ~ funcName;
        deWritefln("rpc client bind:%s", key);
    }

    /// Registers (or replaces) the callback invoked for responses to `funcId`.
    void bindCallback(size_t funcId, ReponsCallback callback)
    {
        rpcCallbackMap[funcId] = callback;
    }

    /**
     * Stamps `req` with the next sequence number and the current client
     * socket, then hands it to the send-package manager.
     *
     * Params:
     *   req      = request to send; its compress type is overridden with the
     *              client-wide setting only when it is still `RPCT_NO`.
     *   protocol = currently unused by this method.
     *
     * Returns: whatever `sendPackManage.add(req)` returns.
     */
    bool requestRemoteCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol)
    {
        packMessageCount++;
        req.setSequence(packMessageCount);
        req.setSocket(clientSocket);

        if (req.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO)
        {
            req.setCompressType(this.compressType);
        }

        deWritefln("rpc client request remote call:%s, id:%s", req.getCallFuncName(), req.getCallFuncId);
        return sendPackManage.add(req);
    }

    /**
     * Handles a binary package received on `socket`.
     *
     * The package is processed only when `sendPackManage.remove` succeeds for
     * its sequence id; otherwise a warning is logged and the package is
     * dropped. For an accepted package a response object is populated from
     * the package fields, its status is set from the package status code, and
     * the callback bound to its function id is invoked.
     */
    void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack)
    {
        if (!sendPackManage.remove(pack.getSequenceId))
        {
            logWarning("Accept error, client recv response failed, package is timeout!!!, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s",
                    pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize);
            return;
        }

        deWritefln("client recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s",
                pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType);

        // Known serialization types need no extra handling here; anything
        // else is only reported, processing continues either way.
        switch (pack.getSerializedType)
        {
            case RPC_PACKAGE_PROTOCOL.TPP_JSON: break;
            case RPC_PACKAGE_PROTOCOL.TPP_XML: break;
            case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break;
            case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF: break;
            case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break;

            default:
                logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType());
                break;
        }

        auto rpcResp = new RpcRequest(socket);

        rpcResp.setSequence(pack.getSequenceId());
        rpcResp.setNonblock(pack.getNonblock());
        rpcResp.setCompressType(pack.getCompressType);
        rpcResp.bindFunc(pack.getFuncId());
        rpcResp.push(pack.getPayload());

        if (pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK)
        {
            logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, status code:%s",
                    pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getStatusCode);

            rpcResp.setStatus(RESPONSE_STATUS.RS_FAILD);
        }
        else
        {
            rpcResp.setStatus(RESPONSE_STATUS.RS_OK);
            deWritefln("rpc server response call, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());
        }

        if (pack.getNonblock)
        {
            deWritefln("async call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());
        }
        else
        {
            deWritefln("sync call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());
        }

        // The callback is invoked for failed responses too; it can inspect
        // the status set above.
        dispatchCallback(rpcResp);
    }

    /**
     * Reports a send-side outcome (timeout or failure) for `rpcResp` and then
     * invokes the callback bound to its function id. Any other status is
     * logged as an event-type error; the callback is still invoked.
     */
    void rpcSendPackageEvent(RpcResponse rpcResp)
    {
        switch (rpcResp.getStatus)
        {
            case RESPONSE_STATUS.RS_TIMEOUT:
                logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
                break;

            case RESPONSE_STATUS.RS_FAILD:
                logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
                break;

            default:
                logWarning("rpc send package event is fatal!!, event type error!");
                break;
        }

        dispatchCallback(rpcResp);
    }

    /**
     * Forwards a socket status change to the matching method of the
     * `ClientSocketEventInterface`, scheduled on `taskPool` rather than run
     * inline. Unknown statuses are logged together with `statusStr`.
     */
    void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status, const string statusStr)
    {
        // logInfo("client socket status info:%s", statusStr);
        switch (status)
        {
            case SOCKET_STATUS.SE_CONNECTD:
                auto t = task(&clientSocketEvent.connectd, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_DISCONNECTD:
                auto t = task(&clientSocketEvent.disconnectd, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_READ_FAILED:
                auto t = task(&clientSocketEvent.readFailed, socket);
                taskPool.put(t);
                break;

            case SOCKET_STATUS.SE_WRITE_FAILED:
                auto t = task(&clientSocketEvent.writeFailed, socket);
                taskPool.put(t);
                break;

            default:
                // The format string previously had no specifier for
                // statusStr, so the status text never reached the log.
                logError("client socket status is fatal!!, status:%s", statusStr);
                break;
        }
    }

    /// Creates the client socket for `ip`:`port` on selector `sel`, with this
    /// client passed as the socket's event handler.
    void connect(string ip, ushort port, AsynchronousChannelSelector sel)
    {
        clientSocket = new RpcClientSocket(ip, port, sel, this);
    }

    /// Asks the existing client socket to reconnect. Logs an error and does
    /// nothing when `connect` has not created a socket yet.
    void reConnect()
    {
        if (clientSocket is null)
        {
            logError("rpc client reconnect failed, socket is not created, call connect first");
            return;
        }

        clientSocket.reConnect();
    }

    /// Returns the send-package manager's count of requests awaiting a response.
    ulong getWaitResponseNum()
    {
        return sendPackManage.getWaitResponseNum;
    }

    /// Sets the compress type applied to requests whose own type is `RPCT_NO`.
    void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type)
    {
        compressType = type;
    }

private:
    /// Invokes the callback registered for the response's function id, or
    /// logs an error when none is bound.
    void dispatchCallback(RpcResponse rpcResp)
    {
        auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null);

        if (callback is null)
        {
            logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName);
            return;
        }

        callback(rpcResp);
    }

    RpcSendPackageManage sendPackManage;
    RPC_PACKAGE_COMPRESS_TYPE compressType;

    ReponsCallback[size_t] rpcCallbackMap;

    ulong packMessageCount;

    RpcClientSocket clientSocket;
    ClientSocketEventInterface clientSocketEvent;
}