1 module kissrpc.RpcRecvPackageManage;
2 
3 import kissrpc.RpcBinaryPackage;
4 import kissrpc.RpcServerSocket;
5 import kissrpc.RpcEventInterface;
6 import kissrpc.RpcSocketBaseInterface;
7 import kissrpc.Unit;
8 import kissrpc.Logs;
9 
10 import kissrpc.RpcClientSocket;
11 
12 import std.parallelism;
13 import std.stdio;
14 import core.thread;
15 import std.format;
16 
17 import std.experimental.logger;
18 
19 class CapnprotoRecvPackage
20 {
21 	this()
22 	{
23 		binaryPackage = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF);
24 		hander = new ubyte[binaryPackage.getHanderSize];
25 		recvRemainBytes = hander.length;
26 	}
27 
28 
29 	ubyte[] parse(ubyte[] bytes, ref bool isOk)
30 	{
31 		// log("parse ",bytes,", hander.length ",hander.length);
32 		ulong cpySize = bytes.length > recvRemainBytes? recvRemainBytes : bytes.length;
33 		ulong bytesPos = 0;
34 		// log("parseState ",parseState, ", recvRemainBytes ",recvRemainBytes, ", bytes.length ", bytes.length,", isOk ",isOk);
35 		if(parseState == 0)
36 		{
37 			hander[handerPos .. handerPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize];
38 			// log(format("handerPos = %s, hander = %s",handerPos,hander));
39 
40 			handerPos += cpySize;
41 			bytesPos  += cpySize;
42 
43 			recvRemainBytes -= cpySize;
44 
45 			if(recvRemainBytes == 0)
46 			{		
47 				if(binaryPackage.fromStreamForHander(hander))
48 				{
49 					payload = new ubyte[binaryPackage.getBodySize()];
50 					recvRemainBytes = payload.length;
51 					parseState = 1;
52 					return this.parse(bytes[bytesPos .. bytesPos + (bytes.length - cpySize)], isOk);
53 				}
54 				else  
55 					return null;
56 			}
57 		}
58 
59 		if(parseState == 1 && recvRemainBytes >= 0)
60 		{	
61 			// log(format("payloadPos = %s, payload = %s",payloadPos,payload));	
62 			payload[payloadPos .. payloadPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize];
63 
64 			payloadPos += cpySize;
65 			bytesPos  += cpySize;
66 			recvRemainBytes -= cpySize;
67 
68 			if(recvRemainBytes == 0) 
69 			{
70 				isOk = binaryPackage.fromStreamForPayload(payload);
71 				if (!isOk) {
72 					log("body check error!!!");
73 					return null;
74 				}
75 			}
76 		}
77 		
78 
79 		return bytes[bytesPos .. bytesPos + (bytes.length-cpySize)];
80 	}
81 
82 	RpcBinaryPackage getPackage()
83 	{
84 		return binaryPackage;
85 	}
86 
87 	bool checkHanderValid()
88 	{
89 		return binaryPackage.checkHanderValid;
90 	}
91 
92 	bool checkPackageValid()
93 	{
94 		return binaryPackage.checkHanderValid && payloadPos == payload.length;
95 	}
96 
97 private:
98 	ubyte[] hander;
99 	ubyte[] payload;
100 	int parseState;
101 
102 	ulong handerPos, payloadPos;
103 
104 	ulong recvRemainBytes;
105 
106 	RpcBinaryPackage binaryPackage;
107 }
108 
109 class RpcRecvPackageManage
110 {
111 	this(RpcSocketBaseInterface baseSocket, RpcEventInterface rpcDelegate)
112 	{
113 		rpcEventDelegate = rpcDelegate;
114 		socket = baseSocket;
115 	}
116 
117 
118 	void add(ubyte[] bytes)
119 	{
120 		// log("add ",bytes);
121 		do{
122 			auto pack = recvPackage.get(id, new CapnprotoRecvPackage);
123 		
124 			bool parseOk = false;
125 
126 			recvPackage[id] = pack;
127 			
128 			
129 			bytes = pack.parse(bytes, parseOk);
130 			if (bytes is null) {
131 				logError(format("parse head error !!!!"));
132 				socket.disconnect();
133 				break;
134 			}
135 
136 			// log("bytes.length ",bytes.length,", parseOk ",parseOk,", id ", id);
137 			if(parseOk)
138 			{
139 				auto capnprotoPack = pack.getPackage();
140 				
141 				// log("pack.checkHanderValid() ",pack.checkHanderValid());
142 				if(pack.checkHanderValid())
143 				{
144 					// log("pack.checkPackageValid() ",pack.checkPackageValid());
145 					if(pack.checkPackageValid)
146 					{	
147 						if (capnprotoPack.getFuncId() == 0) {
148 							logInfo("recv heart kick");
149 							if (cast(RpcClientSocket)socket !is null) {
150 
151 							}
152 							else if(cast(RpcServerSocket)socket !is null) {
153 								socket.write(cast(byte[])capnprotoPack.getHead());
154 							}
155 						}else {
156 							rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack);
157 						}
158 						recvPackage.remove(id);
159 						id++;
160 					}else{
161 						logError("parse package check hander is error, package data:%s", bytes);
162 						socket.disconnect();
163 						break;
164 					}
165 				}else
166 				{
167 					logError(format("parse parseOk head error !!!!"));
168 					socket.disconnect();
169 					break;	
170 				}
171 			}
172 		}while(bytes.length > 0);
173 	}
174 
175 private:
176 	ulong id;
177 	CapnprotoRecvPackage[ulong] recvPackage;
178 	RpcEventInterface rpcEventDelegate;
179 	RpcSocketBaseInterface socket;
180 }