切换语言为:繁体

通过 Socket 以及 Netty 两种方式实现 Redis 客户端

  • 爱糖宝
  • 2024-10-11
  • 2049
  • 0
  • 0

前言

实现客户端与服务端间通信,在传输层协议 TCP 或 UDP 的基础上,定义客户端与服务端都能识别信息的一套规则:一方进行编码,另一方进行解码,这便是【协议】。

在 Redis 中,定义了简单的 RESP(REdis Serialization Protocol)文本协议,接下来,我们尝试实现一款能与服务端通信的 Redis 客户端,其核心原理如市面上常见的 Redisson、Jedis 等类似。

由浅入深,理解市面上流行的开源组件会更加的得心应手。

实现

基础

1. 网络通信

我们知道,在传统计算机网络模型中,传输层(TCP / UDP)的上一层便是应用层。应用层协议一般专注于数据的编解码等约定,比如经典的 HTTP 协议。

RESP 协议本质和 HTTP 是一个级别,都属于应用层协议

在 redis 中,传输层协议使用的是 TCP,服务端从 TCP socket 缓冲区中读取数据,然后经过 RESP 协议解码得到我们的指令。

而写入数据则是相反,服务器先将响应数据使用 RESP 编码,然后将编码后的数据写入 TCP Socket 缓冲区发送给客户端。

2. 协议格式

在 RESP 协议中,第一个字节决定了具体数据类型:

  • 简单字符串:Simple Strings,首字节响应 +

  • 错误:Errors,首字节响应 -

  • 整型:Integers,首字节响应 :

  • 批量字符串:Bulk Strings,首字节响应 $

  • 数组:Arrays,首字节响应 *

我们来看看一具体的例子,我们一条正常指令 PSETEX test_redisson_batch_key8 120000 test_redisson_batch_key=>value:8,经 RESP 协议编码后长这样:

*4
$6
PSETEX
$24
test_redisson_batch_key8
$6
120000
$32
test_redisson_batch_key=>value:8

值得注意的是,在 RESP 协议中的每一部分都是以 \R\N 结尾。

基于 Socket 实现

实现一个简单的 Redis 客户端可以使用 Java 的 Socket 编程来直接与 Redis 服务器进行通信。

Redis 使用 RESP(REdis Serialization Protocol)协议进行通信,因此我们需要按照该协议格式化请求并解析响应。

1. 创建 RedisClient 类

首先,我们创建一个 RedisClient 类,用于管理与 Redis 服务器的连接和通信。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;

public class RedisClient {
    private Socket socket;
    private BufferedReader reader;
    private OutputStream writer;

    // 连接到 Redis 服务器
    // 通过 Java 的 Socket 进行连接
    public RedisClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        this.writer = socket.getOutputStream();
    }

    public void close() throws IOException {
        reader.close();
        writer.close();
        socket.close();
    }

    // 发送命令到服务端,并读取响应
    private String sendCommand(String command) throws IOException {
        writer.write(command.getBytes());
        writer.flush();
        return parseResponse();
    }
    
    // 解析响应数据
    private String parseResponse() throws IOException {
    String response = reader.readLine();
    if (response == null) {
        throw new IOException("Empty response from server");
    }

    char prefix = response.charAt(0);
    switch (prefix) {
        case '+': // Simple String
            return response.substring(1);
        case '-': // Error
            throw new IOException("Error response from server: " + response.substring(1));
        case ':': // Integer
            return response.substring(1);
        case '$': // Bulk String
            int length = Integer.parseInt(response.substring(1));
            if (length == -1) {
                return null; // Null Bulk String
            }
            char[] bulkString = new char[length];
            reader.read(bulkString, 0, length);
            reader.readLine(); // Read the trailing \r\n
            return new String(bulkString);
        case '*': // Array
            int count = Integer.parseInt(response.substring(1));
            StringBuilder arrayResponse = new StringBuilder();
            for (int i = 0; i < count; i++) {
                arrayResponse.append(parseResponse()).append("\n");
            }
            return arrayResponse.toString();
        default:
            throw new IOException("Unknown response prefix: " + prefix);
     }
   }

    // 设置键值对
    // 这里需要按照 RESP 协议进行编码数据,然后通过 sendCommand 发送
    public String set(String key, String value) throws IOException {
        String command = String.format("*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", key.length(), key, value.length(), value);
        return sendCommand(command);
    }

    // 获取键值对
    public String get(String key) throws IOException {
        String command = String.format("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", key.length(), key);
        return sendCommand(command);
    }

    public String del(String key) throws IOException {
        String command = String.format("*2\r\n$3\r\nDEL\r\n$%d\r\n%s\r\n", key.length(), key);
        return sendCommand(command);
    }

    public String setex(String key, int seconds, String value) throws IOException {
        String command = String.format("*4\r\n$5\r\nSETEX\r\n$%d\r\n%s\r\n$%d\r\n%d\r\n$%d\r\n%s\r\n", key.length(), key, String.valueOf(seconds).length(), seconds, value.length(), value);
        return sendCommand(command);
    }
}

基本步骤:

  1. 通过 Socket 与服务端建立连接

  2. 指令编码(使用 RESP 协议)

  3. 指令发送以及解析响应数据

2. 使用 RedisClient 类

通过 RedisClient 进行通信:

public class RedisClientTest {
    public static void main(String[] args) {
        try {
            RedisClient client = new RedisClient("localhost", 6379);

            // 设置一个键值对
            System.out.println(client.set("name", "神医"));

            // 获取键对应的值
            System.out.println(client.get("name"));

            // 设置一个带有过期时间的键值对(10秒后过期)
            System.out.println(client.setex("temp_key", 10, "临时值"));

            // 获取带有过期时间的键值对
            System.out.println(client.get("temp_key"));

            // 删除一个键
            System.out.println(client.del("name"));

            // 尝试获取已删除的键
            System.out.println(client.get("name"));

            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

基于 Netty 实现

如果想要异步通信能力、高吞吐等能力,使用 Java Socket 实现会更加繁琐,而成熟的  Netty 组件提供了异步处理和更高级的网络通信功能,已经应用于各类组件底层通信能力,使用 Netty 实现 Redis 客户端将更容易获得这些能力。

注:常见的开源 Redis 客户端 Redisson 底层便使用 Netty 实现。

接下来我们也使用 Netty 实现 Redis 客户端:

RESP 编码器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RedisCommandEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
        String[] parts = msg.split(" ");
        out.writeByte('*');
        out.writeBytes(Integer.toString(parts.length).getBytes());
        out.writeBytes("\r\n".getBytes());

        for (String part : parts) {
            out.writeByte('$');
            out.writeBytes(Integer.toString(part.length()).getBytes());
            out.writeBytes("\r\n".getBytes());
            out.writeBytes(part.getBytes());
            out.writeBytes("\r\n".getBytes());
        }
    }
}

RESP 解码器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class RedisResponseDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (!in.isReadable()) {
            return;
        }

        byte firstByte = in.readByte();
        switch (firstByte) {
            case '+':
                out.add(readSimpleString(in));
                break;
            case '-':
                out.add(readError(in));
                break;
            case ':':
                out.add(readInteger(in));
                break;
            case '$':
                out.add(readBulkString(in));
                break;
            case '*':
                out.add(readArray(in));
                break;
            default:
                throw new IllegalArgumentException("Unknown RESP type: " + (char) firstByte);
        }
    }

    private String readSimpleString(ByteBuf in) {
        return readLine(in);
    }

    private String readError(ByteBuf in) {
        return readLine(in);
    }

    private long readInteger(ByteBuf in) {
        return Long.parseLong(readLine(in));
    }

    private String readBulkString(ByteBuf in) {
        int length = Integer.parseInt(readLine(in));
        if (length == -1) {
            return null;
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        in.readByte(); // \r
        in.readByte(); // \n
        return new String(bytes);
    }

    private Object[] readArray(ByteBuf in) {
        int length = Integer.parseInt(readLine(in));
        if (length == -1) {
            return null;
        }
        Object[] array = new Object[length];
        for (int i = 0; i < length; i++) {
            array[i] = decode(in);
        }
        return array;
    }

    private String readLine(ByteBuf in) {
        StringBuilder sb = new StringBuilder();
        while (in.isReadable()) {
            char c = (char) in.readByte();
            if (c == '\r') {
                in.readByte(); // \n
                break;
            }
            sb.append(c);
        }
        return sb.toString();
    }
}

Redis 客户端实现

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NettyRedisClient {
    private final String host;
    private final int port;
    private Channel channel;
    private BlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1);

    public NettyRedisClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new RedisCommandEncoder(), new RedisResponseDecoder(), new SimpleChannelInboundHandler<Object>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
                                    responseQueue.offer(msg);
                                }
                            });
                        }
                    });

            channel = bootstrap.connect(host, port).sync().channel();
        } finally {
            // group.shutdownGracefully();
        }
    }

    public Object sendCommand(String command) throws InterruptedException {
        channel.writeAndFlush(command);
        return responseQueue.take();
    }

    public void stop() {
        channel.close();
    }

    public static void main(String[] args) throws InterruptedException {
        NettyRedisClient client = new NettyRedisClient("localhost", 6379);
        client.start();
        System.out.println(client.sendCommand("PING"));
        System.out.println(client.sendCommand("SET mykey myvalue"));
        System.out.println(client.sendCommand("GET mykey"));
        client.stop();
    }
}

使用 Netty 可以显著提高网络通信的性能和效率,同时简化开发过程,提供更好的资源管理和错误处理机制。

对于需要处理大量并发连接和高性能要求的应用程序,Netty 是一个非常好的选择。

以上是最小可用版的 Redis 客户端,你可以基于此基础上实现更丰富的功能~

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.