1 module kissrpc.RpcServer;
2 
3 import kissrpc.RpcRequest;
4 import kissrpc.Unit;
5 import kissrpc.RpcResponse;
6 import kissrpc.RpcBinaryPackage;
7 import kissrpc.RpcServerSocket;
8 import kissrpc.RpcEventInterface;
9 import kissrpc.RpcPackageBase;
10 import kissrpc.RpcSocketBaseInterface;
11 import kissrpc.RpcSendPackageManage;
12 import kissrpc.Logs;
13 
14 // import kiss.event.GroupPoll;
15 // import kiss.aio.AsyncGroupTcpServer;
16 
17 import kiss.aio.AsynchronousChannelThreadGroup;
18 import kiss.aio.AsynchronousSocketChannel;
19 import kiss.net.TcpAcceptor;
20 
21 
22 import std.stdio;
23 
/// Handler type stored per function id by RpcServer.bindCallback and invoked
/// with the RpcRequest built from a received package.
alias RequestCallback = void delegate(RpcRequest);
25 
/**
 * RPC server endpoint built on TcpAcceptor.
 *
 * Accepted connections are wrapped in RpcServerSocket objects that report back
 * to this instance. Received packages are turned into RpcRequest objects and
 * dispatched to the callback registered for their function id; socket status
 * changes are forwarded to the ServerSocketEventInterface given at construction.
 */
class RpcServer:TcpAcceptor, RpcEventInterface{

	/**
	 * Params:
	 *   ip          = address passed to the TcpAcceptor base constructor
	 *   port        = port passed to the TcpAcceptor base constructor
	 *   group       = thread group whose work selector is passed to the base constructor
	 *   socketEvent = receiver of the notifications forwarded by socketEvent()
	 */
	this(string ip, ushort port, AsynchronousChannelThreadGroup group, ServerSocketEventInterface socketEvent)
	{
		// The fields are deliberately assigned before super(...) is called.
		// NOTE(review): presumably so they are already valid if the acceptor
		// starts delivering callbacks as soon as it is constructed - confirm
		// against TcpAcceptor before reordering.
		serverSocketEvent = socketEvent;
		sendPackManage = new RpcSendPackageManage(this);
		compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO;

		super(ip, port, group.getWorkSelector());
	}

	/// Wraps a freshly accepted channel in an RpcServerSocket bound to this server.
	override void onAcceptCompleted(void* attachment, AsynchronousSocketChannel result) {
		// The socket object is not stored here.
		// NOTE(review): presumably it is kept alive through the channel it
		// registers with - confirm in RpcServerSocket.
		new RpcServerSocket(result, this);
	}

	/// Logs a failed accept; no other action is taken.
	override void onAcceptFailed(void* attachment) {
		deWritefln("rpc acceptFailed");
	}

	/// Logs the "className.funcName" key; nothing is registered by this method.
	void bind(string className, string funcName)
	{
		string key = className ~ "." ~ funcName;
		deWritefln("rpc server bind:%s", key);
	}

	/**
	 * Registers (or replaces) the callback invoked for requests carrying funcId.
	 *
	 * Params:
	 *   funcId   = function id used as the lookup key in rpcRecvPackageEvent
	 *   callback = delegate called with the decoded request
	 */
	void bindCallback(const size_t funcId, RequestCallback callback)
	{
		rpcCallbackMap[funcId] = callback;
		// NOTE(review): RpcBindFunctionMap is declared outside this file; the
		// lookup below presumably requires funcId to be registered there - confirm.
		deWritefln("rpc server bind callback:%s, %s, addr:%s",funcId, RpcBindFunctionMap[funcId], callback);
	}

	/**
	 * Queues a response for sending.
	 *
	 * If the response has no compression type set (RPCT_NO), the server-wide
	 * compression type is applied first.
	 *
	 * Params:
	 *   resp     = response to send
	 *   protocol = not used by this implementation
	 *
	 * Returns: the result of RpcSendPackageManage.add for the response.
	 */
	bool RpcResponseRemoteCall(RpcResponse resp, RPC_PACKAGE_PROTOCOL protocol)
	{
		if(resp.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO)
		{
			resp.setCompressType(this.compressType);
		}

		deWritefln("rpc response remote call:%s, id:%s", resp.getCallFuncName, resp.getCallFuncId);
		return 	sendPackManage.add(resp, false);
	}


	/**
	 * Handles a package received on a client socket.
	 *
	 * Packages with a non-OK status code or an unknown serialized type are
	 * logged and dropped. Otherwise an RpcRequest is built from the package
	 * and handed to the callback bound to its function id; a missing binding
	 * is logged as an error.
	 *
	 * Params:
	 *   socket = socket the package arrived on; attached to the request
	 *   pack   = the received binary package
	 */
	void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack)
	{
		deWritefln("server recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", 
					pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType);

		if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK)
		{
			logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, compress:%s, status code:%s", 
				pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getCompressType, pack.getStatusCode);
			return;
		}

		switch(pack.getSerializedType)
		{
			case RPC_PACKAGE_PROTOCOL.TPP_JSON:break;
			case RPC_PACKAGE_PROTOCOL.TPP_XML: break;
			case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break;
			case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF:  break;
			case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break;

			default:
				logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType());
				// Do not dispatch a package whose payload encoding is unknown.
				return;
		}

		auto rpcReq = new RpcRequest(socket);

		rpcReq.setSequence(pack.getSequenceId());
		rpcReq.setNonblock(pack.getNonblock());
		rpcReq.setCompressType(pack.getCompressType());
		rpcReq.bindFunc(pack.getFuncId());
		rpcReq.push(pack.getPayload());

		deWritefln("rpc client request call, func:%s, arg num:%s", rpcReq.getCallFuncName(), rpcReq.getArgsNum());

		auto callback = rpcCallbackMap.get(rpcReq.getCallFuncId, null);

		if(callback is null)
		{
			logError("client rpc call function is not bind, function name:%s", rpcReq.getCallFuncName);
			return;
		}

		callback(rpcReq);
	}

	/**
	 * Logs the outcome reported for a response: timeout, failure, or any
	 * other status (reported as an event type error).
	 */
	void rpcSendPackageEvent(RpcResponse rpcResp)
	{
		switch(rpcResp.getStatus)
		{
			case RESPONSE_STATUS.RS_TIMEOUT:
				logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
				break;
				
			case RESPONSE_STATUS.RS_FAILD:
				logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
				break;
				
			default:
				logWarning("rpc send package event is fatal!!, event type error!");
		}
	}

	/**
	 * Forwards a socket status change to the ServerSocketEventInterface
	 * supplied at construction.
	 *
	 * Params:
	 *   socket    = socket the status refers to
	 *   status    = status code selecting which handler is called
	 *   statusStr = textual form of the status, used for logging and passed
	 *               to listenFailed
	 */
	void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr)
	{
		logInfo("server socket status:%s", statusStr);

		switch(status)
		{
			case SOCKET_STATUS.SE_LISTEN_FAILED: serverSocketEvent.listenFailed(statusStr); break;
			case SOCKET_STATUS.SE_CONNECTD: serverSocketEvent.inconming(socket); break;
			case SOCKET_STATUS.SE_DISCONNECTD:serverSocketEvent.disconnectd(socket); break;
			case SOCKET_STATUS.SE_READ_FAILED: serverSocketEvent.readFailed(socket); break;
			case SOCKET_STATUS.SE_WRITE_FAILED: serverSocketEvent.writeFailed(socket); break;

			default:
				// The format string needs a specifier for statusStr; without
				// one the status text is not part of the message.
				logError("server socket status is fatal!!, status:%s", statusStr);
		}
	}


	/// Sets the compression type applied to responses that do not specify one.
	void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type)
	{
		compressType = type;
	}

private:
	// Queue that outgoing responses are added to.
	RpcSendPackageManage sendPackManage;
	// Default compression applied in RpcResponseRemoteCall.
	RPC_PACKAGE_COMPRESS_TYPE compressType;

	// Receiver of the socket status notifications forwarded by socketEvent().
	ServerSocketEventInterface serverSocketEvent;
	// Request handlers keyed by function id.
	RequestCallback[size_t] rpcCallbackMap;
}