1 module kissrpc.RpcRecvPackageManage; 2 3 import kissrpc.RpcBinaryPackage; 4 import kissrpc.RpcServerSocket; 5 import kissrpc.RpcEventInterface; 6 import kissrpc.RpcSocketBaseInterface; 7 import kissrpc.Unit; 8 import kissrpc.Logs; 9 10 import kissrpc.RpcClientSocket; 11 12 import std.parallelism; 13 import std.stdio; 14 import core.thread; 15 import std.format; 16 17 import std.experimental.logger; 18 19 class CapnprotoRecvPackage 20 { 21 this() 22 { 23 binaryPackage = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); 24 hander = new ubyte[binaryPackage.getHanderSize]; 25 recvRemainBytes = hander.length; 26 } 27 28 29 ubyte[] parse(ubyte[] bytes, ref bool isOk) 30 { 31 // log("parse ",bytes,", hander.length ",hander.length); 32 ulong cpySize = bytes.length > recvRemainBytes? recvRemainBytes : bytes.length; 33 ulong bytesPos = 0; 34 // log("parseState ",parseState, ", recvRemainBytes ",recvRemainBytes, ", bytes.length ", bytes.length,", isOk ",isOk); 35 if(parseState == 0) 36 { 37 hander[handerPos .. handerPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; 38 // log(format("handerPos = %s, hander = %s",handerPos,hander)); 39 40 handerPos += cpySize; 41 bytesPos += cpySize; 42 43 recvRemainBytes -= cpySize; 44 45 if(recvRemainBytes == 0) 46 { 47 if(binaryPackage.fromStreamForHander(hander)) 48 { 49 payload = new ubyte[binaryPackage.getBodySize()]; 50 recvRemainBytes = payload.length; 51 parseState = 1; 52 return this.parse(bytes[bytesPos .. bytesPos + (bytes.length - cpySize)], isOk); 53 } 54 else 55 return null; 56 } 57 } 58 59 if(parseState == 1 && recvRemainBytes >= 0) 60 { 61 // log(format("payloadPos = %s, payload = %s",payloadPos,payload)); 62 payload[payloadPos .. payloadPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; 63 64 payloadPos += cpySize; 65 bytesPos += cpySize; 66 recvRemainBytes -= cpySize; 67 68 if(recvRemainBytes == 0) 69 { 70 isOk = binaryPackage.fromStreamForPayload(payload); 71 if (!isOk) { 72 log("body check error!!!"); 73 return null; 74 } 75 } 76 } 77 78 79 return bytes[bytesPos .. bytesPos + (bytes.length-cpySize)]; 80 } 81 82 RpcBinaryPackage getPackage() 83 { 84 return binaryPackage; 85 } 86 87 bool checkHanderValid() 88 { 89 return binaryPackage.checkHanderValid; 90 } 91 92 bool checkPackageValid() 93 { 94 return binaryPackage.checkHanderValid && payloadPos == payload.length; 95 } 96 97 private: 98 ubyte[] hander; 99 ubyte[] payload; 100 int parseState; 101 102 ulong handerPos, payloadPos; 103 104 ulong recvRemainBytes; 105 106 RpcBinaryPackage binaryPackage; 107 } 108 109 class RpcRecvPackageManage 110 { 111 this(RpcSocketBaseInterface baseSocket, RpcEventInterface rpcDelegate) 112 { 113 rpcEventDelegate = rpcDelegate; 114 socket = baseSocket; 115 } 116 117 118 void add(ubyte[] bytes) 119 { 120 // log("add ",bytes); 121 do{ 122 auto pack = recvPackage.get(id, new CapnprotoRecvPackage); 123 124 bool parseOk = false; 125 126 recvPackage[id] = pack; 127 128 129 bytes = pack.parse(bytes, parseOk); 130 if (bytes is null) { 131 logError(format("parse head error !!!!")); 132 socket.disconnect(); 133 break; 134 } 135 136 // log("bytes.length ",bytes.length,", parseOk ",parseOk,", id ", id); 137 if(parseOk) 138 { 139 auto capnprotoPack = pack.getPackage(); 140 141 // log("pack.checkHanderValid() ",pack.checkHanderValid()); 142 if(pack.checkHanderValid()) 143 { 144 // log("pack.checkPackageValid() ",pack.checkPackageValid()); 145 if(pack.checkPackageValid) 146 { 147 if (capnprotoPack.getFuncId() == 0) { 148 logInfo("recv heart kick"); 149 if (cast(RpcClientSocket)socket !is null) { 150 151 } 152 else if(cast(RpcServerSocket)socket !is null) { 153 socket.write(cast(byte[])capnprotoPack.getHead()); 154 } 155 }else { 156 rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack); 157 } 158 recvPackage.remove(id); 159 id++; 160 }else{ 161 logError("parse package check hander is error, package data:%s", bytes); 162 socket.disconnect(); 163 break; 164 } 165 }else 166 { 167 logError(format("parse parseOk head error !!!!")); 168 socket.disconnect(); 169 break; 170 } 171 } 172 }while(bytes.length > 0); 173 } 174 175 private: 176 ulong id; 177 CapnprotoRecvPackage[ulong] recvPackage; 178 RpcEventInterface rpcEventDelegate; 179 RpcSocketBaseInterface socket; 180 }