module kissrpc.RpcSendPackageManage;

import kissrpc.RpcBinaryPackage;
import kissrpc.RpcPackageBase;
import kissrpc.RpcResponse;
import kissrpc.RpcRequest;
import kissrpc.RpcEventInterface;
import kissrpc.RpcSocketBaseInterface;
import kissrpc.Unit;
import kissrpc.Logs;

import std.datetime;
import core.thread;
import core.memory:GC;

import std.stdio;

/**
 * Sends RPC requests and tracks the ones still waiting for a response.
 *
 * The object is itself a thread: roughly every 100 ms it refreshes the
 * RPC_SYSTEM_TIMESTAMP / RPC_SYSTEM_TIMESTAMP_STR globals and reports every
 * tracked request whose deadline has passed to the event interface with
 * status RS_TIMEOUT, then stops tracking it.
 *
 * All access to the pending-request table is guarded by this object's monitor.
 */
class RpcSendPackageManage:Thread
{
    /**
     * Initialises the global timestamp, stores the event sink and starts the
     * timeout-scanning thread immediately.
     *
     * Params:
     *   rpc_event = receiver of send-failure and timeout notifications
     */
    this(RpcEventInterface rpc_event)
    {
        RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)();

        clientEventInterface = rpc_event;

        super(&this.threadRun);
        super.start();
    }

    /**
     * Serialises a request and writes it to the request's socket.
     *
     * On a successful write the request is remembered under its sequence
     * number (only when `checkble` is true) so that the timeout scan can see
     * it. On a failed write the request is marked RS_FAILD and handed to the
     * event interface.
     *
     * Params:
     *   req      = request to send
     *   checkble = when true, track the request until it is removed or times out
     *
     * Returns: the result of the socket write.
     */
    bool add(RpcRequest req, bool checkble = true)
    {
        synchronized(this)
        {
            auto streamBinaryPackge = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, req.getSequence, req.getCompressType, req.getNonblock, req.getCallFuncId);

            auto sendStream = streamBinaryPackge.toStream(req.getFunArgList());

            bool isOk = req.getSocket.write(cast(byte[]) sendStream);

            if(isOk)
            {
                if(checkble)
                {
                    sendPack[req.getSequence()] = req;
                }

                deWritefln("send binary stream, sequece:%s, funcId:%s, funcName:%s, length:%s",req.getSequence, req.getCallFuncId, req.getCallFuncName, sendStream.length);
            }
            else
            {
                req.setStatus(RESPONSE_STATUS.RS_FAILD);
                clientEventInterface.rpcSendPackageEvent(req);
            }

            return isOk;
        }
    }

    /**
     * Stops tracking the request stored under `index`.
     *
     * Params:
     *   index = sequence number the request was stored under
     *
     * Returns: true if an entry was present and removed, false otherwise.
     */
    bool remove(const ulong index)
    {
        synchronized(this)
        {
            return sendPack.remove(index);
        }
    }

    /**
     * Returns: the number of requests currently tracked.
     */
    ulong getWaitResponseNum()
    {
        // Take the same monitor as every other access to sendPack so the
        // length is not read while another thread is mutating the table.
        synchronized(this)
        {
            return sendPack.length;
        }
    }

protected:

    /**
     * Thread body: refreshes the global timestamps and expires overdue
     * requests, then sleeps 100 ms, for as long as the thread is running.
     */
    void threadRun()
    {
        while(this.isRunning())
        {
            synchronized(this)
            {
                RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)();
                RPC_SYSTEM_TIMESTAMP_STR = SysTime.fromUnixTime(RPC_SYSTEM_TIMESTAMP).toISOExtString();

                // Removing entries from an associative array while a foreach
                // is iterating over it is not allowed, so first collect the
                // expired entries and only then notify and remove them.
                // NOTE(review): timestamp and timeout are compared against Unix
                // time in seconds, so they are presumably seconds too - confirm.
                ulong[] expiredKeys;
                RpcRequest[] expiredRequests;

                foreach(k, ref req; sendPack)
                {
                    if(req.getTimestamp() + req.getTimeout() < RPC_SYSTEM_TIMESTAMP)
                    {
                        expiredKeys ~= k;
                        expiredRequests ~= req;
                    }
                }

                foreach(i, k; expiredKeys)
                {
                    auto expired = expiredRequests[i];

                    expired.setStatus(RESPONSE_STATUS.RS_TIMEOUT);
                    clientEventInterface.rpcSendPackageEvent(expired);
                    this.remove(k);
                }
            }

            this.sleep(dur!("msecs")(100));
        }
    }

private:
    /// Requests awaiting a response, keyed by sequence number.
    RpcRequest[ulong] sendPack;

    /// Receiver of send-failure and timeout notifications.
    RpcEventInterface clientEventInterface;
}