前言
实现客户端与服务端间通信,在传输层协议 TCP 或 UDP 的基础上,定义客户端与服务端都能识别信息的一套规则:一方进行编码,另一方进行解码,这便是【协议】。
在 Redis 中,定义了简单的 RESP(REdis Serialization Protocol)文本协议,接下来,我们尝试实现一款能与服务端通信的 Redis 客户端,其核心原理如市面上常见的 Redisson、Jedis 等类似。
由浅入深,理解市面上流行的开源组件会更加的得心应手。
实现
基础
1. 网络通信
我们知道,在传统计算机网络模型中,传输层
(TCP / UDP)的上一层便是应用层
。应用层协议一般专注于数据的编解码等约定,比如经典的 HTTP 协议。
RESP 协议本质和 HTTP 是一个级别,都属于应用层协议
。
在 redis 中,传输层协议使用的是 TCP
,服务端从 TCP socket 缓冲区中读取数据,然后经过 RESP 协议解码得到我们的指令。
而写入数据则是相反,服务器先将响应数据使用 RESP 编码,然后将编码后的数据写入 TCP Socket 缓冲区发送给客户端。
2. 协议格式
在 RESP 协议中,第一个字节决定了具体数据类型:
简单字符串
:Simple Strings,首字节响应+
错误
:Errors,首字节响应-
整型
:Integers,首字节响应:
批量字符串
:Bulk Strings,首字节响应$
数组
:Arrays,首字节响应*
我们来看看一具体的例子,我们一条正常指令 PSETEX test_redisson_batch_key8 120000 test_redisson_batch_key=>value:8,经 RESP 协议编码后长这样:
*4 $6 PSETEX $24 test_redisson_batch_key8 $6 120000 $32 test_redisson_batch_key=>value:8
值得注意的是,在 RESP 协议中的每一部分都是以 \R\N
结尾。
基于 Socket 实现
实现一个简单的 Redis 客户端可以使用 Java 的 Socket 编程来直接与 Redis 服务器进行通信。
Redis 使用 RESP(REdis Serialization Protocol)协议进行通信,因此我们需要按照该协议格式化请求并解析响应。
1. 创建 RedisClient 类
首先,我们创建一个 RedisClient 类,用于管理与 Redis 服务器的连接和通信。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; public class RedisClient { private Socket socket; private BufferedReader reader; private OutputStream writer; // 连接到 Redis 服务器 // 通过 Java 的 Socket 进行连接 public RedisClient(String host, int port) throws IOException { this.socket = new Socket(host, port); this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); this.writer = socket.getOutputStream(); } public void close() throws IOException { reader.close(); writer.close(); socket.close(); } // 发送命令到服务端,并读取响应 private String sendCommand(String command) throws IOException { writer.write(command.getBytes()); writer.flush(); return parseResponse(); } // 解析响应数据 private String parseResponse() throws IOException { String response = reader.readLine(); if (response == null) { throw new IOException("Empty response from server"); } char prefix = response.charAt(0); switch (prefix) { case '+': // Simple String return response.substring(1); case '-': // Error throw new IOException("Error response from server: " + response.substring(1)); case ':': // Integer return response.substring(1); case '$': // Bulk String int length = Integer.parseInt(response.substring(1)); if (length == -1) { return null; // Null Bulk String } char[] bulkString = new char[length]; reader.read(bulkString, 0, length); reader.readLine(); // Read the trailing \r\n return new String(bulkString); case '*': // Array int count = Integer.parseInt(response.substring(1)); StringBuilder arrayResponse = new StringBuilder(); for (int i = 0; i < count; i++) { arrayResponse.append(parseResponse()).append("\n"); } return arrayResponse.toString(); default: throw new IOException("Unknown response prefix: " + prefix); } } // 设置键值对 // 这里需要按照 RESP 协议进行编码数据,然后通过 sendCommand 发送 public String set(String key, String value) throws IOException { String command = String.format("*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", key.length(), key, value.length(), value); return sendCommand(command); } // 获取键值对 public String get(String key) throws IOException { String command = String.format("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", key.length(), key); return sendCommand(command); } public String del(String key) throws IOException { String command = String.format("*2\r\n$3\r\nDEL\r\n$%d\r\n%s\r\n", key.length(), key); return sendCommand(command); } public String setex(String key, int seconds, String value) throws IOException { String command = String.format("*4\r\n$5\r\nSETEX\r\n$%d\r\n%s\r\n$%d\r\n%d\r\n$%d\r\n%s\r\n", key.length(), key, String.valueOf(seconds).length(), seconds, value.length(), value); return sendCommand(command); } }
基本步骤:
通过 Socket 与服务端建立连接
指令编码(使用 RESP 协议)
指令发送以及解析响应数据
2. 使用 RedisClient 类
通过 RedisClient 进行通信:
public class RedisClientTest { public static void main(String[] args) { try { RedisClient client = new RedisClient("localhost", 6379); // 设置一个键值对 System.out.println(client.set("name", "神医")); // 获取键对应的值 System.out.println(client.get("name")); // 设置一个带有过期时间的键值对(10秒后过期) System.out.println(client.setex("temp_key", 10, "临时值")); // 获取带有过期时间的键值对 System.out.println(client.get("temp_key")); // 删除一个键 System.out.println(client.del("name")); // 尝试获取已删除的键 System.out.println(client.get("name")); client.close(); } catch (IOException e) { e.printStackTrace(); } } }
基于 Netty 实现
如果想要异步通信能力、高吞吐等能力,使用 Java Socket 实现会更加繁琐,而成熟的 Netty 组件提供了异步处理和更高级的网络通信功能,已经应用于各类组件底层通信能力,使用 Netty 实现 Redis 客户端将更容易获得这些能力。
注:常见的开源 Redis 客户端 Redisson 底层便使用 Netty 实现。
接下来我们也使用 Netty 实现 Redis 客户端:
RESP 编码器
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class RedisCommandEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) { String[] parts = msg.split(" "); out.writeByte('*'); out.writeBytes(Integer.toString(parts.length).getBytes()); out.writeBytes("\r\n".getBytes()); for (String part : parts) { out.writeByte('$'); out.writeBytes(Integer.toString(part.length()).getBytes()); out.writeBytes("\r\n".getBytes()); out.writeBytes(part.getBytes()); out.writeBytes("\r\n".getBytes()); } } }
RESP 解码器
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class RedisResponseDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (!in.isReadable()) { return; } byte firstByte = in.readByte(); switch (firstByte) { case '+': out.add(readSimpleString(in)); break; case '-': out.add(readError(in)); break; case ':': out.add(readInteger(in)); break; case '$': out.add(readBulkString(in)); break; case '*': out.add(readArray(in)); break; default: throw new IllegalArgumentException("Unknown RESP type: " + (char) firstByte); } } private String readSimpleString(ByteBuf in) { return readLine(in); } private String readError(ByteBuf in) { return readLine(in); } private long readInteger(ByteBuf in) { return Long.parseLong(readLine(in)); } private String readBulkString(ByteBuf in) { int length = Integer.parseInt(readLine(in)); if (length == -1) { return null; } byte[] bytes = new byte[length]; in.readBytes(bytes); in.readByte(); // \r in.readByte(); // \n return new String(bytes); } private Object[] readArray(ByteBuf in) { int length = Integer.parseInt(readLine(in)); if (length == -1) { return null; } Object[] array = new Object[length]; for (int i = 0; i < length; i++) { array[i] = decode(in); } return array; } private String readLine(ByteBuf in) { StringBuilder sb = new StringBuilder(); while (in.isReadable()) { char c = (char) in.readByte(); if (c == '\r') { in.readByte(); // \n break; } sb.append(c); } return sb.toString(); } }
Redis 客户端实现
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class NettyRedisClient { private final String host; private final int port; private Channel channel; private BlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1); public NettyRedisClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new RedisCommandEncoder(), new RedisResponseDecoder(), new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { responseQueue.offer(msg); } }); } }); channel = bootstrap.connect(host, port).sync().channel(); } finally { // group.shutdownGracefully(); } } public Object sendCommand(String command) throws InterruptedException { channel.writeAndFlush(command); return responseQueue.take(); } public void stop() { channel.close(); } public static void main(String[] args) throws InterruptedException { NettyRedisClient client = new NettyRedisClient("localhost", 6379); client.start(); System.out.println(client.sendCommand("PING")); System.out.println(client.sendCommand("SET mykey myvalue")); System.out.println(client.sendCommand("GET mykey")); client.stop(); } }
使用 Netty 可以显著提高网络通信的性能和效率,同时简化开发过程,提供更好的资源管理和错误处理机制。
对于需要处理大量并发连接和高性能要求的应用程序,Netty 是一个非常好的选择。
以上是最小可用版的 Redis 客户端,你可以基于此基础上实现更丰富的功能~