1 module kissrpc.RpcRecvPackageManage;
2 
3 import kissrpc.RpcBinaryPackage;
4 import kissrpc.RpcServerSocket;
5 import kissrpc.RpcEventInterface;
6 import kissrpc.RpcSocketBaseInterface;
7 import kissrpc.Unit;
8 import kissrpc.Logs;
9 
10 import std.parallelism;
11 import std.stdio;
12 import core.thread;
13 
/**
 * Incremental decoder for a single RPC package received as a byte stream.
 *
 * A package is read in two phases: a header whose size is obtained from
 * `RpcBinaryPackage.getHanderSize` at construction, followed by a payload
 * whose size is obtained from `RpcBinaryPackage.getBodySize` once the header
 * has been decoded. Input may arrive in arbitrarily sized fragments; feed each
 * fragment to `parse`.
 */
class CapnprotoRecvPackage
{
	/// Allocates the header buffer and starts in the header phase.
	this()
	{
		binaryPackage = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF);
		hander = new ubyte[binaryPackage.getHanderSize];
		recvRemainBytes = hander.length;
	}


	/**
	 * Consumes as many bytes as the current package still needs.
	 *
	 * Params:
	 *   bytes = next fragment of the incoming stream (may be empty).
	 *   isOk  = set to the result of `fromStreamForPayload` in the call that
	 *           receives the last payload byte; left untouched otherwise.
	 *
	 * Returns: the unconsumed tail of `bytes`.
	 *
	 * Note: if the header is complete but `fromStreamForHander` rejects it, or
	 * once the payload has been handed to `fromStreamForPayload`, this object
	 * consumes nothing further and returns its input unchanged. Callers must
	 * not keep feeding the same parser in a loop in that situation.
	 */
	ubyte[] parse(ubyte[] bytes, ref bool isOk)
	{
		if(parseState == Phase.header)
		{
			immutable size_t take = bytes.length < recvRemainBytes ? bytes.length : recvRemainBytes;

			hander[handerPos .. handerPos + take] = bytes[0 .. take];

			handerPos += take;
			recvRemainBytes -= take;
			bytes = bytes[take .. $];

			// Header still incomplete, or complete but rejected by the decoder:
			// hand back whatever was not copied into the header buffer.
			if(recvRemainBytes > 0 || !binaryPackage.fromStreamForHander(hander))
			{
				return bytes;
			}

			payload = new ubyte[binaryPackage.getBodySize()];
			recvRemainBytes = payload.length;
			parseState = Phase.payload;
		}

		if(parseState == Phase.payload)
		{
			immutable size_t take = bytes.length < recvRemainBytes ? bytes.length : recvRemainBytes;

			payload[payloadPos .. payloadPos + take] = bytes[0 .. take];

			payloadPos += take;
			recvRemainBytes -= take;
			bytes = bytes[take .. $];

			// Checked outside any "bytes were copied" condition so that a
			// package with a zero-size payload completes as soon as its header
			// has been decoded instead of waiting forever for payload bytes.
			if(recvRemainBytes == 0)
			{
				// Leave the payload phase first so the decoder runs exactly once.
				parseState = Phase.done;
				isOk = binaryPackage.fromStreamForPayload(payload);
			}
		}

		return bytes;
	}

	/// Returns the underlying package object that header and payload are decoded into.
	RpcBinaryPackage getPackage()
	{
		return binaryPackage;
	}

	/// Forwards to `RpcBinaryPackage.checkHanderValid`.
	bool checkHanderValid()
	{
		return binaryPackage.checkHanderValid;
	}

	/// True when the header check passes and every payload byte has been received.
	bool checkPackageValid()
	{
		return binaryPackage.checkHanderValid && payloadPos == payload.length;
	}

private:
	/// Decoding phases, in the order they are entered.
	enum Phase
	{
		header,		// collecting header bytes (initial state)
		payload,	// header decoded, collecting payload bytes
		done		// payload passed to the decoder; nothing more is consumed
	}

	ubyte[] hander;
	ubyte[] payload;
	Phase parseState;

	// size_t rather than ulong: these values are used as slice indices and
	// compared with array lengths, which are size_t on every target.
	size_t handerPos, payloadPos;

	// Bytes still missing for the phase currently being collected.
	size_t recvRemainBytes;

	RpcBinaryPackage binaryPackage;
}
94 
/**
 * Splits the byte stream received on one socket into RPC packages and
 * forwards each finished package to the event delegate.
 */
class RpcRecvPackageManage
{
	/**
	 * Params:
	 *   baseSocket  = socket passed back to the delegate with every package.
	 *   rpcDelegate = receiver of `rpcRecvPackageEvent` notifications.
	 */
	this(RpcSocketBaseInterface baseSocket, RpcEventInterface rpcDelegate)
	{
		rpcEventDelegate = rpcDelegate;
		socket = baseSocket;
	}


	/**
	 * Feeds a fragment of received data to the package parser.
	 *
	 * The fragment may contain part of a package, exactly one package, or
	 * several packages; the parser for a partially received package is kept
	 * between calls. Each completed package is reported through
	 * `rpcRecvPackageEvent`; a package that cannot be completed is reported
	 * with status `RPSC_FAILED`.
	 *
	 * Params:
	 *   bytes = newly received bytes (may be empty).
	 */
	void add(ubyte[] bytes)
	{
		do
		{
			// Reuse the parser of the package in progress, or start a new one.
			auto existing = id in recvPackage;
			immutable bool isFresh = existing is null;
			auto pack = isFresh ? new CapnprotoRecvPackage : *existing;
			recvPackage[id] = pack;

			bool parseOk = false;
			immutable size_t pending = bytes.length;

			bytes = pack.parse(bytes, parseOk);

			if(parseOk)
			{
				auto capnprotoPack = pack.getPackage();

				if(pack.checkHanderValid())
				{
					if(pack.checkPackageValid)
					{
						rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack);
						recvPackage.remove(id);
						id++;
					}
					else
					{
						logError("parse package check hander is error, package data:%s", bytes);
						// A finished package cannot become valid later; drop its
						// parser so following data starts a new package.
						recvPackage.remove(id);
					}
				}
				else
				{
					capnprotoPack.setStatusCode(RPC_PACKAGE_STATUS_CODE.RPSC_FAILED);
					recvPackage.remove(id);
					rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack);
				}
			}
			else if(pending > 0 && bytes.length == pending)
			{
				// The parser neither finished nor consumed a single byte, so
				// calling it again with the same data would loop forever.
				// Report the package as failed and drop its parser.
				logError("parse package is stalled, drop package, unparsed bytes:%s", bytes.length);

				auto failedPack = pack.getPackage();
				failedPack.setStatusCode(RPC_PACKAGE_STATUS_CODE.RPSC_FAILED);
				recvPackage.remove(id);
				rpcEventDelegate.rpcRecvPackageEvent(socket, failedPack);

				// If even a brand-new parser cannot consume anything, retrying
				// with another new parser would not help: give up on this buffer.
				if(isFresh)
				{
					return;
				}
			}
		}
		while(bytes.length > 0);
	}

private:
	// Key of the package currently being received; advanced after each
	// successfully delivered package.
	ulong id;
	CapnprotoRecvPackage[ulong] recvPackage;
	RpcEventInterface rpcEventDelegate;
	RpcSocketBaseInterface socket;
}