1 module kissrpc.RpcBinaryPackage;
2 
3 import kissrpc.Endian;
4 import kissrpc.Unit;
5 import kissrpc.Logs;
6 
7 import snappy.snappy;
8 import std.stdio;
9 
10 enum RPC_PACKAGE_STATUS_CODE
11 {
12 	RPSC_OK,
13 	RPSC_FAILED,
14 }
15 
16 
17 class RpcBinaryPackage
18 {
19 	this(RPC_PACKAGE_PROTOCOL tpp, ulong msgId = 0, RPC_PACKAGE_COMPRESS_TYPE compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, bool isNonblock = true, size_t funcId = 0)
20 	{
21 		magic = RPC_HANDER_MAGIC;
22 		ver = RPC_HANDER_VERSION;
23 		sequenceId = cast(uint)msgId;
24 		this.funcId  = funcId;
25 
26 		statusInfo |= (isNonblock ? RPC_HANDER_NONBLOCK_FLAG : 0);
27 
28 		st = cast(short)tpp;
29 
30 		st |= (compressType << 8);
31 
32 		statusInfo |= (RPC_PACKAGE_STATUS_CODE.RPSC_OK & RPC_HANDER_STATUS_CODE_FLAG);
33 
34 		handerSize = ver.sizeof + st.sizeof + statusInfo.sizeof + reserved.sizeof + funcId.sizeof + sequenceId.sizeof + bodySize.sizeof;					
35 	}
36 
37 	int getStartHanderLength()const
38 	{
39 		return magic.sizeof + handerSize.sizeof;
40 	}
41 
42 	int getHanderSize()const
43 	{
44 		return handerSize + this.getStartHanderLength();
45 	}
46 
47 	int getPackgeSize()const
48 	{
49 		return handerSize + bodySize + this.getStartHanderLength();
50 	}
51 
52 	size_t getFuncId()const
53 	{
54 		return funcId;
55 	}
56 
57 	void setFuncId(const size_t id)
58 	{
59 		funcId = id;
60 	}
61 
62 	ubyte[] getPayload()
63 	{
64 		return bodyPayload;
65 	}
66 
67 	short getVersion()const
68 	{
69 		return ver;
70 	}
71 
72 	ulong getBodySize()const
73 	{
74 		return bodySize;
75 	}
76 
77 	short getSerializedType()const
78 	{
79 		return st & RPC_HANDER_SERI_FLAG;
80 	}
81 
82 	ulong getSequenceId()const
83 	{
84 		return sequenceId;
85 	}
86 
87 	bool getNonblock()const
88 	{
89 		return cast(bool)statusInfo & RPC_HANDER_NONBLOCK_FLAG;
90 	}
91 
92 	short getStatusCode()const
93 	{
94 		return statusInfo & RPC_HANDER_STATUS_CODE_FLAG;
95 	}
96 
97 	void setStatusCode(const RPC_PACKAGE_STATUS_CODE code)
98 	{
99 		statusInfo |= (code & RPC_HANDER_STATUS_CODE_FLAG);
100 	}
101 
102 	bool getHB()const
103 	{
104 		return statusInfo & RPC_HANDER_HB_FLAG;
105 	}
106 
107 	void setHBPackage()
108 	{
109 		statusInfo |= RPC_HANDER_HB_FLAG;
110 	}
111 
112 	bool getOW()const
113 	{
114 		return cast(bool)statusInfo & RPC_HANDER_OW_FLAG;
115 	}
116 
117 	void setOWPackage()
118 	{
119 		statusInfo |= RPC_HANDER_OW_FLAG;
120 	}
121 
122 	bool getRP()const
123 	{
124 		return cast(bool)statusInfo & RPC_HANDER_RP_FLAG;
125 	}
126 
127 	void setRP()
128 	{
129 		statusInfo |= RPC_HANDER_RP_FLAG;
130 	}
131 
132 
133 	ubyte[] toStream(ubyte[] payload)
134 	{
135 		switch(this.getCompressType())
136 		{
137 			case RPC_PACKAGE_COMPRESS_TYPE.RPCT_COMPRESS:
138 				payload =cast(ubyte[]) Snappy.compress(cast(byte[])payload);
139 				st |= RPC_HANDER_COMPRESS_FLAG;
140 				break;
141 				
142 			case RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC:
143 				if(payload.length >= RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE)
144 				{
145 					payload = cast(ubyte[]) Snappy.compress(cast(byte[])payload);
146 					st |= RPC_HANDER_COMPRESS_FLAG;
147 				}else
148 				{
149 					st &= ~RPC_HANDER_COMPRESS_FLAG;
150 				}
151 				break;
152 				
153 			default:break;
154 		}
155 
156 		bodySize = cast(ushort)payload.length;
157 		
158 		auto stream = new ubyte[this.getPackgeSize()];
159 		
160 		ulong pos = 0;
161 
162 
163 		pos = writeBytesPos(stream, magic,  pos);
164 		pos = writeBytePos(stream, handerSize, pos);
165 		pos = writeBytePos(stream, ver, pos);
166 
167 		pos = writeBinaryPos(stream, st, pos);
168 
169 		pos = writeBytePos(stream, statusInfo, pos);
170 		pos = writeBytesPos(stream, reserved, pos);
171 
172 		pos = writeBinaryPos(stream, funcId, pos);
173 		pos = writeBinaryPos(stream, sequenceId, pos);
174 		pos = writeBinaryPos(stream, bodySize, pos);
175 
176 		pos = writeBytesPos(stream, payload, pos);
177 
178 		return stream;
179 	}
180 
181 	bool fromStream(ubyte[] data)
182 	{
183 		ulong pos = 0;
184 
185 		try{
186 			pos = readBytesPos(data, magic, pos);
187 			pos = readBytePos(data, handerSize, pos);
188 			pos = readBytePos(data, ver, pos);
189 
190 			pos = readBinaryPos(data, st, pos);
191 
192 			pos = readBytePos(data, statusInfo, pos);
193 			pos = readBytesPos(data, reserved, pos);
194 
195 			pos = readBinaryPos(data, funcId, pos);
196 			pos = readBinaryPos(data, sequenceId, pos);
197 			pos = readBinaryPos(data, bodySize, pos);
198 			
199 			bodyPayload = data[pos .. $];
200 
201 			if(this.isCompress)
202 			{
203 				bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload);
204 				bodySize = cast(ushort)bodyPayload.length;
205 			}
206 					
207 		}catch(Exception e)
208 		{
209 			logWarning("decode binary stream is error:%s", e.msg);
210 			return false;
211 		}
212 
213 		return true;
214 	}
215 
216 	bool fromStreamForHander(ubyte[] data)
217 	{
218 		ulong pos = 0;
219 		
220 		try{
221 
222 			pos = readBytesPos(data, magic, pos);
223 			pos = readBytePos(data, handerSize, pos);
224 			pos = readBytePos(data, ver, pos);
225 			
226 			pos = readBinaryPos(data, st, pos);
227 			
228 			pos = readBytePos(data, statusInfo, pos);
229 			pos = readBytesPos(data, reserved, pos);
230 
231 			pos = readBinaryPos(data, funcId, pos);
232 			pos = readBinaryPos(data, sequenceId, pos);
233 			pos = readBinaryPos(data, bodySize, pos);
234 
235 			
236 		}catch(Exception e)
237 		{
238 			logWarning("decode binary stream for hander is error:%s", e.msg);
239 			return false;
240 		}
241 
242 		return this.checkHanderValid();
243 	}
244 
245 	bool fromStreamForPayload(ubyte[] data)
246 	{
247 		try{
248 
249 			bodyPayload = data[0 .. $];
250 				
251 			if(this.isCompress)
252 			{
253 				bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload);
254 				bodySize = cast(ushort)bodyPayload.length;
255 			}
256 
257 		}catch(Exception e)
258 		{
259 			logWarning("decode body stream is error:%s", e.msg);
260 			return false;
261 		}
262 
263 		return true;
264 	}
265 
266 
267 	bool checkHanderValid()
268 	{
269 		return magic == RPC_HANDER_MAGIC && ver == RPC_HANDER_VERSION && this.getPackgeSize <= RPC_PACKAGE_MAX;
270 	}
271 
272 	RPC_PACKAGE_COMPRESS_TYPE getCompressType()
273 	{
274 		return cast(RPC_PACKAGE_COMPRESS_TYPE)((st & RPC_HANDER_CPNPRESS_TYPE_FLAG)>>8);
275 	}
276 
277 	bool isCompress()
278 	{
279 		return cast(bool)(st & RPC_HANDER_COMPRESS_FLAG);
280 	}
281 
282 protected:
283 
284 	ulong writeBinaryPos(T)(ubyte[] data, T t, ulong pos)
285 	{
286 		T bits= hostToNet(t);
287 		data[pos .. pos + t.sizeof ] = (cast(ubyte*)&bits)[0 .. t.sizeof];
288 		return pos + t.sizeof;
289 	}
290 
291 	ulong writeBytesPos(ubyte[] data, ubyte[] bytes, ulong pos)
292 	{
293 		data[pos .. pos + bytes.length] = bytes[0 .. bytes.length];
294 		return pos + bytes.length;
295 	}
296 
297 	ulong writeBytePos(ubyte[] data, ubyte abyte, ulong pos)
298 	{
299 		data[pos .. pos + abyte.sizeof] = abyte;
300 		return pos + abyte.sizeof;
301 	}
302 
303 	ulong readBinaryPos(T)(ubyte[] data, ref T t, ulong pos)
304 	{
305 		IntBuf!(T) bits;
306 		bits.bytes = data[pos .. pos + t.sizeof];
307 		t = netToHost(bits.value);
308 	
309 		return pos + t.sizeof;
310 	}
311 
312 	ulong readBytesPos(ubyte[] data, ubyte[] bytes, ulong pos)
313 	{
314 		bytes[0 .. $] = data[pos .. pos + bytes.length];
315 
316 		return pos + bytes.length;
317 	}
318 
319 	ulong readBytePos(ubyte[] data, ref ubyte abyte, ulong pos)
320 	{
321 		abyte = (data[pos .. pos + abyte.sizeof])[0];
322 		return pos + abyte.sizeof;
323 	}
324 
325 
326 private:
327 	ubyte[2] magic;
328 
329 	ubyte handerSize;
330 	ubyte ver;
331 	short st;
332 
333 	ubyte statusInfo; // [nb:1bit, ow:1bit, rp:1bit, nonblock:1bit, statusCode:4bit] 
334 
335 	ubyte[2] reserved;
336 	uint sequenceId;
337 	size_t funcId;
338 	ushort bodySize;
339 
340 	ubyte[] bodyPayload;
341 }
342 
343 
344 unittest{
345 	import std.stdio;
346 
347 	auto send_pkg  = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, 0, RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC);
348 	auto send_data = "aaaaaaaabbbbbbbbbbbbbbcccccccccccccccdddddddddddddddddddd";
349 
350 	writeln("-----------------------------------------------------");
351 
352 	auto snd_stream = send_pkg.toStream(cast(ubyte[])send_data);
353 
354 	writefln("send stream, length:%s, compress:%s, data:%s", snd_stream.length, send_pkg.getCompressType, snd_stream);
355 
356 	auto recvPkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF);
357 
358 	recvPkg.fromStream(snd_stream);
359 
360 	writeln("----------------------------------------------------");
361 
362 	writefln("recv stream, length:%s, compress:%s, data:%s", recvPkg.getPackgeSize(), recvPkg.getCompressType, recvPkg.getPayload());
363 
364 }