1 module kissrpc.RpcServerSocket; 2 3 import kissrpc.RpcRecvPackageManage; 4 import kissrpc.RpcEventInterface; 5 import kissrpc.RpcSocketBaseInterface; 6 import kissrpc.Unit; 7 8 import std.socket; 9 import std.stdio; 10 import std.conv; 11 import core.thread; 12 13 14 import kiss.net.TcpServer; 15 import kiss.aio.AsynchronousSocketChannel; 16 import kiss.aio.ByteBuffer; 17 import kiss.aio.AsynchronousChannelSelector; 18 19 class RpcServerSocket:TcpServer, RpcSocketBaseInterface{ 20 21 public: 22 this(AsynchronousSocketChannel client, RpcEventInterface rpcEventDalegate) 23 { 24 _socketEventDelegate = rpcEventDalegate; 25 _packageManage = new RpcRecvPackageManage(this, rpcEventDalegate); 26 super(client, RPC_PACKAGE_MAX); 27 _selector = client.getSelector(); 28 _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_CONNECTD, "client inconming...."); 29 } 30 override void onWriteCompleted(void* attachment, size_t count , ByteBuffer buffer) { 31 32 } 33 override void onWriteFailed(void* attachment) { 34 35 _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_WRITE_FAILED, "write data to client is failed"); 36 } 37 override void onReadCompleted(void* attachment, size_t count , ByteBuffer buffer) { 38 _packageManage.add(cast(ubyte[])(buffer.getCurBuffer())); 39 } 40 override void onReadFailed(void* attachment) { 41 42 } 43 override void onClose() { 44 _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_DISCONNECTD, "disconnect from client!"); 45 } 46 47 void disconnect() 48 { 49 close(); 50 } 51 bool write(byte[] buf) { 52 super.doWrite(buf); 53 return true; 54 } 55 56 int getFd() { return cast(int)(fd()); } 57 string getIp() { return ip(); } 58 string getPort() { return port(); } 59 60 public: 61 AsynchronousChannelSelector _selector; 62 63 private: 64 RpcEventInterface _socketEventDelegate; 65 RpcRecvPackageManage _packageManage; 66 } 67