1 module kissrpc.RpcBinaryPackage; 2 3 import kissrpc.Endian; 4 import kissrpc.Unit; 5 import kissrpc.Logs; 6 7 import snappy.snappy; 8 import std.stdio; 9 import std.experimental.logger; 10 import std.format; 11 12 enum RPC_PACKAGE_STATUS_CODE 13 { 14 RPSC_OK, 15 RPSC_FAILED, 16 } 17 18 19 class RpcBinaryPackage 20 { 21 this(RPC_PACKAGE_PROTOCOL tpp, ulong msgId = 0, RPC_PACKAGE_COMPRESS_TYPE compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, bool isNonblock = true, size_t funcId = 0) 22 { 23 magic = RPC_HANDER_MAGIC; 24 ver = RPC_HANDER_VERSION; 25 sequenceId = cast(uint)msgId; 26 this.funcId = funcId; 27 28 statusInfo |= (isNonblock ? RPC_HANDER_NONBLOCK_FLAG : 0); 29 30 st = cast(short)tpp; 31 32 st |= (compressType << 8); 33 34 statusInfo |= (RPC_PACKAGE_STATUS_CODE.RPSC_OK & RPC_HANDER_STATUS_CODE_FLAG); 35 36 handerSize = ver.sizeof + st.sizeof + statusInfo.sizeof + reserved.sizeof + funcId.sizeof + sequenceId.sizeof + bodySize.sizeof; 37 } 38 39 int getStartHanderLength()const 40 { 41 return magic.sizeof + handerSize.sizeof; 42 } 43 44 int getHanderSize()const 45 { 46 return handerSize + this.getStartHanderLength(); 47 } 48 49 int getPackgeSize()const 50 { 51 return handerSize + bodySize + this.getStartHanderLength(); 52 } 53 54 size_t getFuncId()const 55 { 56 return funcId; 57 } 58 59 void setFuncId(const size_t id) 60 { 61 funcId = id; 62 } 63 64 ubyte[] getPayload() 65 { 66 return bodyPayload; 67 } 68 69 short getVersion()const 70 { 71 return ver; 72 } 73 74 ulong getBodySize()const 75 { 76 return bodySize; 77 } 78 79 short getSerializedType()const 80 { 81 return st & RPC_HANDER_SERI_FLAG; 82 } 83 84 ulong getSequenceId()const 85 { 86 return sequenceId; 87 } 88 89 bool getNonblock()const 90 { 91 return cast(bool)statusInfo & RPC_HANDER_NONBLOCK_FLAG; 92 } 93 94 short getStatusCode()const 95 { 96 return statusInfo & RPC_HANDER_STATUS_CODE_FLAG; 97 } 98 99 void setStatusCode(const RPC_PACKAGE_STATUS_CODE code) 100 { 101 statusInfo |= (code & RPC_HANDER_STATUS_CODE_FLAG); 102 } 103 104 bool getHB()const 105 { 106 return statusInfo & RPC_HANDER_HB_FLAG; 107 } 108 109 void setHBPackage() 110 { 111 statusInfo |= RPC_HANDER_HB_FLAG; 112 } 113 114 bool getOW()const 115 { 116 return cast(bool)statusInfo & RPC_HANDER_OW_FLAG; 117 } 118 119 void setOWPackage() 120 { 121 statusInfo |= RPC_HANDER_OW_FLAG; 122 } 123 124 bool getRP()const 125 { 126 return cast(bool)statusInfo & RPC_HANDER_RP_FLAG; 127 } 128 129 void setRP() 130 { 131 statusInfo |= RPC_HANDER_RP_FLAG; 132 } 133 134 135 ubyte[] toStream(ubyte[] payload) 136 { 137 switch(this.getCompressType()) 138 { 139 case RPC_PACKAGE_COMPRESS_TYPE.RPCT_COMPRESS: 140 payload =cast(ubyte[]) Snappy.compress(cast(byte[])payload); 141 st |= RPC_HANDER_COMPRESS_FLAG; 142 break; 143 144 case RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC: 145 if(payload.length >= RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE) 146 { 147 payload = cast(ubyte[]) Snappy.compress(cast(byte[])payload); 148 st |= RPC_HANDER_COMPRESS_FLAG; 149 }else 150 { 151 st &= ~RPC_HANDER_COMPRESS_FLAG; 152 } 153 break; 154 155 default:break; 156 } 157 158 bodySize = cast(ushort)payload.length; 159 160 auto stream = new ubyte[this.getPackgeSize()]; 161 162 ulong pos = 0; 163 164 165 pos = writeBytesPos(stream, magic, pos); 166 pos = writeBytePos(stream, handerSize, pos); 167 pos = writeBytePos(stream, ver, pos); 168 169 pos = writeBinaryPos(stream, st, pos); 170 171 pos = writeBytePos(stream, statusInfo, pos); 172 pos = writeBytesPos(stream, reserved, pos); 173 174 pos = writeBinaryPos(stream, funcId, pos); 175 pos = writeBinaryPos(stream, sequenceId, pos); 176 pos = writeBinaryPos(stream, bodySize, pos); 177 178 pos = writeBytesPos(stream, payload, pos); 179 180 return stream; 181 } 182 183 bool fromStream(ubyte[] data) 184 { 185 ulong pos = 0; 186 187 try{ 188 pos = readBytesPos(data, magic, pos); 189 pos = readBytePos(data, handerSize, pos); 190 pos = readBytePos(data, ver, pos); 191 192 pos = readBinaryPos(data, st, pos); 193 194 pos = readBytePos(data, statusInfo, pos); 195 pos = readBytesPos(data, reserved, pos); 196 197 pos = readBinaryPos(data, funcId, pos); 198 pos = readBinaryPos(data, sequenceId, pos); 199 pos = readBinaryPos(data, bodySize, pos); 200 201 bodyPayload = data[pos .. $]; 202 203 if(this.isCompress) 204 { 205 bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); 206 bodySize = cast(ushort)bodyPayload.length; 207 } 208 209 }catch(Exception e) 210 { 211 logWarning("decode binary stream is error:%s", e.msg); 212 return false; 213 } 214 215 return true; 216 } 217 218 bool fromStreamForHander(ubyte[] data) 219 { 220 ulong pos = 0; 221 222 try{ 223 224 pos = readBytesPos(data, magic, pos); 225 pos = readBytePos(data, handerSize, pos); 226 pos = readBytePos(data, ver, pos); 227 228 pos = readBinaryPos(data, st, pos); 229 230 pos = readBytePos(data, statusInfo, pos); 231 pos = readBytesPos(data, reserved, pos); 232 233 pos = readBinaryPos(data, funcId, pos); 234 pos = readBinaryPos(data, sequenceId, pos); 235 pos = readBinaryPos(data, bodySize, pos); 236 237 238 239 headData.length = getHanderSize(); 240 pos = 0; 241 pos = writeBytesPos(headData, magic, pos); 242 pos = writeBytePos(headData, handerSize, pos); 243 pos = writeBytePos(headData, ver, pos); 244 pos = writeBinaryPos(headData, st, pos); 245 pos = writeBytePos(headData, statusInfo, pos); 246 pos = writeBytesPos(headData, reserved, pos); 247 pos = writeBinaryPos(headData, funcId, pos); 248 pos = writeBinaryPos(headData, sequenceId, pos); 249 pos = writeBinaryPos(headData, bodySize, pos); 250 251 252 253 log(format("magic = %s, handerSize = %s, ver = %s, st = %s, statusInfo = %s, reserved = %s, funcId = %s, sequenceId = %s, bodySize = %s",magic,handerSize,ver,st,statusInfo,reserved,funcId,sequenceId,bodySize)); 254 255 }catch(Exception e) 256 { 257 logWarning("decode binary stream for hander is error:%s", e.msg); 258 return false; 259 } 260 261 return this.checkHanderValid(); 262 } 263 264 ubyte[] getHead() { 265 return headData; 266 } 267 268 bool fromStreamForPayload(ubyte[] data) 269 { 270 try{ 271 272 bodyPayload = data[0 .. $]; 273 274 if(this.isCompress) 275 { 276 bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); 277 bodySize = cast(ushort)bodyPayload.length; 278 } 279 280 }catch(Exception e) 281 { 282 logWarning("decode body stream is error:%s", e.msg); 283 return false; 284 } 285 286 return true; 287 } 288 289 290 bool checkHanderValid() 291 { 292 return magic == RPC_HANDER_MAGIC && ver == RPC_HANDER_VERSION && this.getPackgeSize <= RPC_PACKAGE_MAX; 293 } 294 295 RPC_PACKAGE_COMPRESS_TYPE getCompressType() 296 { 297 return cast(RPC_PACKAGE_COMPRESS_TYPE)((st & RPC_HANDER_CPNPRESS_TYPE_FLAG)>>8); 298 } 299 300 bool isCompress() 301 { 302 return cast(bool)(st & RPC_HANDER_COMPRESS_FLAG); 303 } 304 305 protected: 306 307 ulong writeBinaryPos(T)(ubyte[] data, T t, ulong pos) 308 { 309 T bits= hostToNet(t); 310 data[pos .. pos + t.sizeof ] = (cast(ubyte*)&bits)[0 .. t.sizeof]; 311 return pos + t.sizeof; 312 } 313 314 ulong writeBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) 315 { 316 data[pos .. pos + bytes.length] = bytes[0 .. bytes.length]; 317 return pos + bytes.length; 318 } 319 320 ulong writeBytePos(ubyte[] data, ubyte abyte, ulong pos) 321 { 322 data[pos .. pos + abyte.sizeof] = abyte; 323 return pos + abyte.sizeof; 324 } 325 326 ulong readBinaryPos(T)(ubyte[] data, ref T t, ulong pos) 327 { 328 IntBuf!(T) bits; 329 bits.bytes = data[pos .. pos + t.sizeof]; 330 t = netToHost(bits.value); 331 332 return pos + t.sizeof; 333 } 334 335 ulong readBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) 336 { 337 bytes[0 .. $] = data[pos .. pos + bytes.length]; 338 339 return pos + bytes.length; 340 } 341 342 ulong readBytePos(ubyte[] data, ref ubyte abyte, ulong pos) 343 { 344 abyte = (data[pos .. pos + abyte.sizeof])[0]; 345 return pos + abyte.sizeof; 346 } 347 348 349 private: 350 ubyte[2] magic; 351 352 ubyte handerSize; 353 ubyte ver; 354 short st; 355 356 ubyte statusInfo; // [nb:1bit, ow:1bit, rp:1bit, nonblock:1bit, statusCode:4bit] 357 358 ubyte[2] reserved; 359 uint sequenceId; 360 size_t funcId; 361 ushort bodySize; 362 363 ubyte[] bodyPayload; 364 ubyte[] headData; 365 } 366 367 368 unittest{ 369 import std.stdio; 370 371 auto send_pkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, 0, RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC); 372 auto send_data = "aaaaaaaabbbbbbbbbbbbbbcccccccccccccccdddddddddddddddddddd"; 373 374 writeln("-----------------------------------------------------"); 375 376 auto snd_stream = send_pkg.toStream(cast(ubyte[])send_data); 377 378 writefln("send stream, length:%s, compress:%s, data:%s", snd_stream.length, send_pkg.getCompressType, snd_stream); 379 380 auto recvPkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); 381 382 recvPkg.fromStream(snd_stream); 383 384 writeln("----------------------------------------------------"); 385 386 writefln("recv stream, length:%s, compress:%s, data:%s", recvPkg.getPackgeSize(), recvPkg.getCompressType, recvPkg.getPayload()); 387 388 }