1 module kissrpc.RpcSendPackageManage;
2 
3 import kissrpc.RpcBinaryPackage;
4 import kissrpc.RpcPackageBase;
5 import kissrpc.RpcResponse;
6 import kissrpc.RpcRequest;
7 import kissrpc.RpcEventInterface;
8 import kissrpc.RpcSocketBaseInterface;
9 import kissrpc.Unit;
10 import kissrpc.Logs;
11 
12 import std.datetime;
13 import core.thread;
14 import core.memory:GC;
15 
16 import std.stdio;
17 
/**
 * Sends RPC requests over their sockets and tracks the ones that are still
 * waiting for a response.
 *
 * The object is itself a thread: once constructed it wakes up roughly every
 * 100 ms, refreshes the global RPC timestamp values and reports every tracked
 * request whose timestamp + timeout lies in the past through
 * `RpcEventInterface.rpcSendPackageEvent` with status `RS_TIMEOUT`.
 *
 * `add`, `remove` and the timeout scan are serialized on this object's monitor.
 */
class RpcSendPackageManage:Thread
{
	/**
	 * Stores the event sink, initializes the global timestamp and starts the
	 * background timeout thread.
	 *
	 * Params:
	 *   rpc_event = receiver of send-failure and timeout notifications.
	 */
	this(RpcEventInterface rpc_event)
	{
		RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)();

		// Must be assigned before the thread is started: threadRun reads it.
		clientEventInterface = rpc_event;

		super(&this.threadRun);
		super.start();
	}

	/**
	 * Serializes `req` into a binary package and writes it to the request's socket.
	 *
	 * Params:
	 *   req      = request to send.
	 *   checkble = when true (default) a successfully written request is
	 *              tracked until it is removed or times out.
	 *
	 * Returns: the result of the socket write. On failure the request status
	 *          is set to `RS_FAILD` and the event sink is notified.
	 */
	bool add(RpcRequest req, bool checkble = true)
	{
		synchronized(this)
		{
			auto streamBinaryPackge = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, req.getSequence, req.getCompressType, req.getNonblock, req.getCallFuncId);

			auto sendStream = streamBinaryPackge.toStream(req.getFunArgList());

			bool isOk = req.getSocket.write(cast(byte[]) sendStream);

			if(isOk)
			{
				if(checkble)
				{
					// Keyed by sequence number so remove() can drop it later.
					sendPack[req.getSequence()] = req;
				}

				deWritefln("send binary stream, sequece:%s, funcId:%s, funcName:%s, length:%s",req.getSequence, req.getCallFuncId, req.getCallFuncName, sendStream.length);
			}
			else
			{
				req.setStatus(RESPONSE_STATUS.RS_FAILD);
				clientEventInterface.rpcSendPackageEvent(req);
			}

			return isOk;
		}
	}

	/**
	 * Stops tracking the request stored under `index`.
	 *
	 * Returns: true if an entry with that key existed and was removed.
	 */
	bool remove(const ulong index)
	{
		synchronized(this)
		{
			return sendPack.remove(index);
		}
	}

	/**
	 * Number of requests currently tracked.
	 *
	 * Note: reads the table length without taking the monitor, so the value
	 * is only a snapshot while other threads add or remove entries.
	 */
	ulong getWaitResponseNum()
	{
		return sendPack.length;
	}

protected:

	/**
	 * Thread body: every 100 ms refreshes the global timestamp values and
	 * expires tracked requests whose deadline has passed.
	 */
	void threadRun()
	{
		while(this.isRunning())
		{
			synchronized(this)
			{
				RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)();
				RPC_SYSTEM_TIMESTAMP_STR = SysTime.fromUnixTime(RPC_SYSTEM_TIMESTAMP).toISOExtString();

				// Pass 1: only collect the keys of expired requests. An
				// associative array must not have elements added or removed
				// while a foreach over it is in progress, so neither the
				// removal nor the user callback may happen inside this loop.
				ulong[] expiredKeys;

				foreach(k, ref req; sendPack)
				{
					if(req.getTimestamp() + req.getTimeout() < RPC_SYSTEM_TIMESTAMP)
					{
						expiredKeys ~= k;
					}
				}

				// Pass 2: notify and remove. The table is no longer being
				// iterated, so callbacks that add or remove entries are safe.
				foreach(k; expiredKeys)
				{
					auto pending = k in sendPack;

					if(pending is null)
					{
						// Already removed by an earlier callback in this pass.
						continue;
					}

					pending.setStatus(RESPONSE_STATUS.RS_TIMEOUT);
					clientEventInterface.rpcSendPackageEvent(*pending);
					this.remove(k);
				}
			}

			Thread.sleep(dur!("msecs")(100));
		}
	}

private:
	/// Requests awaiting a response, keyed by sequence number.
	RpcRequest[ulong] sendPack;

	/// Receiver of send-failure and timeout notifications.
	RpcEventInterface clientEventInterface;
}