Java Interview Guide

Practical, concise, and battle-tested notes for Java, Spring/Spring Cloud, databases, computer networks, OS, Redis, message queues, and distributed systems.
I hope it helps you learn the basics of Java backend development or prepare well for interviews.


🌐 Online Reading

This repository is published as a GitHub Pages site:
📖 Open Online Version

  • Best for mobile/tablet reading

📅 Future Plans

  • Add Design Patterns section
  • Add Elasticsearch and related tools
  • Continuous content updates
  • Feedback and suggestions are welcome! 🙌

📄 License

Apache-2.0 License

Based on SpringBoot, Handwritten Simple RPC Framework (Three)

image-20230522163034434

Continuing from the previous chapter: with server-side registration and invocation implemented, the next step is the client, which mainly covers service discovery, load balancing, rate limiting, and request sending. The features below are implemented in the order in which they occur during an RPC call.

A single request:

Before implementing the client, first consider what has to be sent in a single request.

First, the service name, the method name, and the corresponding parameters and parameter types are needed; otherwise the server cannot perform the reflective call for the request.

Second, the request should carry the attributes declared in @RpcConsumer (project, version, group) so that the server can locate the correct service.

Finally, the request should include a unique ID so that it can be traced.

These fields make up the basic content of a request.

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class RpcRequest implements Serializable {

private static final long serialVersionUID = 8509587559718339795L;
/**
* traceId
*/
private String traceId;
/**
* interface name
*/
private String serviceName;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private Object[] parameters;
/**
* parameter types
*/
private Class<?>[] paramTypes;
/**
* version
*/
private String version;
/**
* project
*/
private String project;
/**
* group
*/
private String group;

/**
* generate service name,use to distinguish different service,and * can be
* split to get the service name
*/
public String fetchRpcServiceName() {
return this.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion();
}

}
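
The composite key produced by fetchRpcServiceName can be illustrated with a small standalone sketch. The class name ServiceKeyDemo and the sample values are hypothetical and not part of the project. The point is only that the four parts are joined with "*", and that splitting them again requires escaping the asterisk because String.split takes a regular expression.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the project: mirrors the key layout
// project*group*serviceName*version used by fetchRpcServiceName().
final class ServiceKeyDemo {

    static String buildKey(String project, String group, String serviceName, String version) {
        return project + "*" + group + "*" + serviceName + "*" + version;
    }

    // "*" is a regex metacharacter, so it must be escaped for String.split
    static List<String> splitKey(String key) {
        return Arrays.asList(key.split("\\*"));
    }

    public static void main(String[] args) {
        String key = buildKey("default", "default", "org.example.HelloService", "1.0");
        System.out.println(key);
        System.out.println(splitKey(key));
    }
}
```

A separator that cannot appear in a Java class name or a version string keeps the split unambiguous; if project or group names could ever contain "*", a different delimiter would be needed.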

Service Proxy

First, during Spring startup, scan every bean for fields annotated with @RpcConsumer and inject a generated proxy into each of them. Subsequent calls on those fields go through the proxy, which then initiates the request.

@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

private final RpcServiceRegistryAdapter adapter;

private final RpcSendingServiceAdapter sendingServiceAdapter;

public RpcBeanPostProcessor() {
this.adapter = SingletonFactory.getInstance(RpcServiceRegistryAdapterImpl.class);
this.sendingServiceAdapter = ExtensionLoader.getExtensionLoader(RpcSendingServiceAdapter.class)
.getExtension(RpcRequestSendingEnum.NETTY.getName());
}

/**
* register service
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
LogUtil.info("start process register service: {}", bean);
// register service
if (bean.getClass().isAnnotationPresent(RpcProvider.class)) {
RpcProvider annotation = bean.getClass().getAnnotation(RpcProvider.class);
// build rpc service config
RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
.service(bean)
.project(annotation.project())
.version(annotation.version())
.group(annotation.group())
.build();
LogUtil.info("register service: {}", serviceConfig);
adapter.registryService(serviceConfig);
}
return bean;
}

/**
* proxy and injection of consumers
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> toBeProcessedBean = bean.getClass();
Field[] declaredFields = toBeProcessedBean.getDeclaredFields();
for (Field declaredField : declaredFields) {
if (declaredField.isAnnotationPresent(RpcConsumer.class)) {
RpcConsumer annotation = declaredField.getAnnotation(RpcConsumer.class);
// build rpc service config
RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
.project(annotation.project())
.version(annotation.version())
.group(annotation.group())
.build();
// create the proxy bean Factory and the proxy bean
RpcServiceProxy proxy = new RpcServiceProxy(sendingServiceAdapter, serviceConfig);
Object rpcProxy = proxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
LogUtil.info("create service proxy: {}", bean);
declaredField.set(bean, rpcProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}

Next, the proxy class's invoke method assembles the request and sends it, then blocks on the returned Future to obtain the response and hands its data back to the caller.

public class RpcServiceProxy implements InvocationHandler {

private final RpcSendingServiceAdapter sendingServiceAdapter;

private final RpcServiceConfig config;

public RpcServiceProxy(RpcSendingServiceAdapter sendingServiceAdapter, RpcServiceConfig config) {
this.sendingServiceAdapter = sendingServiceAdapter;
this.config = config;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) {
LogUtil.info("invoked method: [{}]", method.getName());
RpcRequest rpcRequest = buildRequest(method,args);

RpcResponse<Object> rpcResponse = null;
CompletableFuture<RpcResponse<Object>> completableFuture =
(CompletableFuture<RpcResponse<Object>>)sendingServiceAdapter.sendRpcRequest(rpcRequest);
try {
rpcResponse = completableFuture.get();
return rpcResponse.getData();
} catch (Exception e) {
LogUtil.error("occur exception:", e);
}
return null;
}

/**
* get the proxy object
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T)Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] {clazz}, this);
}

private RpcRequest buildRequest(Method method,Object[] args){
RpcRequest rpcRequest = RpcRequest.builder()
.methodName(method.getName())
.parameters(args)
.serviceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.traceId(UUID.randomUUID().toString())
.project(config.getProject())
.version(config.getVersion())
.group(config.getGroup())
.build();
return rpcRequest;
}
}
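
The interception mechanism itself can be tried in isolation. The following is a minimal sketch of a JDK dynamic proxy, assuming a hypothetical GreetingService interface and a handler that returns a canned value instead of sending anything over the network; none of these names come from the project.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service interface used only for this illustration
interface GreetingService {
    String greet(String name);
}

final class ProxyDemo {

    // Records which methods were intercepted, standing in for "build and send a request"
    static final List<String> CALLS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(method.getDeclaringClass().getSimpleName() + "#" + method.getName());
            // A real handler would send the request and wait for the response;
            // here a canned value is returned instead
            return "hello, " + args[0];
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        System.out.println(service.greet("rpc"));
        System.out.println(CALLS);
    }
}
```

The caller only ever sees the interface, which is why an RPC framework can swap the canned value for a network round trip without changing consumer code.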

Sending Request:

Sending requests is the core responsibility of the client, and there are several ways to do it. Only the implementation based on Netty NIO is shown here. The complete sequence is shown below.

WX20230530-145829@2x

First, implement the send method, which should include the functionality of finding addresses and sending requests.

public class RpcSendingServiceAdapterImpl implements RpcSendingServiceAdapter {

/**
* EventLoopGroup is a multithreaded event loop that handles I/O operation.
*/
private final EventLoopGroup eventLoopGroup;

/**
* Bootstrap helps configure and start the Netty client
*/
private final Bootstrap bootstrap;

/**
* Service discovery
*/
private final RpcServiceFindingAdapter findingAdapter;

/**
* Channel manager,mapping channel and address
*/
private final AddressChannelManager addressChannelManager;

/**
* Waiting process request queue
*/
private final WaitingProcessRequestQueue waitingProcessRequestQueue;

public RpcSendingServiceAdapterImpl() {
this.findingAdapter = ExtensionLoader.getExtensionLoader(RpcServiceFindingAdapter.class)
.getExtension(ServiceDiscoveryEnum.ZK.getName());
this.addressChannelManager = SingletonFactory.getInstance(AddressChannelManager.class);
this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class);
// initialize
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
// The timeout period for the connection.
// If this time is exceeded or if the connection cannot be
// established, the connection fails.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// Added inside the initializer: a second Bootstrap.handler(...) call
// replaces the first, so a LoggingHandler set there would be dropped
p.addLast(new LoggingHandler(LogLevel.INFO));
// If no data is written to the server within 5 seconds, a
// heartbeat request is sent
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(new NettyRpcClientHandler());
}
});
}

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse<Object>> result = new CompletableFuture<>();
InetSocketAddress address = findServiceAddress(rpcRequest);
Channel channel = fetchAndConnectChannel(address);
if (channel.isActive()) {
addToProcessQueue(rpcRequest.getTraceId(), result);
RpcData rpcData = prepareRpcData(rpcRequest);
sendRpcData(channel, rpcData, result);
} else {
log.error("Send request[{}] failed", rpcRequest);
throw new IllegalStateException();
}
return result;
}
private InetSocketAddress findServiceAddress(RpcRequest rpcRequest) {
return findingAdapter.findServiceAddress(rpcRequest);
}

private void addToProcessQueue(String traceId, CompletableFuture<RpcResponse<Object>> result) {
waitingProcessRequestQueue.put(traceId, result);
}

private RpcData prepareRpcData(RpcRequest rpcRequest) {
return RpcData.builder()
.data(rpcRequest)
.serializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode())
.compressType(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE)
.build();
}
private void sendRpcData(Channel channel, RpcData rpcData, CompletableFuture<RpcResponse<Object>> result) {
channel.writeAndFlush(rpcData).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
LogUtil.info("client send message: [{}]", rpcData);
} else {
future.channel().close();
result.completeExceptionally(future.cause());
LogUtil.error("Send failed:", future.cause());
}
});
}

private Channel fetchAndConnectChannel(InetSocketAddress address) {
Channel channel = addressChannelManager.get(address);
if (channel == null) {
// connect to service to get new address and rebuild the channel
channel = connect(address);
addressChannelManager.set(address, channel);
}
return channel;
}

private Channel connect(InetSocketAddress address) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(address).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
// set channel to future
LogUtil.info("The client has connected [{}] successful!", address.toString());
completableFuture.complete(future.channel());
} else {
LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future);
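// complete exceptionally so the caller blocked in get() is released;
// an exception thrown inside this listener would not reach that caller
completableFuture.completeExceptionally(new IllegalStateException(future.cause()));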
}
});
Channel channel = null;
try {
channel = completableFuture.get();
} catch (Exception e) {
LogUtil.error("occur exception when connect to server:", e);
}
return channel;
}

public Channel getChannel(InetSocketAddress inetSocketAddress) {
Channel channel = addressChannelManager.get(inetSocketAddress);
if (channel == null) {
channel = connect(inetSocketAddress);
addressChannelManager.set(inetSocketAddress, channel);
}
return channel;
}
}

In this class, the core method is sendRpcRequest, which is responsible for obtaining services, creating connections, creating a Future task, and sending requests.
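
WaitingProcessRequestQueue is not listed in this chapter. One plausible shape for it, sketched here purely as an assumption, is a concurrent map from traceId to the CompletableFuture that the sender handed back to the proxy. The class name PendingRequests and its methods are hypothetical, and a String stands in for the response type.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a waiting-request registry
final class PendingRequests {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request is written to the channel
    CompletableFuture<String> register(String traceId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(traceId, future);
        return future;
    }

    // Called by the inbound handler when a response with this traceId arrives
    boolean complete(String traceId, String response) {
        CompletableFuture<String> future = pending.remove(traceId);
        return future != null && future.complete(response);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> future = requests.register("trace-1");
        // Simulate the I/O thread delivering the response a little later
        new Thread(() -> requests.complete("trace-1", "pong")).start();
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

Removing the entry on completion keeps the map from growing without bound; a production version would probably also need to evict entries whose response never arrives.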

Discover services

The process of discovering services can include:

1. Pulling the service address list from the registry

2. Selecting one concrete service address from that list through a load balancing algorithm.
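
The two steps can be sketched without ZooKeeper. In the sketch below the registry is a plain in-memory map and the load balancer is a trivial hash-modulo choice; both are stand-ins chosen for illustration, and DiscoveryDemo with all of its members is hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DiscoveryDemo {

    // Hypothetical in-memory registry: service key -> "host:port" entries
    static final Map<String, List<String>> REGISTRY = new HashMap<>();

    // Step 1: pull the address list for a service key
    static List<String> lookup(String serviceKey) {
        List<String> addresses = REGISTRY.get(serviceKey);
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("no service available: " + serviceKey);
        }
        return addresses;
    }

    // Step 2: pick one entry; hash-modulo stands in for the real load balancer
    static String choose(List<String> addresses, String requestKey) {
        return addresses.get(Math.floorMod(requestKey.hashCode(), addresses.size()));
    }

    static InetSocketAddress parse(String hostAndPort) {
        int idx = hostAndPort.lastIndexOf(':');
        String host = hostAndPort.substring(0, idx);
        int port = Integer.parseInt(hostAndPort.substring(idx + 1));
        // createUnresolved avoids a DNS lookup in this illustration
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        REGISTRY.put("default*default*org.example.HelloService*1.0",
            Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        List<String> addresses = lookup("default*default*org.example.HelloService*1.0");
        System.out.println(parse(choose(addresses, "hello(world)")));
    }
}
```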

Get the address

Start with the first step. Caching can optimize this further; in this project, the ZooKeeper client uses a ConcurrentHashMap as a simple cache (see CuratorClient for the detailed code):

public class RpcServiceFindingAdapterImpl implements RpcServiceFindingAdapter {

private final LoadBalanceService loadBalanceService;

public RpcServiceFindingAdapterImpl() {
this.loadBalanceService = ExtensionLoader.getExtensionLoader(LoadBalanceService.class).getExtension(LOAD_BALANCE);
}

@Override
public InetSocketAddress findServiceAddress(RpcRequest rpcRequest) {
String serviceName = rpcRequest.fetchRpcServiceName();
CuratorFramework zkClient = CuratorClient.getZkClient();
List<String> serviceAddresseList = CuratorClient.getChildrenNodes(zkClient, serviceName);
if (CollectionUtils.isEmpty(serviceAddresseList)) {
throw new RuntimeException("no service available, serviceName: " + serviceName);
}

String service = loadBalanceService.selectServiceAddress(serviceAddresseList, rpcRequest);
if (StringUtils.isBlank(service)) {
throw new RuntimeException("no service available, serviceName: " + serviceName);
}
String[] socketAddressArray = service.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host, port);
}
}
Load Balancing - Consistent Hashing Algorithm
Definition:

Consistent hashing is an algorithm used for data sharding and load balancing in distributed systems. It introduces a hash ring and virtual nodes to minimize the data migration needed when nodes are added or removed, which improves system stability and performance. It is widely used in scenarios such as distributed caching and load balancing.

Implementation:

Hash value calculation

First, according to the consistent hashing algorithm, we need to generate hash values for the corresponding services. In the following implementation, the input is hashed with SHA-256 (applied twice, in a method that is still named md5Hash) to produce a 32-byte (256-bit) hash value.

However, such a hash value is too long to handle conveniently, so it needs to be shortened. At the same time, mapping multiple hash values to one node improves the distribution uniformity of consistent hashing: each node owns several positions in the hash space, which reduces the impact of redistribution when nodes are added or removed.

The calculateHash function takes 8 bytes starting at offset idx * 8 of the 256-bit hash value and combines them into a new long hash value.

protected static byte[] md5Hash(String input) {
MessageDigest messageDigest = null;
try {
messageDigest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = messageDigest.digest(input.getBytes(StandardCharsets.UTF_8));
messageDigest.update(hashBytes);
return messageDigest.digest();
} catch (NoSuchAlgorithmException e) {
LogUtil.error("No such algorithm exception: {}", e.getMessage());
throw new RuntimeException(e);
}

}

protected static Long calculateHash(byte[] digest, int idx) {
if (digest.length < (idx + 1) * 8) {
throw new IllegalArgumentException("Insufficient length of digest");
}

long hash = 0;
// combine 8 consecutive digest bytes into one long, least significant byte first
// each iteration masks one byte to 0..255 and shifts it left by i * 8 bits
for (int i = 0; i < 8; i++) {
hash |= (255L & (long)digest[i + idx * 8]) << (8 * i);
}
return hash;
}
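
To make the byte slicing concrete, the sketch below cuts a 32-byte SHA-256 digest into four long values and checks the manual bit manipulation against an equivalent little-endian read through ByteBuffer. DigestSliceDemo is a hypothetical name, and the sketch hashes the input once rather than twice, purely to keep it short.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DigestSliceDemo {

    static byte[] sha256(String input) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be present on every Java platform
            throw new IllegalStateException(e);
        }
    }

    // Same bit manipulation as calculateHash: byte i of the slice lands at bit offset 8 * i
    static long slice(byte[] digest, int idx) {
        long hash = 0;
        for (int i = 0; i < 8; i++) {
            hash |= (0xFFL & digest[i + idx * 8]) << (8 * i);
        }
        return hash;
    }

    // Equivalent formulation: read 8 bytes as a little-endian long
    static long sliceWithBuffer(byte[] digest, int idx) {
        return ByteBuffer.wrap(digest).order(ByteOrder.LITTLE_ENDIAN).getLong(idx * 8);
    }

    public static void main(String[] args) {
        byte[] digest = sha256("10.0.0.1:8080#0");
        for (int idx = 0; idx < 4; idx++) {
            System.out.println(idx + " -> " + slice(digest, idx)
                + " (matches buffer read: " + (slice(digest, idx) == sliceWithBuffer(digest, idx)) + ")");
        }
    }
}
```

The mask matters because Java bytes are signed: without it, a negative byte would be sign-extended and overwrite the higher bits of the result.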

Implement a virtual node selector.

According to the definition of consistent hashing, a virtual node selector generates multiple virtual nodes for each service address, places each of them on the ring under its own hash value, and finally returns the first node at or after the hash value passed in by the caller, wrapping around to the start of the ring if there is none.

private static class ConsistentHashLoadBalanceSelector {
// hash to virtual node list
private final TreeMap<Long, String> virtualInvokers;

private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
this.virtualInvokers = new TreeMap<>();
// generate virtual nodes for each service address
// one address may map to multiple virtual nodes
// the hash value of each virtual node is derived from a SHA-256 digest
// (the helper is still named md5Hash)
LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
for (String serviceNode : serviceUrlList) {
addVirtualNode(serviceNode, virtualNodeNumber);
}

}

private void addVirtualNode(String serviceNode, int virtualNodeNumber) {
for (int i = 0; i < virtualNodeNumber / 8; i++) {
String virtualNodeName = serviceNode + "#" + i;
byte[] md5Hash = md5Hash(virtualNodeName);
// md5Hash have 32 bytes
// use 8 byte for each virtual node
for (int j = 0; j < 4; j++) {
Long hash = calculateHash(md5Hash, j);
virtualInvokers.put(hash, serviceNode);
}
}
}

public String select(String rpcServiceKey) {
byte[] digest = md5Hash(rpcServiceKey);
// use first 8 byte to get hash
return selectForKey(calculateHash(digest, 0));
}

public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

if (entry == null) {
entry = virtualInvokers.firstEntry();
}

return entry.getValue();
}

}
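
The property that makes the ring worthwhile is that removing a node only remaps the keys that node used to own. The sketch below demonstrates this with a simplified ring. HashRingDemo, the replica count of 100, and the sample addresses are all illustrative assumptions, and the hash uses a single SHA-256 pass.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HashRingDemo {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingDemo(List<String> nodes, int replicas) {
        for (String node : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // First 8 bytes of SHA-256 as a long; an illustrative choice of hash function
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("SHA-256").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h |= (0xFFL & d[i]) << (8 * i);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Walk clockwise: first virtual node at or after the key's hash, wrapping to the start
    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingDemo three = new HashRingDemo(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080"), 100);
        HashRingDemo two = new HashRingDemo(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"), 100);
        int moved = 0;
        for (int i = 0; i < 1000; i++) {
            String key = "request-" + i;
            if (!three.select(key).equals(two.select(key))) {
                moved++;
            }
        }
        System.out.println("keys remapped after removing one node: " + moved + " of 1000");
    }
}
```

Every key counted in moved was previously served by the removed node; keys owned by the two surviving nodes keep their assignment.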

Implement a complete load balancing method

Use the interface name plus the hash of the available service list as the key to cache the corresponding consistent hash selector. If a selector already exists for that key, pick a node from it directly; otherwise create a new one.

public class ConsistentHashLoadBalanceService implements LoadBalanceService {

private final Map<String, ConsistentHashLoadBalanceSelector> serviceToSelectorMap = new ConcurrentHashMap<>();

private static class ConsistentHashLoadBalanceSelector {
// hash to virtual node list
private final TreeMap<Long, String> virtualInvokers;

private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
this.virtualInvokers = new TreeMap<>();
// generate virtual nodes for each service address
// one address may map to multiple virtual nodes
// the hash value of each virtual node is derived from a SHA-256 digest
// (the helper is still named md5Hash)
LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
for (String serviceNode : serviceUrlList) {
addVirtualNode(serviceNode, virtualNodeNumber);
}

}

private void addVirtualNode(String serviceNode, int virtualNodeNumber) {
for (int i = 0; i < virtualNodeNumber / 8; i++) {
String virtualNodeName = serviceNode + "#" + i;
byte[] md5Hash = md5Hash(virtualNodeName);
// md5Hash have 32 bytes
// use 8 byte for each virtual node
for (int j = 0; j < 4; j++) {
Long hash = calculateHash(md5Hash, j);
virtualInvokers.put(hash, serviceNode);
}
}
}

public String select(String rpcServiceKey) {
byte[] digest = md5Hash(rpcServiceKey);
// use first 8 byte to get hash
return selectForKey(calculateHash(digest, 0));
}

public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

if (entry == null) {
entry = virtualInvokers.firstEntry();
}

return entry.getValue();
}

}

protected static byte[] md5Hash(String input) {
MessageDigest messageDigest = null;
try {
messageDigest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = messageDigest.digest(input.getBytes(StandardCharsets.UTF_8));
messageDigest.update(hashBytes);
return messageDigest.digest();
} catch (NoSuchAlgorithmException e) {
LogUtil.error("No such algorithm exception: {}", e.getMessage());
throw new RuntimeException(e);
}

}

protected static Long calculateHash(byte[] digest, int idx) {
if (digest.length < (idx + 1) * 8) {
throw new IllegalArgumentException("Insufficient length of digest");
}

long hash = 0;
// combine 8 consecutive digest bytes into one long, least significant byte first
// each iteration masks one byte to 0..255 and shifts it left by i * 8 bits
for (int i = 0; i < 8; i++) {
hash |= (255L & (long)digest[i + idx * 8]) << (8 * i);
}
return hash;
}

/**
* Choose one from the list of existing service addresses list
*
* @param serviceUrlList Service address list
* @param rpcRequest
* @return
*/
@Override
public String selectServiceAddress(List<String> serviceUrlList, RpcRequest rpcRequest) {
int serviceListHash = System.identityHashCode(serviceUrlList);
String interfaceName = rpcRequest.getServiceName();
String selectorKey = interfaceName + serviceListHash;

ConsistentHashLoadBalanceSelector consistentHashLoadBalanceSelector = serviceToSelectorMap
.computeIfAbsent(selectorKey, key -> new ConsistentHashLoadBalanceSelector(serviceUrlList, VIRTUAL_NODES));

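// Arrays.toString renders the argument values; concatenating a Stream would only
// append the stream object's identity string, which differs on every call
return consistentHashLoadBalanceSelector.select(interfaceName + Arrays.toString(rpcRequest.getParameters()));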
}

}
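
The caching step can be looked at on its own. The class above keys the cache on System.identityHashCode of the list, which identifies the list instance. The sketch below instead assumes a content-based key (List.hashCode), which still hits the cache when the registry returns a fresh but equal list on each lookup. Which variant fits depends on how the address list is produced, so treat this as one option rather than the project's approach. SelectorCacheDemo is hypothetical, and a copied list stands in for the selector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SelectorCacheDemo {

    // Counts how often the expensive build step actually ran
    final AtomicInteger builds = new AtomicInteger();

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    // Content-based key (assumption): equal address lists share one cached entry
    List<String> selectorFor(String interfaceName, List<String> addresses) {
        String key = interfaceName + addresses.hashCode();
        return cache.computeIfAbsent(key, k -> {
            builds.incrementAndGet(); // stands in for building the hash ring
            return new ArrayList<>(addresses);
        });
    }

    public static void main(String[] args) {
        SelectorCacheDemo demo = new SelectorCacheDemo();
        demo.selectorFor("HelloService", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        demo.selectorFor("HelloService", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        demo.selectorFor("HelloService", Arrays.asList("10.0.0.1:8080"));
        System.out.println("selectors built: " + demo.builds.get());
    }
}
```

Either way, entries for outdated address lists stay in the map, so a long-running client may want to evict them when the list changes.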

Send request

Send request
  @Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse<Object>> result = new CompletableFuture<>();
InetSocketAddress address = findServiceAddress(rpcRequest);
Channel channel = fetchAndConnectChannel(address);
if (channel.isActive()) {
addToProcessQueue(rpcRequest.getTraceId(), result);
RpcData rpcData = prepareRpcData(rpcRequest);
sendRpcData(channel, rpcData, result);
} else {
log.error("Send request[{}] failed", rpcRequest);
throw new IllegalStateException();
}
return result;
}

private void addToProcessQueue(String traceId, CompletableFuture<RpcResponse<Object>> result) {
waitingProcessRequestQueue.put(traceId, result);
}

private RpcData prepareRpcData(RpcRequest rpcRequest) {
return RpcData.builder()
.data(rpcRequest)
.serializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode())
.compressType(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE)
.build();
}
private void sendRpcData(Channel channel, RpcData rpcData, CompletableFuture<RpcResponse<Object>> result) {
channel.writeAndFlush(rpcData).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
LogUtil.info("client send message: [{}]", rpcData);
} else {
future.channel().close();
result.completeExceptionally(future.cause());
LogUtil.error("Send failed:", future.cause());
}
});
}
Use channel to connect to server
private Channel fetchAndConnectChannel(InetSocketAddress address) {
Channel channel = addressChannelManager.get(address);
if (channel == null) {
// connect to service to get new address and rebuild the channel
channel = connect(address);
addressChannelManager.set(address, channel);
}
return channel;
}

private Channel connect(InetSocketAddress address) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(address).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
// set channel to future
LogUtil.info("The client has connected [{}] successful!", address.toString());
completableFuture.complete(future.channel());
} else {
LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future);
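// complete exceptionally so the caller blocked in get() is released;
// an exception thrown inside this listener would not reach that caller
completableFuture.completeExceptionally(new IllegalStateException(future.cause()));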
}
});
Channel channel = null;
try {
channel = completableFuture.get();
} catch (Exception e) {
LogUtil.error("occur exception when connect to server:", e);
}
return channel;
}
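
Bridging a callback-style connect to a blocking call is a general pattern that can be shown without Netty. In the sketch below, connectAsync is a hypothetical asynchronous API that reports (result, error) on another thread. The bridge reports failures through the future and waits with an upper bound, so the calling thread cannot block indefinitely. The 5-second limit is an arbitrary illustrative value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

final class ConnectBridgeDemo {

    // Hypothetical async API: invokes the callback with (result, error) on another thread
    static void connectAsync(String address, BiConsumer<String, Throwable> callback) {
        new Thread(() -> {
            if (address.startsWith("bad")) {
                callback.accept(null, new IllegalStateException("connection refused: " + address));
            } else {
                callback.accept("channel->" + address, null);
            }
        }).start();
    }

    // Bridge the callback into a future and wait for it with an upper bound
    static String connect(String address) {
        CompletableFuture<String> future = new CompletableFuture<>();
        connectAsync(address, (channel, error) -> {
            if (error == null) {
                future.complete(channel);
            } else {
                // Report the failure through the future so the waiting thread wakes up
                future.completeExceptionally(error);
            }
        });
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while connecting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("connect failed: " + address, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connect("10.0.0.1:8080"));
        try {
            connect("bad-host:8080");
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```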

Consumer handles return values

public class NettyRpcClientHandler extends SimpleChannelInboundHandler<RpcData> {

private final RpcSendingServiceAdapterImpl adapter;

private final WaitingProcessRequestQueue waitingProcessRequestQueue;

public NettyRpcClientHandler() {
this.adapter = SingletonFactory.getInstance(RpcSendingServiceAdapterImpl.class);
this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class);
}

/**
* heart beat handle
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// when the channel has been write-idle, send a heartbeat to keep the connection alive
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent)evt).state();
if (state == IdleState.WRITER_IDLE) {
LogUtil.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = adapter.getChannel((InetSocketAddress)ctx.channel().remoteAddress());
RpcData rpcData = new RpcData();
rpcData.setSerializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode());
rpcData.setCompressType(CompressTypeEnum.GZIP.getCode());
rpcData.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcData.setData(RpcConstants.PING);
channel.writeAndFlush(rpcData).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

/**
* Called when an exception occurs in processing a client message
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LogUtil.error("client exceptionCaught");
cause.printStackTrace();
ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcData rpcData) throws Exception {
LogUtil.info("Client receive message: [{}]", rpcData);
if (rpcData.isHeartBeatResponse()) {
// log the payload of the heartbeat response that was actually received
LogUtil.info("heart [{}]", rpcData.getData());
} else if (rpcData.isResponse()) {
RpcResponse<Object> rpcResponse = (RpcResponse<Object>)rpcData.getData();
waitingProcessRequestQueue.complete(rpcResponse);
}
}

}

Based on SpringBoot, handwrite a simple RPC framework (part one)


Code: pjpjsocute/rpc-service: personal rcp attempt (github.com). Technology stack: Spring Boot, ZooKeeper, Netty, Java SPI.


RPC definition

Remote Procedure Call (RPC) is a communication mechanism that allows different services to communicate and interact over a network.

Through RPC, one service can make a request to another service and receive a response, just like a local call, without developers manually handling the underlying network communication details. The RPC framework encapsulates the underlying network transmission and provides functions such as defining remote service interfaces, serializing and deserializing data.

Distinguishing RPC from HTTP:

HTTP is an application-layer protocol used for transmitting hypermedia between clients and servers. It operates on a request-response model: the client sends an HTTP request, and the server processes it and returns an appropriate HTTP response. RPC is closer to an architectural concept than to a protocol; it can be implemented on top of HTTP or directly on TCP.

RPC Process

A simple RPC architecture is shown as follows:

image-20230522161808172

Implementation method:

A simple RPC call chain:

image-20230522163034434

Server implementation based on netty and ZK

According to the figure above, we first need to implement service registration.

Service registration:

Currently, most RPC frameworks support registration through annotations, and the same approach is used here.

Define the registration annotation

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcProvider {

/**
* Service project, default value is "default"
*/
String project() default "default";

/**
* Service version, default value is 1.0
*
* @return
*/
String version() default "1.0";

/**
* Service group, default value is "default"
*/
String group() default "default";

}

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
@Inherited
public @interface RpcConsumer {
/**
* Service project, default value is "default"
*/
String project() default "default";

/**
* Service version, default value is 1.0
*
* @return
*/
String version() default "1.0";

/**
* Service group, default value is "default"
*/
String group() default "default";
}

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomBeanScannerRegistrar.class)
@Documented
public @interface SimpleRpcApplication {

String[] basePackage();
}

The @RpcProvider annotation defines the service version, the group (to distinguish different interfaces with the same name in the same project), and the project name; it is used for service exposure.

Similarly, @RpcConsumer is needed on the consuming side, and @SimpleRpcApplication defines the packages to be scanned.
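
How such annotation values are read at runtime can be sketched in a few lines of reflection. The annotation DemoProvider, the EchoService types, and the choice of the first implemented interface as the service name are all assumptions made for this illustration, not the project's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation shaped like the provider annotation above
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoProvider {
    String project() default "default";
    String version() default "1.0";
    String group() default "default";
}

interface EchoService {
    String echo(String text);
}

@DemoProvider(version = "2.0")
final class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return text;
    }
}

final class AnnotationDemo {

    // Build a registration key from the annotation values and the first implemented
    // interface (taking the first interface is an assumption of this sketch)
    static String registrationKey(Object bean) {
        Class<?> type = bean.getClass();
        DemoProvider meta = type.getAnnotation(DemoProvider.class);
        if (meta == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated");
        }
        String serviceName = type.getInterfaces()[0].getName();
        return meta.project() + "*" + meta.group() + "*" + serviceName + "*" + meta.version();
    }

    public static void main(String[] args) {
        System.out.println(registrationKey(new EchoServiceImpl()));
    }
}
```

RetentionPolicy.RUNTIME is what makes getAnnotation work here; with the default CLASS retention the annotation would not be visible through reflection.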

Register the service at startup

First, the beans annotated with @RpcProvider have to be registered.

Get the packages that need to be scanned, then register the annotated beans into Spring.

public class CustomBeanScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

private ResourceLoader resourceLoader;

private static final String API_SCAN_PARAM = "basePackage";

private static final String SPRING_BEAN_BASE_PACKAGE = "org.example.ray";

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
//get the scan annotation and the bean package to be scanned
String[] scanBasePackages = fetchScanBasePackage(importingClassMetadata);
LogUtil.info("scanning packages: [{}]", (Object) scanBasePackages);

// //scan the package and register the bean
// RpcBeanScanner rpcConsumerBeanScanner = new RpcBeanScanner(registry, RpcConsumer.class);
RpcBeanScanner rpcProviderBeanScanner = new RpcBeanScanner(registry, RpcProvider.class);
RpcBeanScanner springBeanScanner = new RpcBeanScanner(registry, Component.class);
if (resourceLoader != null) {
springBeanScanner.setResourceLoader(resourceLoader);
rpcProviderBeanScanner.setResourceLoader(resourceLoader);
}
int rpcServiceCount = rpcProviderBeanScanner.scan(scanBasePackages);
LogUtil.info("number of beans scanned by rpcServiceScanner: [{}]", rpcServiceCount);
LogUtil.info("scanning RpcProvider annotated beans end");
}

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}

private String[] fetchScanBasePackage(AnnotationMetadata importingClassMetadata){
AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(SimpleRpcApplication.class.getName()));
String[] scanBasePackages = new String[0];
if (annotationAttributes != null) {
scanBasePackages = annotationAttributes.getStringArray(API_SCAN_PARAM);
}
//user doesn't specify the package to scan,use the Application base package
if (scanBasePackages.length == 0) {
scanBasePackages = new String[]{((org.springframework.core.type.StandardAnnotationMetadata) importingClassMetadata).getIntrospectedClass().getPackage().getName()};
}
return scanBasePackages;
}

}
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

private final RpcServiceRegistryAdapter adapter;

private final RpcSendingServiceAdapter sendingServiceAdapter;

public RpcBeanPostProcessor() {
this.adapter = SingletonFactory.getInstance(RpcServiceRegistryAdapterImpl.class);
this.sendingServiceAdapter = ExtensionLoader.getExtensionLoader(RpcSendingServiceAdapter.class)
.getExtension(RpcRequestSendingEnum.NETTY.getName());
}

/**
* register service
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
LogUtil.info("start process register service: {}", bean);
// register service
if (bean.getClass().isAnnotationPresent(RpcProvider.class)) {
RpcProvider annotation = bean.getClass().getAnnotation(RpcProvider.class);
// build rpc service config
RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
.service(bean)
.project(annotation.project())
.version(annotation.version())
.group(annotation.group())
.build();
LogUtil.info("register service: {}", serviceConfig);
adapter.registryService(serviceConfig);
}
return bean;
}
}
Implement the specific method for service registration

A service registration needs at least the service provider (IP), the service name, and the attributes declared in @RpcProvider, so an RpcServiceConfig can be defined first.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RpcServiceConfig {
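/**
* service version
*/
// @Builder.Default keeps the initializer when the builder leaves the field unset
@Builder.Default
private String version = "";

/**
* target service
*/
private Object service;

/**
* the project this service belongs to
*/
@Builder.Default
private String project = "";

/**
* group
*/
@Builder.Default
private String group = "";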

/**
* generate service name,use to distinguish different service,and * can be split to get the service name
* @return
*/
public String fetchRpcServiceName() {
return this.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion();
}

/**
* get the interface name
*
* @return
*/
public String getServiceName() {
return this.service.getClass().getInterfaces()[0].getCanonicalName();
}

}
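
To make the naming scheme concrete, here is a minimal standalone sketch of how a name in the project*group*interface*version layout could be built and split again. The class ServiceNameDemo and the sample values are hypothetical and are not part of the framework.

```java
import java.util.regex.Pattern;

final class ServiceNameDemo {

    // Mirrors the "project*group*interface*version" layout of fetchRpcServiceName().
    static String buildServiceName(String project, String group, String interfaceName, String version) {
        return project + "*" + group + "*" + interfaceName + "*" + version;
    }

    // "*" is a regex metacharacter, so it is quoted before splitting.
    // The -1 limit keeps trailing empty fields (for example an empty version).
    static String[] splitServiceName(String rpcServiceName) {
        return rpcServiceName.split(Pattern.quote("*"), -1);
    }

    public static void main(String[] args) {
        String name = buildServiceName("demo", "g1", "org.example.HelloService", "1.0");
        System.out.println(name);                      // demo*g1*org.example.HelloService*1.0
        System.out.println(splitServiceName(name)[2]); // org.example.HelloService
    }
}
```

Because every field is part of the key, two providers that differ only in group or version end up under different names.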

The adapter provides two methods: one registers a service, and the other returns the corresponding bean for a given service name.

public interface RpcServiceRegistryAdapter {

/**
* @param rpcServiceConfig rpc service related attributes
*/
void registryService(RpcServiceConfig rpcServiceConfig);

/**
* @param rpcClassName rpc class name
* @return service object
*/
Object getService(String rpcClassName);

}

The registration process can be divided into 3 steps:

  1. Generate the address.
  2. Register the service in ZooKeeper.
  3. Register the service in the local cache.

Here, a ConcurrentHashMap is used to cache services. The registration method ultimately calls the ZooKeeper API through Curator; since that part is not closely related to RPC, the helper is omitted here and can be found in the source code.

public class RpcServiceRegistryAdapterImpl implements RpcServiceRegistryAdapter {

/**
* cache map
*/
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

@Override
public void registryService(RpcServiceConfig rpcServiceConfig) {
try {
// first get address and service
String hostAddress = InetAddress.getLocalHost().getHostAddress();
// add service to zk
LogUtil.info("add service to zk,service name{},host:{}", rpcServiceConfig.fetchRpcServiceName(),hostAddress);
registerServiceToZk(rpcServiceConfig.fetchRpcServiceName(),
new InetSocketAddress(hostAddress, PropertiesFileUtil.readPortFromProperties()));
// add service to map cache
registerServiceToMap(rpcServiceConfig);
} catch (UnknownHostException e) {
LogUtil.error("occur exception when getHostAddress", e);
throw new RuntimeException(e);
}

}

@Override
public Object getService(String rpcServiceName) {
Object service = serviceMap.get(rpcServiceName);
if (null == service) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND.getCode(),"service not found");
}
return service;
}

private void registerServiceToZk(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorClient.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorFramework zkClient = CuratorClient.getZkClient();
CuratorClient.createPersistentNode(zkClient, servicePath);
}

private void registerServiceToMap(RpcServiceConfig rpcServiceConfig) {
String rpcServiceName = rpcServiceConfig.fetchRpcServiceName();
// putIfAbsent keeps the first registration and is atomic on a ConcurrentHashMap
serviceMap.putIfAbsent(rpcServiceName, rpcServiceConfig.getService());
}
}

At this point, the registration process for a service is complete.

Java SPI application

SPI definition

In Java, SPI stands for Service Provider Interface. SPI is a mechanism that allows applications to extend their functionality by discovering and loading pluggable components or service providers found on the classpath. It provides a loosely coupled way to enable developers to write code that can interact with multiple implementations without explicitly referencing specific implementation classes.
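
As a point of reference, the JDK's built-in SPI entry point is java.util.ServiceLoader. The sketch below only shows the lookup side; the Greeter interface is a hypothetical example, and since no provider file is registered for it here, the lookup is expected to come back empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class JdkSpiDemo {

    // Hypothetical service interface. A provider jar would name its implementation in
    // a file under META-INF/services/ whose name is this interface's binary name.
    public interface Greeter {
        String greet(String name);
    }

    // Calls every Greeter implementation that ServiceLoader discovers on the classpath.
    static List<String> greetAll(String name) {
        List<String> results = new ArrayList<>();
        for (Greeter greeter : ServiceLoader.load(Greeter.class)) {
            results.add(greeter.greet(name));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> greetings = greetAll("rpc");
        System.out.println(greetings.isEmpty() ? "no providers registered" : greetings.toString());
    }
}
```

The calling code never references an implementation class, which is the loose coupling described above.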

Difference between API and SPI

API is used to define the interaction rules and conventions between software components, while SPI is a mechanism for achieving the extensibility of pluggable components or services.

API is used to expose functionalities and features for other developers to use and integrate, while SPI is used for dynamically loading and utilizing replaceable components for implementation.

API is developer-oriented, providing programming interfaces and documentation for the correct use and integration of software components. SPI is developer- and framework/application-oriented, used to extend the functionality of frameworks or applications.

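With an API, the interface is defined and implemented by the callee, and the caller only invokes it. With an SPI, the interface is defined by the caller (for example, a framework), and providers supply the implementations that the caller discovers and loads.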

SPI mechanism

WX20230605-150555@2x

Implementation process

WX20230605-151041@2x

Implementation in RPC

The simple RPC framework implements an annotation-based SPI mechanism modeled on Dubbo's. The principle is briefly introduced here.

Implementation:

1. Define an SPI annotation to mark all interfaces that need SPI registration.

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {
}

Next, to prevent concurrency conflicts, a wrapper class holds each instance so that it is published safely across threads.

public class Holder<T> {

private volatile T value;

public T get() {
return value;
}

public void set(T value) {
this.value = value;
}
}

A Holder can also serve as the lock object when an instance is created lazily, which keeps the creation thread-safe.

Modeled on Spring's SPI, the extension configuration file uses the format key=fully qualified class name, for example zk=org.example.ray.infrastructure.adapter.impl.RpcServiceFindingAdapterImpl

This makes it easy to create, retrieve, and invoke an extension within the system by its key.
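
The key=class format can be read with a few lines of plain Java. This is a minimal sketch, not the framework's loader: the class ExtensionConfigParser is hypothetical, and treating # as a comment marker is an assumption borrowed from common properties-style files.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ExtensionConfigParser {

    // Parses "key=fully.qualified.ClassName" lines into an ordered map.
    // Assumption: '#' starts a comment; blank and malformed lines are skipped.
    static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String rawLine : content.split("\\R")) {
            String line = rawLine;
            int comment = line.indexOf('#');
            if (comment >= 0) {
                line = line.substring(0, comment);
            }
            line = line.trim();
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue;
            }
            String key = line.substring(0, eq).trim();
            String className = line.substring(eq + 1).trim();
            if (!className.isEmpty()) {
                result.put(key, className);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String content = "# registry lookup\n"
                + "zk=org.example.ray.infrastructure.adapter.impl.RpcServiceFindingAdapterImpl\n";
        System.out.println(parse(content));
    }
}
```

A real loader would then pass each class name to Class.forName and cache the result under its key.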

2. To avoid issues with repeated loading and creation, a map is used as a cache. Additionally, to improve retrieval efficiency, different extension instances are cached for different services.

public final class ExtensionLoader<T> {
// directory that holds the extension configuration files
private static final String SERVICE_DIRECTORY = "META-INF/extension/";
// one ExtensionLoader per extension interface
private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
// one shared instance per implementation class
private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();

private final Class<?> type;
// extension name -> holder of the created instance
private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
// extension name -> implementation class, loaded lazily from the configuration file
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

private ExtensionLoader(Class<?> type) {
this.type = type;
}
}

3. Implement methods to retrieve the classloader and getExtension.

WX20230605-170936@2x
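
Since the original code for this step is only available as an image, the following is a simplified sketch of how a getExtension method could use a Holder as a per-name lock with double-checked locking. It is not the project's implementation: MiniExtensionLoader, register, and the factory map are hypothetical stand-ins for the classes that the real loader reads from META-INF/extension/.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class MiniExtensionLoader<T> {

    // A volatile slot that doubles as the lock object for one extension name.
    static final class Holder<V> {
        private volatile V value;
        V get() { return value; }
        void set(V value) { this.value = value; }
    }

    // Hypothetical replacement for the configuration file: name -> factory.
    private final Map<String, Supplier<? extends T>> factories = new ConcurrentHashMap<>();
    private final Map<String, Holder<T>> cachedInstances = new ConcurrentHashMap<>();

    void register(String name, Supplier<? extends T> factory) {
        factories.put(name, factory);
    }

    T getExtension(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("extension name must not be blank");
        }
        Holder<T> holder = cachedInstances.computeIfAbsent(name, k -> new Holder<>());
        T instance = holder.get();
        if (instance == null) {
            // Lock only this name's holder, so other extensions are not blocked.
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    Supplier<? extends T> factory = factories.get(name);
                    if (factory == null) {
                        throw new IllegalStateException("no extension named " + name);
                    }
                    instance = factory.get();
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        MiniExtensionLoader<Object> loader = new MiniExtensionLoader<>();
        loader.register("obj", Object::new);
        System.out.println(loader.getExtension("obj") == loader.getExtension("obj")); // true
    }
}
```

The second read inside the synchronized block is what prevents two threads from creating the same extension twice.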

Based on SpringBoot, Handwritten Simple RPC Framework (II)

image-20230522163034434

Continuing from the previous chapter, after implementing Service Registration, we need to implement service invocation.

Service Execution

An RPC service call can be divided into the following steps:

  1. Request listening
  2. Request decoding
  3. Method invocation
  4. Result return

These steps are implemented below in sequence.

Request Listening

An RpcRequest class needs to be defined first, because all subsequent processing depends on it.

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class RpcRequest implements Serializable {

private static final long serialVersionUID = 8509587559718339795L;
/**
* traceId
*/
private String traceId;
/**
* interface name
*/
private String serviceName;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private Object[] parameters;
/**
* parameter types
*/
private Class<?>[] paramTypes;
/**
* version
*/
private String version;
/**
* project
*/
private String project;
/**
* group
*/
private String group;

/**
* generate service name,use to distinguish different service,and * can be split to get the service name
*/
public String fetchRpcServiceName() {
return this.getProject() +"*"+this.getGroup()+"*"+ this.getServiceName() +"*"+ this.getVersion();
}

}

Listening for requests requires starting a Netty server that accepts incoming connections.

First, a shutdown hook is registered at startup so that this server's registered services and thread pools are cleaned up when the JVM exits.

Subsequently, the resources required by Netty are initialized in order.

Below is a sample Netty startup code. An encoder and a decoder are added for protocol parsing, along with an IdleStateHandler for heartbeat (idle) detection.

Rate limiting and request handling are added after the decoder.

@Component
public class NettyServer {

public NettyServer() {

}

public void start() {
LogUtil.info("netty server init");

ServerShutdownHook.getInstance().registerShutdownHook();

EventLoopGroup listenerGroup = initListenerGroup();
EventLoopGroup workerGroup = initWorkerGroup();
DefaultEventExecutorGroup businessGroup = initBusinessGroup();

LogUtil.info("netty server start");

try {
ServerBootstrap serverBootstrap = configureServerBootstrap(listenerGroup, workerGroup, businessGroup);
bindAndListen(serverBootstrap);
} catch (Exception e) {
LogUtil.error("occur exception when start server:", e);
} finally {
shutdown(listenerGroup, workerGroup, businessGroup);
}

}

private EventLoopGroup initListenerGroup() {
return new NioEventLoopGroup(1);
}

private EventLoopGroup initWorkerGroup() {
return new NioEventLoopGroup();
}

private DefaultEventExecutorGroup initBusinessGroup() {
return new DefaultEventExecutorGroup(
Runtime.getRuntime().availableProcessors() * 2,
ThreadPoolFactoryUtil.createThreadFactory("netty-server-business-group", false)
);
}

private ServerBootstrap configureServerBootstrap(EventLoopGroup listenerGroup, EventLoopGroup workerGroup, DefaultEventExecutorGroup businessGroup) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(listenerGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new RpcMessageEncoder());
pipeline.addLast(new RpcMessageDecoder());
pipeline.addLast(new DefaultTrafficBlockHandler());
pipeline.addLast(businessGroup, new NettyRpcServerHandler());
}
});

return serverBootstrap;
}

private void bindAndListen(ServerBootstrap serverBootstrap) throws UnknownHostException, InterruptedException {
LogUtil.info("netty server bind port:{} " , PropertiesFileUtil.readPortFromProperties());
String host = InetAddress.getLocalHost().getHostAddress();
ChannelFuture f = serverBootstrap.bind(host, PropertiesFileUtil.readPortFromProperties()).sync();
f.channel().closeFuture().sync();
}

private void shutdown(EventLoopGroup listenerGroup, EventLoopGroup workerGroup, DefaultEventExecutorGroup businessGroup) {
listenerGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
businessGroup.shutdownGracefully();
}

}

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ServerShutdownHook {

private static final ServerShutdownHook INSTANCE = new ServerShutdownHook();

public static ServerShutdownHook getInstance() {
return INSTANCE;
}

/**
* register shut down hook
*/
public void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// perform cleanup
clearAll();
}));
}

private void clearAll() {
try {
// remove this server's entries from the registry
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), PropertiesFileUtil.readPortFromProperties());
CuratorClient.clearRegistry(CuratorClient.getZkClient(), inetSocketAddress);
} catch (Exception ignored) {

}
// shut down all thread pools
ThreadPoolFactoryUtil.shutDownAllThreadPool();
}

}

Combine it with ApplicationRunner so that the server starts automatically.

@Component
public class NettyServerRunner implements ApplicationRunner {

@Autowired
private NettyServer nettyServer;

public NettyServerRunner() {}

@Override
public void run(ApplicationArguments args) throws Exception {
nettyServer.start();
}
}

Serialization

This project only implements Hessian serialization and gzip compression/decompression by default. Since there are many tutorials on both, they are not covered here. The specific code can be found in the org.example.ray.infrastructure.serialize and org.example.ray.infrastructure.compress packages in the source code.

Encoding and Protocol

After implementing the service, we need to sequentially add encoding and processing classes for it.

Before implementing the encoding service, it is first necessary to determine the underlying encoding protocol.

Protocol

This project references some existing protocol designs and selects a relatively simple protocol design approach, as shown in the figure below:

image-20230522174913202

The protocol consists of a 16-byte header followed by a body. The byte ranges below are half-open (start inclusive, end exclusive).

Bytes 0-4 are the magic code, used for verification.

Bytes 4-5 are the custom protocol version.

Bytes 5-9 are the length of the entire message (header plus body), used for decoding.

Byte 9 is the message type: request, response, heartbeat request, or heartbeat response.

Byte 10 indicates the serialization (encoding) method.

Byte 11 indicates the compression method.

Bytes 12-16 are an integer representing the request number.
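
The layout can be checked with plain java.nio, without Netty. In this sketch the magic bytes and the version value are made-up placeholders (the real ones live in RpcConstants, which is not shown here), and HeaderLayoutDemo is a hypothetical class name.

```java
import java.nio.ByteBuffer;

final class HeaderLayoutDemo {

    static final byte[] MAGIC = {'r', 'r', 'p', 'c'}; // placeholder magic code (assumption)
    static final byte VERSION = 1;                     // placeholder version (assumption)
    static final int HEAD_LENGTH = 16;

    static byte[] writeHeader(int fullLength, byte messageType, byte codec, byte compress, int requestId) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_LENGTH); // big-endian by default
        buf.put(MAGIC);          // offset 0, 4 bytes
        buf.put(VERSION);        // offset 4, 1 byte
        buf.putInt(fullLength);  // offset 5, 4 bytes
        buf.put(messageType);    // offset 9, 1 byte
        buf.put(codec);          // offset 10, 1 byte
        buf.put(compress);       // offset 11, 1 byte
        buf.putInt(requestId);   // offset 12, 4 bytes
        return buf.array();
    }

    static int readFullLength(byte[] header) {
        return ByteBuffer.wrap(header).getInt(5);
    }

    static int readRequestId(byte[] header) {
        return ByteBuffer.wrap(header).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = writeHeader(16, (byte) 3, (byte) 1, (byte) 1, 42);
        System.out.println(header.length);          // 16
        System.out.println(readFullLength(header)); // 16
        System.out.println(header[9]);              // 3
        System.out.println(readRequestId(header));  // 42
    }
}
```

The field sizes add up to 4 + 1 + 4 + 1 + 1 + 1 + 4 = 16 bytes, which matches the header length stated above.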

The corresponding Java POJO is as follows:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RpcData {
/**
* rpc message type
*/
private byte messageType;
/**
* serialization type
*/
private byte serializeMethodCodec;
/**
* compress type
*/
private byte compressType;
/**
* request id
*/
private int requestId;
/**
* request data
*/
private Object data;

public boolean isHeatBeatRequest() {
return messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE;
}

public boolean canSendRequest() {
return messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE;
}

public boolean isHeartBeatResponse() {
return messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE;
}

public boolean isResponse() {
return messageType == RpcConstants.RESPONSE_TYPE;
}
}

After understanding the protocol, the next step is to implement decoding.

Decoding

For background on the LengthFieldBasedFrameDecoder decoder, see the following article:

https://zhuanlan.zhihu.com/p/95621344

Once LengthFieldBasedFrameDecoder is understood, the decoding process is not complex. It consists of three parts: decoding the header, verification, and decoding the body. See the code and its comments for the specific implementation.

The decoding part uses SPI, which allows the serialization and decompression methods to be customized. The SPI code is available on GitHub; alternatively, the SPI lookups can be replaced with fixed serialization and decompression implementations.

As noted in the Serialization section, this project implements only Hessian serialization and gzip compression/decompression by default. The specific code can be found in the org.example.ray.infrastructure.serialize and org.example.ray.infrastructure.compress packages in the source code.

public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {

public RpcMessageDecoder() {
// lengthFieldOffset: magic code is 4B, and version is 1B, and then full
// length. so value is 5
// lengthFieldLength: full length is 4B. so value is 4
// lengthAdjustment: full length include all data and read 9 bytes
// before, so the left length is (fullLength-9). so values is -9
// initialBytesToStrip: we will check magic code and version manually,
// so do not strip any bytes. so values is 0
this(8 * 1024 * 1024, 5, 4, -9, 0);
}

public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// get the bytebuf which contains the frame
Object decode = super.decode(ctx, in);
if (decode instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf)decode;
// if data not empty, decode it
if (byteBuf.readableBytes() >= RpcConstants.HEAD_LENGTH) {
try {
return decode(byteBuf);
} catch (Exception e) {
LogUtil.error("Decode error:{} ,input:{}", e, byteBuf);
// rethrow so that the buffer released below is never passed down the pipeline
throw e;
} finally {
byteBuf.release();
}
}
}
return decode;
}

/**
* read byte array from byteBuf
*
* @param byteBuf
* @return
*/
private Object decode(ByteBuf byteBuf) {
LogUtil.info("start decode");
checkMagicCode(byteBuf);
checkVersion(byteBuf);

int fullLength = byteBuf.readInt();
RpcData rpcMessage = decodeRpcMessage(byteBuf);

if (rpcMessage.isHeatBeatRequest()) {
return handleHeatBeatRequest(rpcMessage);
}

if (rpcMessage.isHeartBeatResponse()) {
return handleHeartBeatResponse(rpcMessage);
}

return handleNormalRequest(rpcMessage, byteBuf, fullLength);
}

private RpcData decodeRpcMessage(ByteBuf byteBuf) {
LogUtil.info("start decode RpcMessage data");
byte messageType = byteBuf.readByte();
byte codec = byteBuf.readByte();
byte compress = byteBuf.readByte();
int requestId = byteBuf.readInt();

return RpcData.builder()
.serializeMethodCodec(codec)
.requestId(requestId)
.compressType(compress)
.messageType(messageType)
.build();
}

private RpcData handleHeatBeatRequest(RpcData rpcMessage) {
rpcMessage.setData(RpcConstants.PING);
return rpcMessage;
}

private RpcData handleHeartBeatResponse(RpcData rpcMessage) {
rpcMessage.setData(RpcConstants.PONG);
return rpcMessage;
}

private Object handleNormalRequest(RpcData rpcMessage, ByteBuf byteBuf, int fullLength) {
int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;
if (bodyLength <= 0) {
return rpcMessage;
}
return decodeBody(rpcMessage, byteBuf, bodyLength);
}

private RpcData decodeBody(RpcData rpcMessage, ByteBuf byteBuf, Integer bodyLength) {
LogUtil.info("start decode body");
byte[] bodyBytes = new byte[bodyLength];
byteBuf.readBytes(bodyBytes);
// decompress
String compressName = CompressTypeEnum.getName(rpcMessage.getCompressType());
CompressService extension =
ExtensionLoader.getExtensionLoader(CompressService.class).getExtension(compressName);
bodyBytes = extension.decompress(bodyBytes);
// deserialize
if (rpcMessage.getMessageType() == RpcConstants.REQUEST_TYPE) {
RpcRequest rpcRequest = ExtensionLoader.getExtensionLoader(SerializationService.class)
.getExtension(SerializationTypeEnum.getName(rpcMessage.getSerializeMethodCodec()))
.deserialize(bodyBytes, RpcRequest.class);
rpcMessage.setData(rpcRequest);
} else {
RpcResponse rpcResponse = ExtensionLoader.getExtensionLoader(SerializationService.class)
.getExtension(SerializationTypeEnum.getName(rpcMessage.getSerializeMethodCodec()))
.deserialize(bodyBytes, RpcResponse.class);
rpcMessage.setData(rpcResponse);
}
return rpcMessage;

}

private void checkVersion(ByteBuf byteBuf) {
byte version = byteBuf.readByte();
if (version != RpcConstants.VERSION) {
throw new IllegalArgumentException("version is not compatible: " + version);
}
}

private void checkMagicCode(ByteBuf byteBuf) {
int length = RpcConstants.MAGIC_NUMBER.length;
byte[] magicNumber = new byte[length];
byteBuf.readBytes(magicNumber);
for (int i = 0; i < length; i++) {
if (magicNumber[i] != RpcConstants.MAGIC_NUMBER[i]) {
throw new IllegalArgumentException("Unknown magic code: " + new String(magicNumber));
}
}
}
}
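
The constructor arguments (5, 4, -9, 0) can be sanity-checked with a little arithmetic. Netty's LengthFieldBasedFrameDecoder derives the frame size as the length field value plus lengthAdjustment plus the end offset of the length field (lengthFieldOffset + lengthFieldLength). The sketch below only reproduces that formula; FrameLengthDemo is a hypothetical helper and does not use Netty.

```java
final class FrameLengthDemo {

    // frame size = length field value + lengthAdjustment + (lengthFieldOffset + lengthFieldLength)
    static long frameLength(long lengthFieldValue, int lengthFieldOffset, int lengthFieldLength,
                            int lengthAdjustment) {
        return lengthFieldValue + lengthAdjustment + lengthFieldOffset + lengthFieldLength;
    }

    public static void main(String[] args) {
        // A message whose length field says 116: 16-byte header + 100-byte body.
        System.out.println(frameLength(116, 5, 4, -9)); // 116
    }
}
```

Because the length field already counts the whole message, the 9 bytes read up to the end of the length field have to be subtracted again, which is where -9 comes from.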
Encoding

The encoding process is relatively simple: write each field in the order defined by the protocol, then back-fill the full length once the body size is known.

public class RpcMessageEncoder extends MessageToByteEncoder<RpcData> {

private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, RpcData rpcData, ByteBuf byteBuf) {
try {
//encode head,marked full length index
int fullLengthIndex = encodeHead(rpcData,byteBuf);
// encode body
int fullLength = encodeBody(rpcData, byteBuf);
// back fill full length
encodeLength(fullLengthIndex,fullLength,byteBuf);
} catch (Exception e) {
LogUtil.error("Encode request error:{},data:{}", e, rpcData);
throw new RpcException(RpcErrorMessageEnum.REQUEST_ENCODE_FAIL.getCode(),
RpcErrorMessageEnum.REQUEST_ENCODE_FAIL.getMessage());
}

}
private int encodeHead(RpcData rpcData,ByteBuf byteBuf){
// write magic code and version 0-5
byteBuf.writeBytes(RpcConstants.MAGIC_NUMBER);
byteBuf.writeByte(RpcConstants.VERSION);
// marked full length index.
int fullLengthIndex = byteBuf.writerIndex();
// write placeholder for full length 9+
byteBuf.writerIndex(byteBuf.writerIndex() + 4);
// write message type
byteBuf.writeByte(rpcData.getMessageType());
// write codec
byteBuf.writeByte(rpcData.getSerializeMethodCodec());
// write compress
byteBuf.writeByte(rpcData.getCompressType());
// write requestId
byteBuf.writeInt(ATOMIC_INTEGER.getAndIncrement());
return fullLengthIndex;
}

private int encodeBody(RpcData rpcData,ByteBuf byteBuf){
byte[] bodyBytes = null;
int fullLength = RpcConstants.HEAD_LENGTH;
if (rpcData.canSendRequest()) {
LogUtil.info("serialize request start");
bodyBytes = ExtensionLoader.getExtensionLoader(SerializationService.class)
.getExtension(SerializationTypeEnum.getName(rpcData.getSerializeMethodCodec()))
.serialize(rpcData.getData());
LogUtil.info("serialize request end");

String compressName = CompressTypeEnum.getName(rpcData.getCompressType());
CompressService extension =
ExtensionLoader.getExtensionLoader(CompressService.class).getExtension(compressName);
bodyBytes = extension.compress(bodyBytes);
fullLength += bodyBytes.length;
}
if (bodyBytes != null) {
byteBuf.writeBytes(bodyBytes);
}
return fullLength;
}

private void encodeLength(int fullLengthIndex,int fullLength,ByteBuf byteBuf){
int writeIndex = byteBuf.writerIndex();
byteBuf.writerIndex(fullLengthIndex);
byteBuf.writeInt(fullLength);
byteBuf.writerIndex(writeIndex);
}
}
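
The back-fill step used by the encoder can be tried in isolation. This sketch swaps Netty's ByteBuf for java.nio.ByteBuffer and uses an absolute putInt instead of moving the writer index back and forth; the zero-filled header fields are stand-ins, and BackfillLengthDemo is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BackfillLengthDemo {

    static final int HEAD_LENGTH = 16;

    static ByteBuffer encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_LENGTH + body.length);
        buf.put(new byte[5]);                 // stand-in for magic code (4 bytes) + version (1 byte)
        int fullLengthIndex = buf.position(); // remember where the length field starts
        buf.position(fullLengthIndex + 4);    // skip the 4-byte placeholder
        buf.put(new byte[7]);                 // stand-in for type, codec, compress, requestId
        buf.put(body);
        // Absolute write: fills the placeholder without moving the current position.
        buf.putInt(fullLengthIndex, HEAD_LENGTH + body.length);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.remaining()); // 20
        System.out.println(frame.getInt(5));   // 20
    }
}
```

Writing the placeholder first keeps the header contiguous, so the body never has to be copied just to learn its length.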

Request processing and invocation

Here, Netty's SimpleChannelInboundHandler is used. It releases the inbound message automatically once channelRead0 returns, which avoids manual resource release.

Since decoding has already been implemented, the handler only needs to treat each request type differently.

If it is a heartbeat request, return a heartbeat response.

If it is a service request, invoke the target service via reflection and write the result back to the consumer.

Define a response class

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class RpcResponse<T> implements Serializable {

private static final long serialVersionUID = 347966260947189201L;
/**
* request id
*/
private String requestId;
/**
* response code
*/
private Integer code;
/**
* response message
*/
private String message;
/**
* response body
*/
private T data;

/**
* success
* @param data
* @param requestId
* @return
* @param <T>
*/
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
response.setRequestId(requestId);
if (null != data) {
response.setData(data);
}
return response;
}

/**
* fail
* @return
* @param <T>
*/
public static <T> RpcResponse<T> fail() {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCodeEnum.FAIL.getCode());
response.setMessage(RpcResponseCodeEnum.FAIL.getMessage());
return response;
}

}

The core method of NettyRpcServerHandler is channelRead0.

public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcData> {
/**
* Read the message transmitted by the server
*/

private final RpcRequestHandler rpcRequestHandler;

public NettyRpcServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}

/**
* heart beat handle
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// close the connection if the channel has been idle
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent)evt).state();
if (state == IdleState.READER_IDLE) {
LogUtil.info("idle check happen, so close the connection");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}

/**
* Called when an exception occurs in processing a client message
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LogUtil.error("server exceptionCaught");
cause.printStackTrace();
ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcData rpcData) throws Exception {
LogUtil.info("Server receive message: [{}]", rpcData);
RpcData rpcMessage = new RpcData();
setupRpcMessage(rpcMessage);

if (rpcData.isHeatBeatRequest()) {
handleHeartbeat(rpcMessage);
} else {
handleRpcRequest(ctx, rpcData, rpcMessage);
}
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}

private void setupRpcMessage(RpcData rpcMessage) {
rpcMessage.setSerializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode());
rpcMessage.setCompressType(CompressTypeEnum.GZIP.getCode());
}

private void handleHeartbeat(RpcData rpcMessage) {
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
}

private void handleRpcRequest(ChannelHandlerContext ctx, RpcData rpcData, RpcData rpcMessage) throws Exception {
RpcRequest rpcRequest = (RpcRequest)rpcData.getData();

// invoke target method
Object result = rpcRequestHandler.handle(rpcRequest);
LogUtil.info("Server get result: {}", result);

rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
buildAndSetRpcResponse(ctx, rpcRequest, rpcMessage, result);
}

private void
buildAndSetRpcResponse(ChannelHandlerContext ctx, RpcRequest rpcRequest, RpcData rpcMessage, Object result) {
if (canBuildResponse(ctx)) {
// If the channel is active and writable, a successful RPC response is constructed
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getTraceId());
rpcMessage.setData(rpcResponse);
} else {
// Construct a failed RPC response if the channel is not writable
RpcResponse<Object> rpcResponse = RpcResponse.fail();
rpcMessage.setData(rpcResponse);
LogUtil.error("Not writable now, message dropped,message:{}", rpcRequest);
}
}

private boolean canBuildResponse(ChannelHandlerContext ctx) {
return ctx.channel().isActive() && ctx.channel().isWritable();
}
}

Tip: services are cached locally when they are registered with ZooKeeper, so the handler can look them up and invoke them directly via reflection.

public class RpcRequestHandler {

private final RpcServiceRegistryAdapter adapter;

public RpcRequestHandler() {
this.adapter = SingletonFactory.getInstance(RpcServiceRegistryAdapterImpl.class);
}

/**
* Processes an rpcRequest: calls the corresponding method and returns its
* result
*/
public Object handle(RpcRequest request) {
Object service = adapter.getService(request.fetchRpcServiceName());
return invoke(request, service);
}

/**
* get method execution results
*
* @param rpcRequest client request
* @param service service object
* @return the result of the target method execution
*/
private Object invoke(RpcRequest rpcRequest, Object service) {
Object result;
try {
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
LogUtil.info("service:[{}] successful invoke method:[{}]", rpcRequest.getServiceName(),
rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException
| IllegalAccessException e) {
LogUtil.error("occur exception when invoke target method,error:{},RpcRequest:{}", e, rpcRequest);
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE.getCode(), RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE.getMessage());
}
return result;
}
}
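
The reflective call at the heart of invoke can be tried on its own. In this sketch HelloService and HelloServiceImpl are hypothetical sample types, and failures are wrapped in an IllegalStateException rather than the project's RpcException to keep the example self-contained.

```java
import java.lang.reflect.Method;

final class ReflectiveCallDemo {

    public interface HelloService {
        String hello(String name);
    }

    public static final class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello, " + name;
        }
    }

    // Looks up a public method by name and parameter types, then invokes it on the service.
    static Object invoke(Object service, String methodName, Class<?>[] paramTypes, Object[] parameters) {
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object result = invoke(new HelloServiceImpl(), "hello",
                new Class<?>[] {String.class}, new Object[] {"rpc"});
        System.out.println(result); // hello, rpc
    }
}
```

This is why the request has to carry both the parameter values and the parameter types: getMethod needs the types to pick the right overload.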

At this point, the server-side code is complete.

Q:

Given a string s, find the longest palindromic substring in s.

A string is a palindrome if it reads the same backward as forward.

Example:

Input: s = "babad"
Output: "bab"
Explanation: "aba" is also a valid answer.

Solution:

If this is your first time encountering such a problem, it might not be easy to immediately think of dynamic programming.

However, with step-by-step reasoning and optimization, DP naturally emerges as a clean solution.


Step 1: Naive Approach

The simplest idea is:

  1. Design a helper method judgeIsPalindrome.
    Using a two-pointer technique, we can check whether String(i, j) is a palindrome in O(n).

  2. Enumerate all substring lengths k, from large to small.
    For each length, check all substrings String(i, i+k-1).
    If a palindrome is found, return it as the result.

This works, but it’s inefficient.

  • We must enumerate all k values (worst case: N).
  • For each k, we may check up to N substrings.
  • Each check costs O(N), so the total is O(N³), which is too slow for long strings.
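The naive approach can be sketched as follows. The helper name `judgeIsPalindrome` comes from the text; the class name `NaiveLongestPalindrome` is made up for this example.

```java
class NaiveLongestPalindrome {
    // Two-pointer check of s[i..j] (both inclusive); O(N) per call.
    static boolean judgeIsPalindrome(String s, int i, int j) {
        while (i < j) {
            if (s.charAt(i) != s.charAt(j)) {
                return false;
            }
            i++;
            j--;
        }
        return true;
    }

    // Tries lengths from large to small and returns the first palindrome found.
    static String longestPalindrome(String s) {
        int n = s.length();
        for (int k = n; k >= 1; k--) {
            for (int i = 0; i + k - 1 < n; i++) {
                if (judgeIsPalindrome(s, i, i + k - 1)) {
                    return s.substring(i, i + k);
                }
            }
        }
        return ""; // only reached for an empty input
    }
}
```

Because lengths are tried from large to small, the first hit is already a longest palindrome, so the method can return immediately.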

We need something more efficient.


Step 2: Binary Search Optimization

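Instead of enumerating all possible lengths k, we could binary search for the maximum valid k.

This reduces the number of candidate lengths to O(log N); each candidate still costs O(N²) to check, giving O(N² log N) overall. One caveat: a palindrome of length k guarantees one of length k-2 (strip both ends) but not one of length k-1, so the search has to be run separately over odd and even lengths.
(Interested readers can try implementing this approach.)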

However, there’s still redundancy: each new substring is checked independently, without reusing previously computed results.
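One possible way to implement the binary search is sketched below. It rests on an observation not spelled out above: stripping both ends of a palindrome of length k leaves a palindrome of length k-2, so existence is monotonic only within one parity. The sketch therefore searches odd and even lengths separately; all class and method names are illustrative.

```java
class BinarySearchPalindrome {
    static boolean isPalindrome(String s, int i, int j) {
        while (i < j) {
            if (s.charAt(i++) != s.charAt(j--)) {
                return false;
            }
        }
        return true;
    }

    // Returns the start index of some palindrome of exactly this length, or -1.
    static int findStart(String s, int len) {
        for (int i = 0; i + len <= s.length(); i++) {
            if (isPalindrome(s, i, i + len - 1)) {
                return i;
            }
        }
        return -1;
    }

    // Binary search over lengths 2*h + parity (parity 1 = odd, 0 = even).
    // Returns {start, length}; length 0 means no palindrome of that parity exists.
    static int[] best(String s, int parity) {
        int lo = (parity == 1) ? 0 : 1;
        int hi = (s.length() - parity) / 2;
        int bestStart = -1;
        int bestLen = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int len = 2 * mid + parity;
            int start = findStart(s, len);
            if (start >= 0) {
                bestStart = start;
                bestLen = len;
                lo = mid + 1; // a palindrome of this length exists, try longer
            } else {
                hi = mid - 1; // none exists, try shorter
            }
        }
        return new int[]{bestStart, bestLen};
    }

    static String longestPalindrome(String s) {
        if (s.length() < 2) {
            return s;
        }
        int[] odd = best(s, 1);
        int[] even = best(s, 0);
        int[] b = odd[1] >= even[1] ? odd : even;
        return s.substring(b[0], b[0] + b[1]);
    }
}
```

Each call to `findStart` is O(N²) in the worst case and there are O(log N) calls per parity, which matches the O(N² log N) bound.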


Step 3: Dynamic Programming Insight

Let’s improve by leveraging earlier computations.

Suppose for some length k_i, we already know whether every substring of length k_i is a palindrome.

Now, consider extending a substring:
If we know String(i, i+k_i-1) is a palindrome, then checking String(i-1, i+k_i) can be done in O(1):

dp[i][j] = dp[i+1][j-1] && (s[i] == s[j])

That is:

  • If the inner substring String(i+1, j-1) is a palindrome,
  • and the boundary characters match (s[i] == s[j]),
  • then String(i, j) is also a palindrome.

DP Implementation
public String longestPalindrome(String s) {
    int len = s.length();
    if (len < 2) {
        return s;
    }
    // dp[i][j] is true when s[i..j] (both inclusive) is a palindrome.
    boolean[][] dp = new boolean[len][len];
    for (int i = 0; i < len; i++) {
        dp[i][i] = true;
    }
    int maxLen = 1;
    int start = 0;
    // Fill column by column so that dp[i + 1][j - 1] is ready before dp[i][j].
    for (int j = 1; j < len; j++) {
        for (int i = j - 1; i >= 0; i--) {
            if (s.charAt(i) == s.charAt(j)) {
                if (j - i < 3) {
                    // Length 2 or 3: matching ends are enough.
                    dp[i][j] = true;
                } else {
                    dp[i][j] = dp[i + 1][j - 1];
                }
            } else {
                dp[i][j] = false;
            }

            if (dp[i][j]) {
                int curLen = j - i + 1;
                if (curLen > maxLen) {
                    maxLen = curLen;
                    start = i;
                }
            }
        }
    }
    return s.substring(start, start + maxLen);
}

Summary

  • Naive approach: brute force with palindrome checking → O(N³).
  • Optimization: binary search on substring length → O(N² log N).
  • Dynamic Programming: reuse previously computed results → O(N²).

This step-by-step refinement shows how dynamic programming becomes a natural and efficient solution.

Binary Indexed Tree and Its Applications on LeetCode

Why Do We Need a Binary Indexed Tree?

Problem Setup

Question:
Given an integer array input = [1,2,7,4,3], how can we quickly calculate the sum of the first K numbers?

Solution:
A common approach is to build a prefix sum array preSumArray, where preSumArray[i] represents the sum of the first i elements.
Building it takes O(N) once; after that, any prefix sum is a single array lookup in O(1) time. If we need to perform K queries, the total complexity is O(K).
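A minimal sketch of the prefix sum array, using the `preSumArray` name from the text (the class and method names are made up for illustration):

```java
class PrefixSumSketch {
    // preSumArray[i] holds the sum of the first i elements, so preSumArray[0] = 0.
    static int[] buildPreSum(int[] input) {
        int[] preSumArray = new int[input.length + 1];
        for (int i = 0; i < input.length; i++) {
            preSumArray[i + 1] = preSumArray[i] + input[i];
        }
        return preSumArray;
    }
}
```

For input = [1,2,7,4,3] this yields [0,1,3,10,14,17], so the sum of the first 3 numbers is `preSumArray[3]`, which is 10.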

Making It Harder

Question:
Now suppose we still want to query prefix sums on the same array input = [1,2,7,4,3], but before querying we might increase or decrease the value at index i.

Solution:
If we continue to rely on preSumArray, then every update at position i requires modifying all subsequent entries of the array.
That means each update costs O(N), and K queries with updates would cost O(KN).

If we abandon preSumArray, updates are O(1), but each query would degrade to O(N).

This is where a Binary Indexed Tree (Fenwick Tree) helps: it supports both point updates and prefix sum queries in O(log N).


Prerequisite: A Binary Trick

A useful bitwise operation in this context is:

lowbit(x) = x & (-x)

This extracts the largest power of 2 that divides x, i.e., the value of the rightmost 1 bit.

Examples:

  • 5 & -5 = 1
  • 10 & -10 = 2
  • 12 & -12 = 4
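The examples above can be reproduced with a one-line helper. This is a small sketch; the class name is arbitrary. For positive x, the standard library method `Integer.lowestOneBit(x)` returns the same value.

```java
class LowbitSketch {
    // In two's complement, -x flips every bit above the rightmost 1 bit,
    // so x & -x keeps only that bit.
    static int lowbit(int x) {
        return x & -x;
    }
}
```

In binary, 12 is 1100 and its rightmost 1 bit has value 4, which is what `lowbit(12)` returns.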

Binary Indexed Tree (Fenwick Tree)

Definition

Conceptually, it is still an array, similar to a prefix sum array. Instead of storing complete prefix sums, entry B(i) stores the sum of the last lowbit(i) elements ending at index i, that is, B(i) = A(i - lowbit(i) + 1) + ... + A(i).


B(1) = A(1);
B(2) = A(1)+A(2);
B(3) = A(3);
B(4) = A(1)+A(2)+A(3)+A(4);
B(5) = A(5);
B(6) = A(5)+A(6);
B(7) = A(7);
B(8) = A(1)+A(2)+A(3)+A(4)+A(5)+A(6)+A(7)+A(8);

Note: The index of a Binary Indexed Tree must start from 1, because lowbit(0) = 0 and a loop that advances by lowbit(i) would never move away from index 0.
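The table of B(i) values follows directly from lowbit. The sketch below computes, for a given i, the range of A that B(i) covers, and builds B by direct summation. It is meant to illustrate the definition, not to be an efficient construction; all names are illustrative.

```java
class BitRangeSketch {
    static int lowbit(int x) {
        return x & -x;
    }

    // Returns {first, last} such that B(i) = A(first) + ... + A(last), 1-based.
    static int[] coveredRange(int i) {
        return new int[]{i - lowbit(i) + 1, i};
    }

    // Builds the 1-based array B from a 0-based input array by summing each covered range.
    static int[] build(int[] a) {
        int n = a.length;
        int[] b = new int[n + 1]; // b[0] is unused
        for (int i = 1; i <= n; i++) {
            for (int k = i - lowbit(i) + 1; k <= i; k++) {
                b[i] += a[k - 1];
            }
        }
        return b;
    }
}
```

For example, `coveredRange(6)` gives {5, 6}, matching B(6) = A(5)+A(6), and `coveredRange(8)` gives {1, 8}.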


Core Operations

The Binary Indexed Tree mainly supports two operations: prefix sum queries and updates.

Querying Prefix Sums

Example:

getSum(7) = A(1)+...+A(7) = B(4)+B(6)+B(7)
getSum(6) = B(4)+B(6)

Implementation:

public int getSum(int x) {
    int res = 0;
    for (int i = x; i > 0; i -= lowbit(i)) {
        res += bit[i];
    }
    return res;
}

Recursive form:

public int getSum(int x) {
    if (x <= 0) {
        return 0;
    }
    return bit[x] + getSum(x - lowbit(x));
}

Time complexity: O(log N)

For a range sum sum(i, j), simply compute getSum(j) - getSum(i-1).


Updating Values

Example: update(6, 7) means adding 7 to position 6. This requires updating B(6) and B(8).

Implementation:

public void update(int x, int value) {
    for (int i = x; i < bit.length; i += lowbit(i)) {
        bit[i] += value;
    }
}
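Putting the two operations together, a self-contained class might look like the sketch below. The method bodies mirror the snippets above; the class name `FenwickTreeSketch`, the constructor that loads an input array, and `rangeSum` are additions for illustration.

```java
class FenwickTreeSketch {
    private final int[] bit; // 1-based; bit[0] is unused

    // Builds the tree by applying one update per input element (O(N log N)).
    FenwickTreeSketch(int[] input) {
        bit = new int[input.length + 1];
        for (int i = 0; i < input.length; i++) {
            update(i + 1, input[i]);
        }
    }

    private static int lowbit(int x) {
        return x & -x;
    }

    // Adds value to position x (1-based).
    void update(int x, int value) {
        for (int i = x; i < bit.length; i += lowbit(i)) {
            bit[i] += value;
        }
    }

    // Sum of positions 1..x.
    int getSum(int x) {
        int res = 0;
        for (int i = x; i > 0; i -= lowbit(i)) {
            res += bit[i];
        }
        return res;
    }

    // Sum of positions i..j, both inclusive.
    int rangeSum(int i, int j) {
        return getSum(j) - getSum(i - 1);
    }
}
```

With input = [1,2,7,4,3], `getSum(3)` is 10. After `update(2, 5)` the second element becomes 7, so `getSum(3)` is 15 and `rangeSum(2, 4)` is 18.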

Applications on LeetCode

LeetCode 493 — Reverse Pairs

Question:
Given an array nums, a pair (i, j) is called a reverse pair if i < j and nums[i] > 2*nums[j].
Return the total number of reverse pairs.

Example Input:

[1,3,2,3,1]

Output:

2

Solution:

We can transform the problem into:

For each j, count how many elements to its left are greater than 2 * nums[j].

Steps:

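  1. Sort and discretize every value nums[i] together with its double 2 * nums[i] into ranks 1..m (m ≤ 2n), so that both can be used as BIT indices.
  2. Scan the array from left to right; the BIT stores how many already-seen elements have each rank.
  3. For each j, use BIT prefix sums to count the earlier elements whose rank is above the rank of 2 * nums[j], then insert nums[j] into the BIT.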
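The discretization step can be sketched on its own. This is an illustrative helper (the names `DiscretizeSketch` and `ranks` are made up); it maps every value and its double to a rank in sorted order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class DiscretizeSketch {
    // Maps each distinct value in {nums[i], 2 * nums[i]} to its 1-based rank in sorted order.
    static Map<Long, Integer> ranks(int[] nums) {
        TreeSet<Long> values = new TreeSet<>();
        for (int v : nums) {
            values.add((long) v);
            values.add(2L * v); // long avoids overflow when doubling large ints
        }
        Map<Long, Integer> rank = new HashMap<>();
        int next = 1;
        for (long v : values) {
            rank.put(v, next++);
        }
        return rank;
    }
}
```

For nums = [1,3,2,3,1] the distinct values are {1, 2, 3, 4, 6}, which map to ranks 1 through 5. Comparing ranks then gives the same result as comparing the original values, while keeping the BIT small.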

Code:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class Solution {
    // Binary Indexed Tree over ranks; index 0 is unused.
    class BinaryIndexedTree {
        long[] arr;
        public BinaryIndexedTree(int n) { arr = new long[n]; }
        public int lowbit(int x) { return x & -x; }
        // Number of inserted elements with rank <= x.
        public int getSum(int x) {
            if (x <= 0) return 0;
            return (int)(arr[x] + getSum(x - lowbit(x)));
        }
        // Adds c to the count stored at rank x.
        public void update(int x, int c) {
            for (int i = x; i < arr.length; i += lowbit(i)) arr[i] += c;
        }
    }

    public int reversePairs(int[] nums) {
        // Discretize every value and its double; long avoids overflow on 2 * nums[i].
        Map<Long,Integer> map = new HashMap<>();
        TreeSet<Long> set = new TreeSet<>();
        for (int i : nums) {
            set.add((long)i);
            set.add((long)i * 2);
        }
        int index = 1;
        while (!set.isEmpty()) {
            map.put(set.pollFirst(), index++);
        }
        BinaryIndexedTree bit = new BinaryIndexedTree(map.size() + 1);
        int ans = 0;
        for (int i = 0; i < nums.length; i++) {
            long target = (long)nums[i] * 2;
            int l = map.get(target);
            // Earlier elements with rank above rank(2 * nums[i]) form reverse pairs with i.
            ans += bit.getSum(map.size()) - bit.getSum(l);
            bit.update(map.get((long)nums[i]), 1);
        }
        return ans;
    }
}

Similar Problems

  • LeetCode 307 — Range Sum Query – Mutable
  • …and many others that require efficient prefix sums with updates.
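As an illustration of the first item, LeetCode 307 asks for a `NumArray` class with `update(index, val)` and `sumRange(left, right)` on a 0-based array. One way to build it on a Binary Indexed Tree is sketched below. Note that `update` there sets a value rather than adding to it, so the sketch keeps a copy of the current values to compute the delta; the private helper names are made up.

```java
class NumArray {
    private final int[] nums; // current values, 0-based
    private final int[] bit;  // Binary Indexed Tree, 1-based

    public NumArray(int[] nums) {
        this.nums = nums.clone();
        this.bit = new int[nums.length + 1];
        for (int i = 0; i < nums.length; i++) {
            add(i + 1, nums[i]);
        }
    }

    // Adds delta at 1-based position x.
    private void add(int x, int delta) {
        for (int i = x; i < bit.length; i += i & -i) {
            bit[i] += delta;
        }
    }

    // Sum of 1-based positions 1..x.
    private int prefix(int x) {
        int res = 0;
        for (int i = x; i > 0; i -= i & -i) {
            res += bit[i];
        }
        return res;
    }

    // Sets nums[index] to val by adding the difference to the tree.
    public void update(int index, int val) {
        add(index + 1, val - nums[index]);
        nums[index] = val;
    }

    // Sum of nums[left..right], both inclusive, 0-based.
    public int sumRange(int left, int right) {
        return prefix(right + 1) - prefix(left);
    }
}
```

Both operations run in O(log N), compared with O(N) for one of them when using a plain array or a prefix sum array.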

Q:

You are given an integer array nums and an integer k. You can partition the array into at most k non-empty adjacent subarrays. The score of a partition is the sum of the averages of each subarray.

Note that the partition must use every integer in nums, and that the score is not necessarily an integer.

Return the maximum score you can achieve of all the possible partitions. Answers within 10^-6 of the actual answer will be accepted.


Preface

Linked lists, queues, and stacks are among the most basic data structures, and the linked list is the most fundamental of them. Most of the more complex data structures that follow evolved from them.

Linked list

A linear data structure that differs from an array in that its elements are not necessarily stored contiguously in memory. To keep the elements connected, each node generally holds a pointer to the next element.
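A minimal sketch of a singly linked list in Java, where the "pointer" is an object reference. The `Node` class and the helper methods are illustrative names, not part of any library.

```java
class LinkedListSketch {
    // One element of the list: a value plus a reference to the next node.
    static final class Node {
        final int value;
        Node next; // null marks the end of the list

        Node(int value) {
            this.value = value;
        }
    }

    // Builds a list from the given values and returns its head (null if empty).
    static Node fromArray(int... values) {
        Node dummy = new Node(0); // placeholder so the first append needs no special case
        Node tail = dummy;
        for (int v : values) {
            tail.next = new Node(v);
            tail = tail.next;
        }
        return dummy.next;
    }

    // Walks the next references to count the nodes; O(N).
    static int length(Node head) {
        int n = 0;
        for (Node cur = head; cur != null; cur = cur.next) {
            n++;
        }
        return n;
    }
}
```

Unlike an array, reaching the i-th element requires following i references from the head, but inserting or removing a node only rewires its neighbors.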



Q:

RandomizedCollection is a data structure that contains a collection of numbers, possibly duplicates (i.e., a multiset). It should support inserting and removing specific elements and also reporting a random element.

Implement the RandomizedCollection class:

  • RandomizedCollection() Initializes the empty RandomizedCollection object.
  • bool insert(int val) Inserts an item val into the multiset, even if the item is already present. Returns true if the item is not present, false otherwise.
  • bool remove(int val) Removes an item val from the multiset if present. Returns true if the item is present, false otherwise. Note that if val has multiple occurrences in the multiset, we only remove one of them.
  • int getRandom() Returns a random element from the current multiset of elements. The probability of each element being returned is linearly related to the number of the same values the multiset contains.

You must implement the functions of the class such that each function works on average O(1) time complexity.

Note: The test cases are generated such that getRandom will only be called if there is at least one item in the RandomizedCollection.
