1 module kissrpc.RpcClient;
2 
3 import kissrpc.RpcRequest;
4 import kissrpc.Unit;
5 import kissrpc.RpcResponse;
6 import kissrpc.RpcBinaryPackage;
7 import kissrpc.RpcClientSocket;
8 import kissrpc.RpcEventInterface;
9 import kissrpc.RpcPackageBase;
10 import kissrpc.RpcSocketBaseInterface;
11 import kissrpc.RpcEventInterface;
12 import kissrpc.RpcSendPackageManage;
13 import kissrpc.Logs;
14 
15 import kiss.aio.AsynchronousChannelSelector;
16 import kiss.aio.ByteBuffer;
17 
18 import std.parallelism;
19 import std.stdio;
20 import std.experimental.logger.core;
21 
22 
/// Handler type stored per function id by RpcClient.bindCallback and invoked with the response object.
alias ReponsCallback = void delegate(RpcResponse);
24 
25 
/**
 * Client-side RPC endpoint.
 *
 * Outgoing requests are stamped with a sequence number and handed to an
 * RpcSendPackageManage. Received binary packages are converted back into
 * response objects and routed to the callback registered for their function
 * id. Socket status changes are forwarded to the ClientSocketEventInterface
 * supplied at construction, via the std.parallelism task pool.
 */
class RpcClient:RpcEventInterface{

	/**
	 * Params:
	 *   socketEvent = receiver of the connected / disconnected / read-failed /
	 *                 write-failed notifications raised in socketEvent().
	 */
	this(ClientSocketEventInterface socketEvent)
	{
		clientSocketEvent = socketEvent;
		sendPackManage = new RpcSendPackageManage(this);
		// Sets the worker count std.parallelism uses when it lazily creates `taskPool`.
		defaultPoolThreads = RPC_CLIENT_DEFAULT_THREAD_POOL;
		compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO;
	}
	
	/**
	 * Builds the "className.funcName" key and writes it to the debug log.
	 * Nothing is stored; callbacks are registered through bindCallback().
	 */
	void bind(string className, string funcName)
	{
		string key = className ~ "." ~ funcName;
		deWritefln("rpc client bind:%s", key);
	}

	/**
	 * Registers (or replaces) the callback invoked for responses to `funcId`.
	 *
	 * Params:
	 *   funcId   = function id used as the lookup key when a response arrives.
	 *   callback = delegate that receives the response object.
	 */
	void bindCallback(size_t funcId, ReponsCallback callback)
	{
		rpcCallbackMap[funcId] = callback;
	}

	
	/**
	 * Stamps `req` with the next sequence number and the client socket, then
	 * queues it in the send-package manager.
	 *
	 * Params:
	 *   req      = request to send; if its compress type is RPCT_NO it
	 *              inherits the client-wide setting from setSocketCompress().
	 *   protocol = not read by this method.
	 *
	 * Returns: the result of RpcSendPackageManage.add for this request.
	 */
	bool requestRemoteCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol)
	{	
		packMessageCount++;
		req.setSequence(packMessageCount);
		req.setSocket(clientSocket);

		if(req.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO)
		{
			req.setCompressType(this.compressType);
		}

		deWritefln("rpc client request remote call:%s, id:%s", req.getCallFuncName(), req.getCallFuncId);
		return sendPackManage.add(req);
	}

	/**
	 * Handles a binary package received from the server.
	 *
	 * The package's sequence id is removed from the send-package manager; if
	 * that fails the package is reported as timed out and dropped. Otherwise a
	 * response object is rebuilt from the package, its status is set from the
	 * package status code, and it is passed to the callback bound to its
	 * function id.
	 *
	 * Params:
	 *   socket = socket the package arrived on; attached to the response object.
	 *   pack   = the received binary package.
	 */
	void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack)
	{
		// No pending request matches this sequence id: report and stop.
		if(!sendPackManage.remove(pack.getSequenceId))
		{
			logWarning("Accept error, client recv response failed, package is timeout!!!, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s", 
				pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize);
			return;
		}

		deWritefln("client recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", 
			pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType);

		// Known serialization types need no extra work here; an unknown type is
		// only logged and processing continues.
		switch(pack.getSerializedType)
		{
			case RPC_PACKAGE_PROTOCOL.TPP_JSON:break;
			case RPC_PACKAGE_PROTOCOL.TPP_XML: break;
			case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break;
			case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF:  break;
			case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break;
				
			default:
				logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType());
		}

		// Rebuild the response object from the package header and payload.
		auto rpcResp = new RpcRequest(socket);

		rpcResp.setSequence(pack.getSequenceId());
		rpcResp.setNonblock(pack.getNonblock());
		rpcResp.setCompressType(pack.getCompressType);
		rpcResp.bindFunc(pack.getFuncId());
		rpcResp.push(pack.getPayload());

		if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK)
		{
			logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, status code:%s", 
				pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getStatusCode);
			
			rpcResp.setStatus(RESPONSE_STATUS.RS_FAILD);
		}
		else
		{
			rpcResp.setStatus(RESPONSE_STATUS.RS_OK);	
			deWritefln("rpc server response call, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());			
		}
		
		// The non-block flag only selects which debug message is written.
		if(pack.getNonblock)
		{
			deWritefln("async call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());			
		}
		else
		{
			deWritefln("sync call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum());			
		}

		dispatchCallback(rpcResp);
	}

	/**
	 * Handles a send-side event reported with a response object: logs a
	 * warning chosen by the response status (timeout, failure, or unexpected
	 * status) and then passes the response to the callback bound to its
	 * function id, whatever the status was.
	 */
	void rpcSendPackageEvent(RpcResponse rpcResp)
	{
		switch(rpcResp.getStatus)
		{
			case RESPONSE_STATUS.RS_TIMEOUT:
				logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
				break;
			
			case RESPONSE_STATUS.RS_FAILD:
				logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
				break;

			default:
				logWarning("rpc send package event is fatal!!, event type error!");
		}

		dispatchCallback(rpcResp);
	}


	/**
	 * Forwards a socket status change to the matching method of the
	 * ClientSocketEventInterface by submitting it as a task to `taskPool`.
	 * An unrecognised status is logged as an error and nothing is scheduled.
	 *
	 * Params:
	 *   socket    = socket whose status changed; passed to the handler.
	 *   status    = the new socket status.
	 *   statusStr = textual description of the status; used only for logging.
	 */
	void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr)
	{
		// logInfo("client socket status info:%s", statusStr);
		switch(status)
		{
			case SOCKET_STATUS.SE_CONNECTD:
				 auto t = task(&clientSocketEvent.connectd, socket); 
				 taskPool.put(t);
				 break;

			case SOCKET_STATUS.SE_DISCONNECTD: 
				auto t = task(&clientSocketEvent.disconnectd, socket); 
				taskPool.put(t);
				break;
			
			case SOCKET_STATUS.SE_READ_FAILED : 
				 auto t = task(&clientSocketEvent.readFailed, socket); 
				 taskPool.put(t);
				 break;

			case SOCKET_STATUS.SE_WRITE_FAILED: 
				auto t = task(&clientSocketEvent.writeFailed, socket); 
				taskPool.put(t);
				break;
		
			default:
				// The format string previously had no placeholder for statusStr,
				// leaving the argument without a matching specifier.
				logError("client socket status is fatal!!, status:%s", statusStr);
				return;
		}

	}

	/**
	 * Creates the client socket for `ip`:`port` on selector `sel`, with this
	 * client registered as its event receiver.
	 */
	void connect(string ip, ushort port, AsynchronousChannelSelector sel)
	{
		clientSocket = new RpcClientSocket(ip, port, sel, this);
	}

	/**
	 * Asks the existing client socket to reconnect. If connect() has not
	 * created a socket yet, an error is logged and nothing else happens.
	 */
	void reConnect()
	{
		// clientSocket is only assigned in connect(); avoid a null dereference.
		if(clientSocket is null)
		{
			logError("rpc client reconnect failed, socket is not created, call connect first!");
			return;
		}

		clientSocket.reConnect();
	}

	/// Returns the send-package manager's count of requests still waiting for a response.
	ulong getWaitResponseNum()
	{
		return sendPackManage.getWaitResponseNum;
	}

	/// Sets the compress type applied to requests whose own compress type is RPCT_NO.
	void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type)
	{
		compressType = type;
	}
	
private:

	/**
	 * Looks up the callback bound to the response's function id and invokes
	 * it with the response; logs an error when no callback is bound.
	 */
	void dispatchCallback(RpcResponse rpcResp)
	{
		auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null);

		if(callback !is null)
		{
			callback(rpcResp);
		}
		else
		{
			logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName);
		}
	}

	// Tracks outstanding requests; consulted when a response arrives.
	RpcSendPackageManage sendPackManage;
	// Client-wide default compress type (see setSocketCompress).
	RPC_PACKAGE_COMPRESS_TYPE compressType;

	// Response callbacks keyed by function id.
	ReponsCallback[size_t] rpcCallbackMap;

	// Incremented per request and used as the request sequence number.
	ulong packMessageCount;

	// Created by connect(); null until then.
	RpcClientSocket clientSocket;
	// Receiver of socket status notifications.
	ClientSocketEventInterface clientSocketEvent;
}