1 module kissrpc.RpcClient;
2 
3 import kissrpc.RpcRequest;
4 import kissrpc.Unit;
5 import kissrpc.RpcResponse;
6 import kissrpc.RpcBinaryPackage;
7 import kissrpc.RpcClientSocket;
8 import kissrpc.RpcEventInterface;
9 import kissrpc.RpcPackageBase;
10 import kissrpc.RpcSocketBaseInterface;
11 import kissrpc.RpcEventInterface;
12 import kissrpc.RpcSendPackageManage;
13 import kissrpc.Logs;
14 
15 import kiss.aio.AsynchronousChannelSelector;
16 import kiss.aio.ByteBuffer;
17 
18 import std.parallelism;
19 import std.stdio;
20 import std.experimental.logger.core;
21 import std.format;
22 
23 alias  ReponsCallback =  void delegate(RpcResponse);
24 
25 
26 class RpcClient:RpcEventInterface{
27 
28 	this(ClientSocketEventInterface socketEvent)
29 	{
30 		clientSocketEvent = socketEvent;
31 		sendPackManage = new RpcSendPackageManage(this);
32 		defaultPoolThreads = RPC_CLIENT_DEFAULT_THREAD_POOL;
33 		compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO;
34 	}
35 	int getFd() {
36 		return clientSocket.getFd();
37 	}
38 	
39 	void bind(string className, string funcName)
40 	{
41 		string key = className ~ "." ~ funcName;
42 		deWritefln("rpc client bind:%s", key);
43 	}
44 
45 	void bindCallback(size_t funcId, ReponsCallback callback)
46 	{
47 		rpcCallbackMap[funcId] = callback;
48 	}
49 
50 	
51 	bool requestRemoteCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol)
52 	{	
53 		packMessageCount++;
54 		req.setSequence(packMessageCount);
55 		req.setSocket(clientSocket);
56 
57 		if(req.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO)
58 		{
59 			req.setCompressType(this.compressType);
60 		}
61 
62 		deWritefln("rpc client request remote call:%s, id:%s", req.getCallFuncName(), req.getCallFuncId);
63 		return sendPackManage.add(req);
64 	}
65 
66 	void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack)
67 	{
68 
69 		if(sendPackManage.remove(pack.getSequenceId))
70 		{
71 			log(format("client recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", 
72 				pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType));
73 
74 
75 			RpcPackageBase packageBase;
76 			
77 			switch(pack.getSerializedType)
78 			{
79 				case RPC_PACKAGE_PROTOCOL.TPP_JSON:break;
80 				case RPC_PACKAGE_PROTOCOL.TPP_XML: break;
81 				case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break;
82 				case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF:  break;
83 				case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break;
84 					
85 				default:
86 					logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType());
87 			}
88 
89 			auto rpcResp = new RpcRequest(socket);
90 
91 			rpcResp.setSequence(pack.getSequenceId());
92 			rpcResp.setNonblock(pack.getNonblock());
93 			rpcResp.setCompressType(pack.getCompressType);
94 			rpcResp.bindFunc(pack.getFuncId());
95 			rpcResp.push(pack.getPayload());
96 
97 			if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK)
98 			{
99 				logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, status code:%s", 
100 					pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getStatusCode);
101 				
102 				rpcResp.setStatus(RESPONSE_STATUS.RS_FAILD);
103 				
104 			}else
105 			{
106 				rpcResp.setStatus(RESPONSE_STATUS.RS_OK);	
107 				log(format("rpc server response call, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));			
108 			}
109 			
110 			if(pack.getNonblock)
111 			{
112 				log(format("async call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));			
113 			}else
114 			{
115 				log(format("sync call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()));			
116 			}
117 
118 			auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null);
119 			
120 			if(callback !is null)
121 			{
122 				callback(rpcResp);
123 			}else
124 			{
125 				logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName);
126 			}
127 
128 		}else
129 		{
130 			logWarning("Accept error, client recv response failed, package is timeout!!!, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s", 
131 				pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize);
132 		}
133 	}
134 
135 	void rpcSendPackageEvent(RpcResponse rpcResp)
136 	{
137 		switch(rpcResp.getStatus)
138 		{
139 			case RESPONSE_STATUS.RS_TIMEOUT:
140 				logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
141 				break;
142 			
143 			case RESPONSE_STATUS.RS_FAILD:
144 				logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence);
145 				break;
146 
147 			default:
148 				logWarning("rpc send package event is fatal!!, event type error!");
149 		}
150 
151 			auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null);
152 
153 			if(callback !is null)
154 			{
155 				callback(rpcResp);
156 			}else
157 			{
158 				logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName);
159 			}
160 	}
161 
162 
163 	void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr)
164 	{
165 		// logInfo("client socket status info:%s", statusStr);
166 		switch(status)
167 		{
168 			case SOCKET_STATUS.SE_CONNECTD:
169 				 auto t = task(&clientSocketEvent.connectd, socket); 
170 				 taskPool.put(t);
171 				 break;
172 
173 			case SOCKET_STATUS.SE_DISCONNECTD: 
174 				auto t = task(&clientSocketEvent.disconnectd, socket); 
175 				taskPool.put(t);
176 				break;
177 			
178 			case SOCKET_STATUS.SE_READ_FAILED : 
179 				 auto t = task(&clientSocketEvent.readFailed, socket); 
180 				 taskPool.put(t);
181 				 break;
182 
183 			case SOCKET_STATUS.SE_WRITE_FAILED: 
184 				auto t = task(&clientSocketEvent.writeFailed, socket); 
185 				taskPool.put(t);
186 				break;
187 		
188 			default:
189 				logError("client socket status is fatal!!", statusStr);
190 				return;
191 		}
192 
193 	}
194 
195 	void connect(string ip, ushort port, AsynchronousChannelSelector sel)
196 	{
197 		clientSocket = new RpcClientSocket(ip, port, sel, this);
198 	}
199 
200 	bool write(byte[] data) {
201 		return clientSocket.write(data);
202 	}
203 
204 	void reConnect()
205 	{
206 		clientSocket.reConnect();
207 	}
208 
209 	ulong getWaitResponseNum()
210 	{
211 		return sendPackManage.getWaitResponseNum;
212 	}
213 
214 	void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type)
215 	{
216 		compressType = type;
217 	}
218 	
219 private:
220 	RpcSendPackageManage sendPackManage;
221 	RPC_PACKAGE_COMPRESS_TYPE compressType;
222 
223 	ReponsCallback[size_t] rpcCallbackMap;
224 
225 	ulong packMessageCount;
226 
227 	RpcClientSocket clientSocket;
228 	ClientSocketEventInterface clientSocketEvent;
229 }