1 module kissrpc.RpcServer; 2 3 import kissrpc.RpcRequest; 4 import kissrpc.Unit; 5 import kissrpc.RpcResponse; 6 import kissrpc.RpcBinaryPackage; 7 import kissrpc.RpcServerSocket; 8 import kissrpc.RpcEventInterface; 9 import kissrpc.RpcPackageBase; 10 import kissrpc.RpcSocketBaseInterface; 11 import kissrpc.RpcSendPackageManage; 12 import kissrpc.Logs; 13 14 // import kiss.event.GroupPoll; 15 // import kiss.aio.AsyncGroupTcpServer; 16 17 import kiss.aio.AsynchronousChannelThreadGroup; 18 import kiss.aio.AsynchronousSocketChannel; 19 import kiss.net.TcpAcceptor; 20 21 22 import std.stdio; 23 24 alias RequestCallback = void delegate(RpcRequest); 25 26 class RpcServer:TcpAcceptor, RpcEventInterface{ 27 28 this(string ip, ushort port, AsynchronousChannelThreadGroup group, ServerSocketEventInterface socketEvent) 29 { 30 serverSocketEvent = socketEvent; 31 sendPackManage = new RpcSendPackageManage(this); 32 compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO; 33 34 super(ip, port, group.getWorkSelector()); 35 } 36 37 override void onAcceptCompleted(void* attachment, AsynchronousSocketChannel result) { 38 RpcServerSocket server = new RpcServerSocket(result, this); 39 } 40 override void onAcceptFailed(void* attachment) { 41 deWritefln("rpc acceptFailed"); 42 } 43 44 void bind(string className, string funcName) 45 { 46 string key = className ~ "." ~ funcName; 47 deWritefln("rpc server bind:%s", key); 48 } 49 50 void bindCallback(const size_t funcId, RequestCallback callback) 51 { 52 rpcCallbackMap[funcId] = callback; 53 deWritefln("rpc server bind callback:%s, %s, addr:%s",funcId, RpcBindFunctionMap[funcId], callback); 54 } 55 56 bool RpcResponseRemoteCall(RpcResponse resp, RPC_PACKAGE_PROTOCOL protocol) 57 { 58 if(resp.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO) 59 { 60 resp.setCompressType(this.compressType); 61 } 62 63 deWritefln("rpc response remote call:%s, id:%s", resp.getCallFuncName, resp.getCallFuncId); 64 return sendPackManage.add(resp, false); 65 } 66 67 68 void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack) 69 { 70 deWritefln("server recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", 71 pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType); 72 73 if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK) 74 { 75 logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, compress:%s, status code:%s", 76 pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getCompressType, pack.getStatusCode); 77 78 }else 79 { 80 RpcPackageBase packageBase; 81 82 switch(pack.getSerializedType) 83 { 84 case RPC_PACKAGE_PROTOCOL.TPP_JSON:break; 85 case RPC_PACKAGE_PROTOCOL.TPP_XML: break; 86 case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break; 87 case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF: break; 88 case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break; 89 90 default: 91 logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType()); 92 } 93 94 auto rpcReq = new RpcRequest(socket); 95 96 rpcReq.setSequence(pack.getSequenceId()); 97 rpcReq.setNonblock(pack.getNonblock()); 98 rpcReq.setCompressType(pack.getCompressType()); 99 rpcReq.bindFunc(pack.getFuncId()); 100 rpcReq.push(pack.getPayload()); 101 102 deWritefln("rpc client request call, func:%s, arg num:%s", rpcReq.getCallFuncName(), rpcReq.getArgsNum()); 103 104 auto callback = rpcCallbackMap.get(rpcReq.getCallFuncId, null); 105 106 if(callback !is null) 107 { 108 callback(rpcReq); 109 }else 110 { 111 logError("client rpc call function is not bind, function name:%s", rpcReq.getCallFuncName); 112 } 113 114 } 115 } 116 117 void rpcSendPackageEvent(RpcResponse rpcResp) 118 { 119 switch(rpcResp.getStatus) 120 { 121 case RESPONSE_STATUS.RS_TIMEOUT: 122 logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); 123 break; 124 125 case RESPONSE_STATUS.RS_FAILD: 126 logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); 127 break; 128 129 default: 130 logWarning("rpc send package event is fatal!!, event type error!"); 131 } 132 } 133 134 void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr) 135 { 136 logInfo("server socket status:%s", statusStr); 137 138 switch(status) 139 { 140 case SOCKET_STATUS.SE_LISTEN_FAILED: serverSocketEvent.listenFailed(statusStr); break; 141 case SOCKET_STATUS.SE_CONNECTD: serverSocketEvent.inconming(socket); break; 142 case SOCKET_STATUS.SE_DISCONNECTD:serverSocketEvent.disconnectd(socket); break; 143 case SOCKET_STATUS.SE_READ_FAILED: serverSocketEvent.readFailed(socket); break; 144 case SOCKET_STATUS.SE_WRITE_FAILED: serverSocketEvent.writeFailed(socket); break; 145 146 default: 147 logError("server socket status is fatal!!", statusStr); 148 } 149 } 150 151 152 void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type) 153 { 154 compressType = type; 155 } 156 157 private: 158 RpcSendPackageManage sendPackManage; 159 RPC_PACKAGE_COMPRESS_TYPE compressType; 160 161 ServerSocketEventInterface serverSocketEvent; 162 RequestCallback[size_t] rpcCallbackMap; 163 }