/** * generate service name,use to distinguish different service,and * can be * split to get the service name */ public String fetchRpcServiceName() { returnthis.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion(); }
/** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T)Proxy.newProxyInstance(clazz.getClassLoader(), newClass<?>[] {clazz}, this); }
/** * Waiting process request queue */ privatefinal WaitingProcessRequestQueue waitingProcessRequestQueue;
publicRpcSendingServiceAdapterImpl() { this.findingAdapter = ExtensionLoader.getExtensionLoader(RpcServiceFindingAdapter.class) .getExtension(ServiceDiscoveryEnum.ZK.getName()); this.addressChannelManager = SingletonFactory.getInstance(AddressChannelManager.class); this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class); // initialize eventLoopGroup = newNioEventLoopGroup(); bootstrap = newBootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(newLoggingHandler(LogLevel.INFO)) // The timeout period for the connection. // If this time is exceeded or if the connection cannot be // established, the connection fails. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(newChannelInitializer<SocketChannel>() { @Override protectedvoidinitChannel(SocketChannel ch) { ChannelPipelinep= ch.pipeline(); // If no data is sent to the server within 15 seconds, a // heartbeat request is sent p.addLast(newIdleStateHandler(0, 5, 0, TimeUnit.SECONDS)); p.addLast(newRpcMessageEncoder()); p.addLast(newRpcMessageDecoder()); p.addLast(newNettyRpcClientHandler()); } }); }
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
privateConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) { this.virtualInvokers = newTreeMap<>(); // generate service address virtual node] // one address may map to multiple virtual nodes // use the md5 hash algorithm to generate the hash value of the // virtual node LogUtil.info("init add serviceUrlList:{}", serviceUrlList); for (String serviceNode : serviceUrlList) { addVirtualNode(serviceNode, virtualNodeNumber); }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
privateConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) { this.virtualInvokers = newTreeMap<>(); // generate service address virtual node] // one address may map to multiple virtual nodes // use the md5 hash algorithm to generate the hash value of the // virtual node LogUtil.info("init add serviceUrlList:{}", serviceUrlList); for (String serviceNode : serviceUrlList) { addVirtualNode(serviceNode, virtualNodeNumber); }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
protectedstatic Long calculateHash(byte[] digest, int idx) { if (digest.length < (idx + 1) * 8) { thrownewIllegalArgumentException("Insufficient length of digest"); }
longhash=0; // 8 bytes digest,a byte is 8 bits like :1321 2432 // each loop choose a byte to calculate hash,and shift i*8 bits for (inti=0; i < 8; i++) { hash |= (255L & (long)digest[i + idx * 8]) << (8 * i); } return hash; }
/** * Choose one from the list of existing service addresses list * * @param serviceUrlList Service address list * @param rpcRequest * @return */ @Override public String selectServiceAddress(List<String> serviceUrlList, RpcRequest rpcRequest) { intserviceListHash= System.identityHashCode(serviceUrlList); StringinterfaceName= rpcRequest.getServiceName(); StringselectorKey= interfaceName + serviceListHash;
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
/** * Called when an exception occurs in processing a client message */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LogUtil.error("server exceptionCaught"); cause.printStackTrace(); ctx.close(); }
/** * generate service name,use to distinguish different service,and * can be split to get the service name */ public String fetchRpcServiceName() { returnthis.getProject() +"*"+this.getGroup()+"*"+ this.getServiceName() +"*"+ this.getVersion(); }
publicRpcMessageDecoder() { // lengthFieldOffset: magic code is 4B, and version is 1B, and then full // length. so value is 5 // lengthFieldLength: full length is 4B. so value is 4 // lengthAdjustment: full length include all data and read 9 bytes // before, so the left length is (fullLength-9). so values is -9 // initialBytesToStrip: we will check magic code and version manually, // so do not strip any bytes. so values is 0 this(8 * 1024 * 1024, 5, 4, -9, 0); }
publicRpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); }
@Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in)throws Exception { // get the bytebuf which contains the frame Objectdecode=super.decode(ctx, in); if (decode instanceof ByteBuf) { ByteBufbyteBuf= (ByteBuf)decode; // if data not empty, decode it if (byteBuf.readableBytes() >= RpcConstants.HEAD_LENGTH) { try { return decode(byteBuf); } catch (Exception e) { LogUtil.error("Decode error:{} ,input:{}", e, byteBuf); } finally { byteBuf.release(); } } } return decode; }
/** * heart beat handle * * @param ctx * @param evt * @throws Exception */ @Override publicvoiduserEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception { // if the channel is free,close it if (evt instanceof IdleStateEvent) { IdleStatestate= ((IdleStateEvent)evt).state(); if (state == IdleState.READER_IDLE) { LogUtil.info("idle check happen, so close the connection"); ctx.close(); } } else { super.userEventTriggered(ctx, evt); } }
/** * Called when an exception occurs in processing a client message */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LogUtil.error("server exceptionCaught"); cause.printStackTrace(); ctx.close(); }
privatevoid buildAndSetRpcResponse(ChannelHandlerContext ctx, RpcRequest rpcRequest, RpcData rpcMessage, Object result) { if (canBuildResponse(ctx)) { // If the channel is active and writable, a successful RPC response is constructed RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getTraceId()); rpcMessage.setData(rpcResponse); } else { // Construct a failed RPC response if the channel is not writable RpcResponse<Object> rpcResponse = RpcResponse.fail(); rpcMessage.setData(rpcResponse); LogUtil.error("Not writable now, message dropped,message:{}", rpcRequest); } }
@Data @NoArgsConstructor @AllArgsConstructor @Builder publicclassRpcServiceConfig { /** * service version */ privateStringversion="";
/** * target service */ private Object service;
/** * belong to which project */ privateStringproject="";
/** * group */ privateStringgroup="";
/** * generate service name,use to distinguish different service,and * can be split to get the service name * @return */ public String fetchRpcServiceName() { returnthis.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion(); }
/** * get the interface name * * @return */ public String getServiceName() { returnthis.service.getClass().getInterfaces()[0].getCanonicalName(); }
}
提供2个方法,注册服务与根据服务名得到对应的bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicinterfaceRpcServiceRegistryAdapter {
/** * @param rpcServiceConfig rpc service related attributes */ voidregistryService(RpcServiceConfig rpcServiceConfig);
/** * @param rpcClassName rpc class name * @return service object */ Object getService(String rpcClassName);
@Override publicvoidregistryService(RpcServiceConfig rpcServiceConfig) { try { // first get address and service StringhostAddress= InetAddress.getLocalHost().getHostAddress(); // add service to zk LogUtil.info("add service to zk,service name{},host:{}", rpcServiceConfig.fetchRpcServiceName(),hostAddress); registerServiceToZk(rpcServiceConfig.fetchRpcServiceName(), newInetSocketAddress(hostAddress, PropertiesFileUtil.readPortFromProperties())); // add service to map cache registerServiceToMap(rpcServiceConfig); } catch (UnknownHostException e) { LogUtil.error("occur exception when getHostAddress", e); thrownewRuntimeException(e); }
}
@Override public Object getService(String rpcServiceName) { Objectservice= serviceMap.get(rpcServiceName); if (null == service) { thrownewRpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND.getCode(),"service not found"); } return service; }