1 module kissrpc.RpcBinaryPackage; 2 3 import kissrpc.Endian; 4 import kissrpc.Unit; 5 import kissrpc.Logs; 6 7 import snappy.snappy; 8 import std.stdio; 9 10 enum RPC_PACKAGE_STATUS_CODE 11 { 12 RPSC_OK, 13 RPSC_FAILED, 14 } 15 16 17 class RpcBinaryPackage 18 { 19 this(RPC_PACKAGE_PROTOCOL tpp, ulong msgId = 0, RPC_PACKAGE_COMPRESS_TYPE compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, bool isNonblock = true, size_t funcId = 0) 20 { 21 magic = RPC_HANDER_MAGIC; 22 ver = RPC_HANDER_VERSION; 23 sequenceId = cast(uint)msgId; 24 this.funcId = funcId; 25 26 statusInfo |= (isNonblock ? RPC_HANDER_NONBLOCK_FLAG : 0); 27 28 st = cast(short)tpp; 29 30 st |= (compressType << 8); 31 32 statusInfo |= (RPC_PACKAGE_STATUS_CODE.RPSC_OK & RPC_HANDER_STATUS_CODE_FLAG); 33 34 handerSize = ver.sizeof + st.sizeof + statusInfo.sizeof + reserved.sizeof + funcId.sizeof + sequenceId.sizeof + bodySize.sizeof; 35 } 36 37 int getStartHanderLength()const 38 { 39 return magic.sizeof + handerSize.sizeof; 40 } 41 42 int getHanderSize()const 43 { 44 return handerSize + this.getStartHanderLength(); 45 } 46 47 int getPackgeSize()const 48 { 49 return handerSize + bodySize + this.getStartHanderLength(); 50 } 51 52 size_t getFuncId()const 53 { 54 return funcId; 55 } 56 57 void setFuncId(const size_t id) 58 { 59 funcId = id; 60 } 61 62 ubyte[] getPayload() 63 { 64 return bodyPayload; 65 } 66 67 short getVersion()const 68 { 69 return ver; 70 } 71 72 ulong getBodySize()const 73 { 74 return bodySize; 75 } 76 77 short getSerializedType()const 78 { 79 return st & RPC_HANDER_SERI_FLAG; 80 } 81 82 ulong getSequenceId()const 83 { 84 return sequenceId; 85 } 86 87 bool getNonblock()const 88 { 89 return cast(bool)statusInfo & RPC_HANDER_NONBLOCK_FLAG; 90 } 91 92 short getStatusCode()const 93 { 94 return statusInfo & RPC_HANDER_STATUS_CODE_FLAG; 95 } 96 97 void setStatusCode(const RPC_PACKAGE_STATUS_CODE code) 98 { 99 statusInfo |= (code & RPC_HANDER_STATUS_CODE_FLAG); 100 } 101 102 bool getHB()const 103 { 104 return statusInfo & RPC_HANDER_HB_FLAG; 105 } 106 107 void setHBPackage() 108 { 109 statusInfo |= RPC_HANDER_HB_FLAG; 110 } 111 112 bool getOW()const 113 { 114 return cast(bool)statusInfo & RPC_HANDER_OW_FLAG; 115 } 116 117 void setOWPackage() 118 { 119 statusInfo |= RPC_HANDER_OW_FLAG; 120 } 121 122 bool getRP()const 123 { 124 return cast(bool)statusInfo & RPC_HANDER_RP_FLAG; 125 } 126 127 void setRP() 128 { 129 statusInfo |= RPC_HANDER_RP_FLAG; 130 } 131 132 133 ubyte[] toStream(ubyte[] payload) 134 { 135 switch(this.getCompressType()) 136 { 137 case RPC_PACKAGE_COMPRESS_TYPE.RPCT_COMPRESS: 138 payload =cast(ubyte[]) Snappy.compress(cast(byte[])payload); 139 st |= RPC_HANDER_COMPRESS_FLAG; 140 break; 141 142 case RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC: 143 if(payload.length >= RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE) 144 { 145 payload = cast(ubyte[]) Snappy.compress(cast(byte[])payload); 146 st |= RPC_HANDER_COMPRESS_FLAG; 147 }else 148 { 149 st &= ~RPC_HANDER_COMPRESS_FLAG; 150 } 151 break; 152 153 default:break; 154 } 155 156 bodySize = cast(ushort)payload.length; 157 158 auto stream = new ubyte[this.getPackgeSize()]; 159 160 ulong pos = 0; 161 162 163 pos = writeBytesPos(stream, magic, pos); 164 pos = writeBytePos(stream, handerSize, pos); 165 pos = writeBytePos(stream, ver, pos); 166 167 pos = writeBinaryPos(stream, st, pos); 168 169 pos = writeBytePos(stream, statusInfo, pos); 170 pos = writeBytesPos(stream, reserved, pos); 171 172 pos = writeBinaryPos(stream, funcId, pos); 173 pos = writeBinaryPos(stream, sequenceId, pos); 174 pos = writeBinaryPos(stream, bodySize, pos); 175 176 pos = writeBytesPos(stream, payload, pos); 177 178 return stream; 179 } 180 181 bool fromStream(ubyte[] data) 182 { 183 ulong pos = 0; 184 185 try{ 186 pos = readBytesPos(data, magic, pos); 187 pos = readBytePos(data, handerSize, pos); 188 pos = readBytePos(data, ver, pos); 189 190 pos = readBinaryPos(data, st, pos); 191 192 pos = readBytePos(data, statusInfo, pos); 193 pos = readBytesPos(data, reserved, pos); 194 195 pos = readBinaryPos(data, funcId, pos); 196 pos = readBinaryPos(data, sequenceId, pos); 197 pos = readBinaryPos(data, bodySize, pos); 198 199 bodyPayload = data[pos .. $]; 200 201 if(this.isCompress) 202 { 203 bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); 204 bodySize = cast(ushort)bodyPayload.length; 205 } 206 207 }catch(Exception e) 208 { 209 logWarning("decode binary stream is error:%s", e.msg); 210 return false; 211 } 212 213 return true; 214 } 215 216 bool fromStreamForHander(ubyte[] data) 217 { 218 ulong pos = 0; 219 220 try{ 221 222 pos = readBytesPos(data, magic, pos); 223 pos = readBytePos(data, handerSize, pos); 224 pos = readBytePos(data, ver, pos); 225 226 pos = readBinaryPos(data, st, pos); 227 228 pos = readBytePos(data, statusInfo, pos); 229 pos = readBytesPos(data, reserved, pos); 230 231 pos = readBinaryPos(data, funcId, pos); 232 pos = readBinaryPos(data, sequenceId, pos); 233 pos = readBinaryPos(data, bodySize, pos); 234 235 236 }catch(Exception e) 237 { 238 logWarning("decode binary stream for hander is error:%s", e.msg); 239 return false; 240 } 241 242 return this.checkHanderValid(); 243 } 244 245 bool fromStreamForPayload(ubyte[] data) 246 { 247 try{ 248 249 bodyPayload = data[0 .. $]; 250 251 if(this.isCompress) 252 { 253 bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); 254 bodySize = cast(ushort)bodyPayload.length; 255 } 256 257 }catch(Exception e) 258 { 259 logWarning("decode body stream is error:%s", e.msg); 260 return false; 261 } 262 263 return true; 264 } 265 266 267 bool checkHanderValid() 268 { 269 return magic == RPC_HANDER_MAGIC && ver == RPC_HANDER_VERSION && this.getPackgeSize <= RPC_PACKAGE_MAX; 270 } 271 272 RPC_PACKAGE_COMPRESS_TYPE getCompressType() 273 { 274 return cast(RPC_PACKAGE_COMPRESS_TYPE)((st & RPC_HANDER_CPNPRESS_TYPE_FLAG)>>8); 275 } 276 277 bool isCompress() 278 { 279 return cast(bool)(st & RPC_HANDER_COMPRESS_FLAG); 280 } 281 282 protected: 283 284 ulong writeBinaryPos(T)(ubyte[] data, T t, ulong pos) 285 { 286 T bits= hostToNet(t); 287 data[pos .. pos + t.sizeof ] = (cast(ubyte*)&bits)[0 .. t.sizeof]; 288 return pos + t.sizeof; 289 } 290 291 ulong writeBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) 292 { 293 data[pos .. pos + bytes.length] = bytes[0 .. bytes.length]; 294 return pos + bytes.length; 295 } 296 297 ulong writeBytePos(ubyte[] data, ubyte abyte, ulong pos) 298 { 299 data[pos .. pos + abyte.sizeof] = abyte; 300 return pos + abyte.sizeof; 301 } 302 303 ulong readBinaryPos(T)(ubyte[] data, ref T t, ulong pos) 304 { 305 IntBuf!(T) bits; 306 bits.bytes = data[pos .. pos + t.sizeof]; 307 t = netToHost(bits.value); 308 309 return pos + t.sizeof; 310 } 311 312 ulong readBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) 313 { 314 bytes[0 .. $] = data[pos .. pos + bytes.length]; 315 316 return pos + bytes.length; 317 } 318 319 ulong readBytePos(ubyte[] data, ref ubyte abyte, ulong pos) 320 { 321 abyte = (data[pos .. pos + abyte.sizeof])[0]; 322 return pos + abyte.sizeof; 323 } 324 325 326 private: 327 ubyte[2] magic; 328 329 ubyte handerSize; 330 ubyte ver; 331 short st; 332 333 ubyte statusInfo; // [nb:1bit, ow:1bit, rp:1bit, nonblock:1bit, statusCode:4bit] 334 335 ubyte[2] reserved; 336 uint sequenceId; 337 size_t funcId; 338 ushort bodySize; 339 340 ubyte[] bodyPayload; 341 } 342 343 344 unittest{ 345 import std.stdio; 346 347 auto send_pkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, 0, RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC); 348 auto send_data = "aaaaaaaabbbbbbbbbbbbbbcccccccccccccccdddddddddddddddddddd"; 349 350 writeln("-----------------------------------------------------"); 351 352 auto snd_stream = send_pkg.toStream(cast(ubyte[])send_data); 353 354 writefln("send stream, length:%s, compress:%s, data:%s", snd_stream.length, send_pkg.getCompressType, snd_stream); 355 356 auto recvPkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); 357 358 recvPkg.fromStream(snd_stream); 359 360 writeln("----------------------------------------------------"); 361 362 writefln("recv stream, length:%s, compress:%s, data:%s", recvPkg.getPackgeSize(), recvPkg.getCompressType, recvPkg.getPayload()); 363 364 }