1 module kissrpc.RpcRecvPackageManage; 2 3 import kissrpc.RpcBinaryPackage; 4 import kissrpc.RpcServerSocket; 5 import kissrpc.RpcEventInterface; 6 import kissrpc.RpcSocketBaseInterface; 7 import kissrpc.Unit; 8 import kissrpc.Logs; 9 10 import std.parallelism; 11 import std.stdio; 12 import core.thread; 13 14 class CapnprotoRecvPackage 15 { 16 this() 17 { 18 binaryPackage = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); 19 hander = new ubyte[binaryPackage.getHanderSize]; 20 recvRemainBytes = hander.length; 21 } 22 23 24 ubyte[] parse(ubyte[] bytes, ref bool isOk) 25 { 26 ulong cpySize = bytes.length > recvRemainBytes? recvRemainBytes : bytes.length; 27 ulong bytesPos = 0; 28 29 if(parseState == 0) 30 { 31 hander[handerPos .. handerPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; 32 33 handerPos += cpySize; 34 bytesPos += cpySize; 35 36 recvRemainBytes -= cpySize; 37 38 if(recvRemainBytes == 0) 39 { 40 if(binaryPackage.fromStreamForHander(hander)) 41 { 42 payload = new ubyte[binaryPackage.getBodySize()]; 43 recvRemainBytes = payload.length; 44 parseState = 1; 45 46 return this.parse(bytes[bytesPos .. bytesPos + (bytes.length - cpySize)], isOk); 47 } 48 } 49 } 50 51 if(parseState == 1 && recvRemainBytes > 0) 52 { 53 payload[payloadPos .. payloadPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; 54 55 payloadPos += cpySize; 56 bytesPos += cpySize; 57 recvRemainBytes -= cpySize; 58 59 if(recvRemainBytes == 0) 60 { 61 isOk = binaryPackage.fromStreamForPayload(payload); 62 } 63 } 64 65 return bytes[bytesPos .. bytesPos + (bytes.length-cpySize)]; 66 } 67 68 RpcBinaryPackage getPackage() 69 { 70 return binaryPackage; 71 } 72 73 bool checkHanderValid() 74 { 75 return binaryPackage.checkHanderValid; 76 } 77 78 bool checkPackageValid() 79 { 80 return binaryPackage.checkHanderValid && payloadPos == payload.length; 81 } 82 83 private: 84 ubyte[] hander; 85 ubyte[] payload; 86 int parseState; 87 88 ulong handerPos, payloadPos; 89 90 ulong recvRemainBytes; 91 92 RpcBinaryPackage binaryPackage; 93 } 94 95 class RpcRecvPackageManage 96 { 97 this(RpcSocketBaseInterface baseSocket, RpcEventInterface rpcDelegate) 98 { 99 rpcEventDelegate = rpcDelegate; 100 socket = baseSocket; 101 } 102 103 104 void add(ubyte[] bytes) 105 { 106 do{ 107 auto pack = recvPackage.get(id, new CapnprotoRecvPackage); 108 109 bool parseOk = false; 110 111 recvPackage[id] = pack; 112 113 bytes = pack.parse(bytes, parseOk); 114 115 if(parseOk) 116 { 117 auto capnprotoPack = pack.getPackage(); 118 119 if(pack.checkHanderValid()) 120 { 121 if(pack.checkPackageValid) 122 { 123 rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack); 124 recvPackage.remove(id); 125 id++; 126 }else{ 127 logError("parse package check hander is error, package data:%s", bytes); 128 } 129 }else 130 { 131 capnprotoPack.setStatusCode(RPC_PACKAGE_STATUS_CODE.RPSC_FAILED); 132 recvPackage.remove(id); 133 rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack); 134 } 135 } 136 }while(bytes.length > 0); 137 } 138 139 private: 140 ulong id; 141 CapnprotoRecvPackage[ulong] recvPackage; 142 RpcEventInterface rpcEventDelegate; 143 RpcSocketBaseInterface socket; 144 }