    /**
     * Generates the service name, which is used to distinguish different
     * services and can be split to recover its parts.
     */
    public String fetchRpcServiceName() {
        return this.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion();
    }
}
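The comment on fetchRpcServiceName() says the generated name can be split back into its parts. A minimal sketch of that round trip follows, assuming none of the four fields contains a `*` character. The class `ServiceKey` and its methods are hypothetical and not part of the project. One detail worth showing is that `String.split` takes a regular expression, so the `*` separator has to be quoted.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the project: builds and splits a key in the
// "project*group*serviceName*version" form produced by fetchRpcServiceName().
final class ServiceKey {
    private static final String SEPARATOR = "*";

    private ServiceKey() {
    }

    static String join(String project, String group, String serviceName, String version) {
        return String.join(SEPARATOR, project, group, serviceName, version);
    }

    // String.split takes a regular expression, so the literal "*" must be quoted.
    // A limit of -1 keeps trailing empty fields, for example an empty version.
    static String[] split(String key) {
        String[] parts = key.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + parts.length + ": " + key);
        }
        return parts;
    }
}
```

For example, `ServiceKey.join("demo", "default", "org.example.HelloService", "1.0")` yields `demo*default*org.example.HelloService*1.0`, and `ServiceKey.split` on that key returns the four original fields in order.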
Listening for requests requires starting a Netty server that accepts incoming service requests.
First, resources such as previously registered services need to be closed on startup.
The resources Netty requires are then initialized in order.
Below is sample Netty startup code, in which encoders and decoders are added for protocol parsing and heartbeat detection.
Rate limiting and the handling of decoded requests are added to the same pipeline.
By default, this project implements only Hessian serialization and gzip compression/decompression. Many tutorials already cover these, so they are not covered here. The specific code can be found in the org.example.ray.infrastructure.serialize and org.example.ray.infrastructure.compress packages in the source code.
Encoding and Protocol
After implementing the server, we need to add the encoding and processing classes to it in sequence.
Before implementing the encoder, the underlying wire protocol has to be decided.
Protocol
This project references some existing protocol designs and selects a relatively simple protocol design approach, as shown in the figure below:
The protocol consists of a 16-byte header followed by a body. In the byte ranges below, the start offset is included and the end offset is excluded.
Bytes 0-4 are the magic code, used for verification.
Bytes 4-5 are the custom protocol version.
Bytes 5-9 give the length of the entire message (header plus body), used for decoding.
Bytes 9-10 define the message type: request, response, heartbeat request, or heartbeat response.
Byte 10 indicates the encoding (serialization) method.
Byte 11 indicates the compression method.
Bytes 12-16 are an integer representing the request number.
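The header layout can be sketched with java.nio.ByteBuffer, which avoids any Netty dependency. This sketch assumes a 4-byte length field at offset 5, as stated in the comments of the decoder's constructor, which puts the message type at byte 9. The class name `FrameEncoder`, the magic code value, and the version value are assumptions made for illustration; the project's actual constants are not shown in this article.

```java
import java.nio.ByteBuffer;

// Sketch of the 16-byte header layout. Constant values here are assumptions.
final class FrameEncoder {
    static final int HEAD_LENGTH = 16;
    // Hypothetical magic code; the project's real value is not shown here.
    static final byte[] MAGIC = {'r', 'r', 'p', 'c'};
    // Hypothetical protocol version.
    static final byte VERSION = 1;

    private FrameEncoder() {
    }

    static byte[] encode(byte messageType, byte codec, byte compress, int requestId, byte[] body) {
        int fullLength = HEAD_LENGTH + body.length;
        ByteBuffer buf = ByteBuffer.allocate(fullLength); // big-endian by default
        buf.put(MAGIC);          // bytes 0-4: magic code
        buf.put(VERSION);        // byte 4: protocol version
        buf.putInt(fullLength);  // bytes 5-9: length of the whole message
        buf.put(messageType);    // byte 9: message type
        buf.put(codec);          // byte 10: serialization method
        buf.put(compress);       // byte 11: compression method
        buf.putInt(requestId);   // bytes 12-16: request number
        buf.put(body);           // body follows the header
        return buf.array();
    }
}
```

The field sizes add up to 4 + 1 + 4 + 1 + 1 + 1 + 4 = 16 bytes, which matches the stated header length.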
With the protocol defined, decoding can be implemented.
Decoding
For background on the LengthFieldBasedFrameDecoder decoder, see the following article:
https://zhuanlan.zhihu.com/p/95621344
Once LengthFieldBasedFrameDecoder is understood, the decoding process is not complex. It consists of three parts: decoding the header, verifying it, and decoding the body. See the code and its comments for the specific implementation.
The decoding part uses Java SPI so that the serialization and decompression methods can be customized. See the code on GitHub for this part; alternatively, the SPI lookup can be replaced with fixed serialization and decompression methods.
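The three parts named above (header, verification, body) can be sketched without Netty as follows. This is an illustration under stated assumptions, not the project's decoder: `FrameParser`, `Frame`, the magic code, and the version value are hypothetical, and the input is assumed to hold exactly one complete frame. The body is left as raw bytes, whereas the real decoder would go on to decompress and deserialize it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-alone parser for one complete frame.
final class FrameParser {
    static final int HEAD_LENGTH = 16;
    static final byte[] MAGIC = {'r', 'r', 'p', 'c'}; // assumed value
    static final byte VERSION = 1;                    // assumed value

    static final class Frame {
        final byte messageType;
        final byte codec;
        final byte compress;
        final int requestId;
        final byte[] body;

        Frame(byte messageType, byte codec, byte compress, int requestId, byte[] body) {
            this.messageType = messageType;
            this.codec = codec;
            this.compress = compress;
            this.requestId = requestId;
            this.body = body;
        }
    }

    private FrameParser() {
    }

    static Frame parse(byte[] bytes) {
        if (bytes.length < HEAD_LENGTH) {
            throw new IllegalArgumentException("Frame shorter than header: " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);

        // Parts 1 and 2: read the header fields in order and verify them
        byte[] magic = new byte[MAGIC.length];
        buf.get(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(magic));
        }
        byte version = buf.get();
        if (version != VERSION) {
            throw new IllegalArgumentException("Incompatible version: " + version);
        }
        int fullLength = buf.getInt();
        if (fullLength != bytes.length) {
            throw new IllegalArgumentException("Length field " + fullLength + " != " + bytes.length);
        }
        byte messageType = buf.get();
        byte codec = buf.get();
        byte compress = buf.get();
        int requestId = buf.getInt();

        // Part 3: everything after the 16-byte header is the body
        byte[] body = new byte[fullLength - HEAD_LENGTH];
        buf.get(body);
        return new Frame(messageType, codec, compress, requestId, body);
    }
}
```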
As noted earlier, this project implements only Hessian serialization and gzip compression/decompression by default. Many tutorials already cover these, so they are not introduced here. The specific code can be found in the org.example.ray.infrastructure.serialize and org.example.ray.infrastructure.compress packages in the source code.
    public RpcMessageDecoder() {
        // lengthFieldOffset: the magic code is 4 bytes and the version is 1 byte,
        // followed by the full length, so the value is 5.
        // lengthFieldLength: the full length is 4 bytes, so the value is 4.
        // lengthAdjustment: the full length covers all the data, and 9 bytes have
        // already been read, so the remaining length is (fullLength - 9) and the
        // value is -9.
        // initialBytesToStrip: the magic code and version are checked manually,
        // so no bytes are stripped and the value is 0.
        this(8 * 1024 * 1024, 5, 4, -9, 0);
    }

    public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // Let the parent class cut one complete frame out of the byte stream
        Object decoded = super.decode(ctx, in);
        if (decoded instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) decoded;
            // Only decode frames that contain at least a full header
            if (byteBuf.readableBytes() >= RpcConstants.HEAD_LENGTH) {
                try {
                    return decode(byteBuf);
                } catch (Exception e) {
                    LogUtil.error("Decode error:{} ,input:{}", e, byteBuf);
                    // Rethrow so that the buffer released below is never passed downstream
                    throw e;
                } finally {
                    byteBuf.release();
                }
            }
        }
        return decoded;
    }
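The constructor parameters (5, 4, -9, 0) determine where each frame ends. LengthFieldBasedFrameDecoder computes the frame length as the length field value plus lengthAdjustment plus the offset at which the length field ends, which here is fullLength - 9 + (5 + 4) = fullLength. The sketch below simulates that arithmetic on a plain java.nio.ByteBuffer to show how complete frames are cut from a stream while a partial frame is left for the next read. `FrameSplitter` is a hypothetical class written for this illustration and is not Netty's implementation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of length-field framing with the parameters above.
final class FrameSplitter {
    static final int LENGTH_FIELD_OFFSET = 5;
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int LENGTH_ADJUSTMENT = -9;

    private FrameSplitter() {
    }

    // frameLength = lengthFieldValue + lengthAdjustment + lengthFieldEndOffset
    static int frameLength(int lengthFieldValue) {
        int lengthFieldEndOffset = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        return lengthFieldValue + LENGTH_ADJUSTMENT + lengthFieldEndOffset;
    }

    // Reads every complete frame from the buffer and stops at the first partial
    // one, leaving its bytes unread so they can be completed by a later read.
    static List<byte[]> split(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        int lengthFieldEndOffset = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        while (in.remaining() >= lengthFieldEndOffset) {
            // Absolute read: does not move the buffer position
            int fullLength = in.getInt(in.position() + LENGTH_FIELD_OFFSET);
            int frameLength = frameLength(fullLength);
            if (frameLength < lengthFieldEndOffset) {
                throw new IllegalArgumentException("Invalid frame length: " + frameLength);
            }
            if (in.remaining() < frameLength) {
                break; // partial frame, wait for more bytes
            }
            byte[] frame = new byte[frameLength];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }
}
```

Because initialBytesToStrip is 0, each frame handed on still begins with the magic code and version, which is why the decoder can verify them itself.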
    /**
     * Heartbeat handling.
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // If the channel has been idle, close it
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                LogUtil.info("idle check happen, so close the connection");
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    /**
     * Called when an exception occurs while processing a client message.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LogUtil.error("server exceptionCaught");
        cause.printStackTrace();
        ctx.close();
    }
    private void buildAndSetRpcResponse(ChannelHandlerContext ctx, RpcRequest rpcRequest, RpcData rpcMessage,
            Object result) {
        if (canBuildResponse(ctx)) {
            // If the channel is active and writable, construct a successful RPC response
            RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getTraceId());
            rpcMessage.setData(rpcResponse);
        } else {
            // Construct a failed RPC response if the channel is not writable
            RpcResponse<Object> rpcResponse = RpcResponse.fail();
            rpcMessage.setData(rpcResponse);
            LogUtil.error("Not writable now, message dropped,message:{}", rpcRequest);
        }
    }