1 module kissrpc.RpcBinaryPackage;
2 
3 import kissrpc.Endian;
4 import kissrpc.Unit;
5 import kissrpc.Logs;
6 
7 import snappy.snappy;
8 import std.stdio;
9 import std.experimental.logger;
10 import std.format;
11 
12 enum RPC_PACKAGE_STATUS_CODE
13 {
14 	RPSC_OK,
15 	RPSC_FAILED,
16 }
17 
18 
19 class RpcBinaryPackage
20 {
21 	this(RPC_PACKAGE_PROTOCOL tpp, ulong msgId = 0, RPC_PACKAGE_COMPRESS_TYPE compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, bool isNonblock = true, size_t funcId = 0)
22 	{
23 		magic = RPC_HANDER_MAGIC;
24 		ver = RPC_HANDER_VERSION;
25 		sequenceId = cast(uint)msgId;
26 		this.funcId  = funcId;
27 
28 		statusInfo |= (isNonblock ? RPC_HANDER_NONBLOCK_FLAG : 0);
29 
30 		st = cast(short)tpp;
31 
32 		st |= (compressType << 8);
33 
34 		statusInfo |= (RPC_PACKAGE_STATUS_CODE.RPSC_OK & RPC_HANDER_STATUS_CODE_FLAG);
35 
36 		handerSize = ver.sizeof + st.sizeof + statusInfo.sizeof + reserved.sizeof + funcId.sizeof + sequenceId.sizeof + bodySize.sizeof;					
37 	}
38 
39 	int getStartHanderLength()const
40 	{
41 		return magic.sizeof + handerSize.sizeof;
42 	}
43 
44 	int getHanderSize()const
45 	{
46 		return handerSize + this.getStartHanderLength();
47 	}
48 
49 	int getPackgeSize()const
50 	{
51 		return handerSize + bodySize + this.getStartHanderLength();
52 	}
53 
54 	size_t getFuncId()const
55 	{
56 		return funcId;
57 	}
58 
59 	void setFuncId(const size_t id)
60 	{
61 		funcId = id;
62 	}
63 
64 	ubyte[] getPayload()
65 	{
66 		return bodyPayload;
67 	}
68 
69 	short getVersion()const
70 	{
71 		return ver;
72 	}
73 
74 	ulong getBodySize()const
75 	{
76 		return bodySize;
77 	}
78 
79 	short getSerializedType()const
80 	{
81 		return st & RPC_HANDER_SERI_FLAG;
82 	}
83 
84 	ulong getSequenceId()const
85 	{
86 		return sequenceId;
87 	}
88 
89 	bool getNonblock()const
90 	{
91 		return cast(bool)statusInfo & RPC_HANDER_NONBLOCK_FLAG;
92 	}
93 
94 	short getStatusCode()const
95 	{
96 		return statusInfo & RPC_HANDER_STATUS_CODE_FLAG;
97 	}
98 
99 	void setStatusCode(const RPC_PACKAGE_STATUS_CODE code)
100 	{
101 		statusInfo |= (code & RPC_HANDER_STATUS_CODE_FLAG);
102 	}
103 
104 	bool getHB()const
105 	{
106 		return statusInfo & RPC_HANDER_HB_FLAG;
107 	}
108 
109 	void setHBPackage()
110 	{
111 		statusInfo |= RPC_HANDER_HB_FLAG;
112 	}
113 
114 	bool getOW()const
115 	{
116 		return cast(bool)statusInfo & RPC_HANDER_OW_FLAG;
117 	}
118 
119 	void setOWPackage()
120 	{
121 		statusInfo |= RPC_HANDER_OW_FLAG;
122 	}
123 
124 	bool getRP()const
125 	{
126 		return cast(bool)statusInfo & RPC_HANDER_RP_FLAG;
127 	}
128 
129 	void setRP()
130 	{
131 		statusInfo |= RPC_HANDER_RP_FLAG;
132 	}
133 
134 
135 	ubyte[] toStream(ubyte[] payload)
136 	{
137 		switch(this.getCompressType())
138 		{
139 			case RPC_PACKAGE_COMPRESS_TYPE.RPCT_COMPRESS:
140 				payload =cast(ubyte[]) Snappy.compress(cast(byte[])payload);
141 				st |= RPC_HANDER_COMPRESS_FLAG;
142 				break;
143 				
144 			case RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC:
145 				if(payload.length >= RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE)
146 				{
147 					payload = cast(ubyte[]) Snappy.compress(cast(byte[])payload);
148 					st |= RPC_HANDER_COMPRESS_FLAG;
149 				}else
150 				{
151 					st &= ~RPC_HANDER_COMPRESS_FLAG;
152 				}
153 				break;
154 				
155 			default:break;
156 		}
157 
158 		bodySize = cast(ushort)payload.length;
159 		
160 		auto stream = new ubyte[this.getPackgeSize()];
161 		
162 		ulong pos = 0;
163 
164 
165 		pos = writeBytesPos(stream, magic,  pos);
166 		pos = writeBytePos(stream, handerSize, pos);
167 		pos = writeBytePos(stream, ver, pos);
168 
169 		pos = writeBinaryPos(stream, st, pos);
170 
171 		pos = writeBytePos(stream, statusInfo, pos);
172 		pos = writeBytesPos(stream, reserved, pos);
173 
174 		pos = writeBinaryPos(stream, funcId, pos);
175 		pos = writeBinaryPos(stream, sequenceId, pos);
176 		pos = writeBinaryPos(stream, bodySize, pos);
177 
178 		pos = writeBytesPos(stream, payload, pos);
179 
180 		return stream;
181 	}
182 
183 	bool fromStream(ubyte[] data)
184 	{
185 		ulong pos = 0;
186 
187 		try{
188 			pos = readBytesPos(data, magic, pos);
189 			pos = readBytePos(data, handerSize, pos);
190 			pos = readBytePos(data, ver, pos);
191 
192 			pos = readBinaryPos(data, st, pos);
193 
194 			pos = readBytePos(data, statusInfo, pos);
195 			pos = readBytesPos(data, reserved, pos);
196 
197 			pos = readBinaryPos(data, funcId, pos);
198 			pos = readBinaryPos(data, sequenceId, pos);
199 			pos = readBinaryPos(data, bodySize, pos);
200 			
201 			bodyPayload = data[pos .. $];
202 
203 			if(this.isCompress)
204 			{
205 				bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload);
206 				bodySize = cast(ushort)bodyPayload.length;
207 			}
208 					
209 		}catch(Exception e)
210 		{
211 			logWarning("decode binary stream is error:%s", e.msg);
212 			return false;
213 		}
214 
215 		return true;
216 	}
217 
218 	bool fromStreamForHander(ubyte[] data)
219 	{
220 		ulong pos = 0;
221 		
222 		try{
223 
224 			pos = readBytesPos(data, magic, pos);
225 			pos = readBytePos(data, handerSize, pos);
226 			pos = readBytePos(data, ver, pos);
227 			
228 			pos = readBinaryPos(data, st, pos);
229 			
230 			pos = readBytePos(data, statusInfo, pos);
231 			pos = readBytesPos(data, reserved, pos);
232 
233 			pos = readBinaryPos(data, funcId, pos);
234 			pos = readBinaryPos(data, sequenceId, pos);
235 			pos = readBinaryPos(data, bodySize, pos);
236 
237 	
238 
239 			headData.length = getHanderSize();
240 			pos = 0;
241 			pos = writeBytesPos(headData, magic,  pos);
242 			pos = writeBytePos(headData, handerSize, pos);
243 			pos = writeBytePos(headData, ver, pos);
244 			pos = writeBinaryPos(headData, st, pos);
245 			pos = writeBytePos(headData, statusInfo, pos);
246 			pos = writeBytesPos(headData, reserved, pos);
247 			pos = writeBinaryPos(headData, funcId, pos);
248 			pos = writeBinaryPos(headData, sequenceId, pos);
249 			pos = writeBinaryPos(headData, bodySize, pos);
250 
251 			
252 
253 			log(format("magic = %s, handerSize = %s, ver = %s, st = %s, statusInfo = %s, reserved = %s, funcId = %s, sequenceId = %s, bodySize = %s",magic,handerSize,ver,st,statusInfo,reserved,funcId,sequenceId,bodySize));
254 			
255 		}catch(Exception e)
256 		{
257 			logWarning("decode binary stream for hander is error:%s", e.msg);
258 			return false;
259 		}
260 
261 		return this.checkHanderValid();
262 	}
263 
264 	ubyte[] getHead() {
265 		return headData;
266 	}
267 
268 	bool fromStreamForPayload(ubyte[] data)
269 	{
270 		try{
271 
272 			bodyPayload = data[0 .. $];
273 				
274 			if(this.isCompress)
275 			{
276 				bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload);
277 				bodySize = cast(ushort)bodyPayload.length;
278 			}
279 
280 		}catch(Exception e)
281 		{
282 			logWarning("decode body stream is error:%s", e.msg);
283 			return false;
284 		}
285 
286 		return true;
287 	}
288 
289 
290 	bool checkHanderValid()
291 	{
292 		return magic == RPC_HANDER_MAGIC && ver == RPC_HANDER_VERSION && this.getPackgeSize <= RPC_PACKAGE_MAX;
293 	}
294 
295 	RPC_PACKAGE_COMPRESS_TYPE getCompressType()
296 	{
297 		return cast(RPC_PACKAGE_COMPRESS_TYPE)((st & RPC_HANDER_CPNPRESS_TYPE_FLAG)>>8);
298 	}
299 
300 	bool isCompress()
301 	{
302 		return cast(bool)(st & RPC_HANDER_COMPRESS_FLAG);
303 	}
304 
305 protected:
306 
307 	ulong writeBinaryPos(T)(ubyte[] data, T t, ulong pos)
308 	{
309 		T bits= hostToNet(t);
310 		data[pos .. pos + t.sizeof ] = (cast(ubyte*)&bits)[0 .. t.sizeof];
311 		return pos + t.sizeof;
312 	}
313 
314 	ulong writeBytesPos(ubyte[] data, ubyte[] bytes, ulong pos)
315 	{
316 		data[pos .. pos + bytes.length] = bytes[0 .. bytes.length];
317 		return pos + bytes.length;
318 	}
319 
320 	ulong writeBytePos(ubyte[] data, ubyte abyte, ulong pos)
321 	{
322 		data[pos .. pos + abyte.sizeof] = abyte;
323 		return pos + abyte.sizeof;
324 	}
325 
326 	ulong readBinaryPos(T)(ubyte[] data, ref T t, ulong pos)
327 	{
328 		IntBuf!(T) bits;
329 		bits.bytes = data[pos .. pos + t.sizeof];
330 		t = netToHost(bits.value);
331 	
332 		return pos + t.sizeof;
333 	}
334 
335 	ulong readBytesPos(ubyte[] data, ubyte[] bytes, ulong pos)
336 	{
337 		bytes[0 .. $] = data[pos .. pos + bytes.length];
338 
339 		return pos + bytes.length;
340 	}
341 
342 	ulong readBytePos(ubyte[] data, ref ubyte abyte, ulong pos)
343 	{
344 		abyte = (data[pos .. pos + abyte.sizeof])[0];
345 		return pos + abyte.sizeof;
346 	}
347 
348 
349 private:
350 	ubyte[2] magic;
351 
352 	ubyte handerSize;
353 	ubyte ver;
354 	short st;
355 
356 	ubyte statusInfo; // [nb:1bit, ow:1bit, rp:1bit, nonblock:1bit, statusCode:4bit] 
357 
358 	ubyte[2] reserved;
359 	uint sequenceId;
360 	size_t funcId;
361 	ushort bodySize;
362 
363 	ubyte[] bodyPayload;
364 	ubyte[] headData;
365 }
366 
367 
368 unittest{
369 	import std.stdio;
370 
371 	auto send_pkg  = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, 0, RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC);
372 	auto send_data = "aaaaaaaabbbbbbbbbbbbbbcccccccccccccccdddddddddddddddddddd";
373 
374 	writeln("-----------------------------------------------------");
375 
376 	auto snd_stream = send_pkg.toStream(cast(ubyte[])send_data);
377 
378 	writefln("send stream, length:%s, compress:%s, data:%s", snd_stream.length, send_pkg.getCompressType, snd_stream);
379 
380 	auto recvPkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF);
381 
382 	recvPkg.fromStream(snd_stream);
383 
384 	writeln("----------------------------------------------------");
385 
386 	writefln("recv stream, length:%s, compress:%s, data:%s", recvPkg.getPackgeSize(), recvPkg.getCompressType, recvPkg.getPayload());
387 
388 }