mini Redis(项目 二)

一个仿Redis的内存数据库(主要用来做命令解析)服务端,

 客户端使用的开源工具 :  https://dom4j.github.io/      github:https://github.com/hehaoyuan/mini-Redis

Redis 简介:
Redis是一个开源的底层使用C语言编写的key-value内存数据库。可用于缓存数据、事件发布订阅、高速队列等场景,而且支持丰富的数据类型:string(字符串)、hash(哈希)、list(列表)、set(无序集合)、zset(有序集合)


使用场景:

使用场景
随着数据量的增长,MySQL已经满足不了大型互联网类应用的需求。因此,Redis基于内存存储数据,在某些场景下,就会大大提高效率。

缓存:对于热点数据,缓存以后可能读取数十万次,因此对于热点数据,不但缓存的价值非常大,而且当总数据量比较大的时候直接从数据库中查询会比较影响性能。例如:对经常需要查询且变动不是很频繁的数据,可以考虑基于Redis实现缓存。
会话缓存:Redis还可以进行会话缓存。例如:将web session存放在Redis中。
计数器:因为Redis具有原子性,所以在某些方面可以避免并发问题,比如:统计点击率、点赞率、收藏率等。
消息队列:Redis能作为一个很好的消息队列来使用,依赖List类型利用LPUSH命令将数据添加到链表头部,通过BRPOP命令将元素从链表尾部取出。
社交列表:社交属性相关的列表信息,例如,用户点赞列表、用户分享列表、用户收藏列表、用户关注列表、用户粉丝列表等,使用Hash类型数据结构是个不错的选择。
最新动态:按照时间顺序排列的最新动态,也是一个很好的应用,可以使用Sorted Set类型的分数权重存储时间戳进行排序。


Redis数据类型
String(字符串)

String是Redis的最基本数据结构,以一个键和一个值存储在Redis内部,类似java的Map结构,可以通过键去找值。

Hash(哈希)

Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似Java里面的Map<String, Object>。

List(列表)

根据插入顺序排序的字符串元素的集合,底层实现是链表。

Set(集合)

Redis的Set是string类型的无序集合,它是通过HashTable实现的。

zset(sorted set:有序集合)

Redis zset和set一样也是string类型元素的集合,且不允许重复。但每个字符串元素与浮点数值相关联,称为分数,元素总是按其分数排序,可以检索一系列元素。

Redis协议规范
Redis客户端使用名为RESP(REdis序列化协议)的协议与Redis服务器通信。虽然该协议是专为Redis设计的,但它可以用于其他客户端 - 服务器软件项目。

RESP可以序列化不同的数据类型,如整数,字符串,数组,还有一种特定的错误类型。请求从客户端发送到Redis服务器,作为表示要执行的命令的参数的字符串数组,Redis使用特定于命令的数据类型进行回复。

注意:此处概述的协议仅用于客户端 - 服务器通信。Redis Cluster使用不同的二进制协议,以便在节点之间交换消息。

网络层:客户端连接到Redis服务器,创建到端口6379的TCP连接。

请求 - 响应模型:Redis接受由不同参数组成的命令。收到命令后,将对其进行协议解析,并将回复发送回客户端。

RESP协议说明
RESP协议是在Redis 1.2中引入的,但它成为了与Redis 2.0中的Redis服务器通信的标准方式。这是您应该在Redis客户端中实现的协议。

RESP实际上是一个支持以下数据类型的序列化协议:Simple Strings、Errors、Integers、Bulk Strings、Arrays。

RESP在Redis中用作请求 - 响应协议的方式如下:

客户端将命令作为Bulk Strings的RESP数组发送到Redis服务器,服务器根据命令实现回复一种RESP类型。在RESP中,某些数据的类型取决于第一个字节:

对于Simple Strings,回复的第一个字节是“+”
对于Errors,回复的第一个字节是“ - ”
对于Integers,回复的第一个字节是“:”
对于Bulk Strings,回复的第一个字节是“$”
对于Arrays,回复的第一个字节是“*”
此外,RESP能够使用指定的Bulk Strings或Array的特殊变体来表示Null值(如:"$-1 " 和 "*-1 " 都表示null)。在RESP中,协议的不同部分始终以“ r n”(CRLF)结束。


结构原理图:

 

需要的外包依赖:

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.26</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

第一步:

将客户端发到服务端的命令,输入流后,进行解码,

再将解完码后的对象根据Command命令编码,不同命令的类通过其类加载器,调用其自身将命令对应的操作输出流到缓存中

package com.hhy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

public class Protocol {
    public static Object read(InputStream is) throws IOException {
        return process(is);
    }

    public static Command readCommand(InputStream is) throws Exception {
        Object o  = read(is);
        // 作为 Server 来说,一定不会收到 "+OK
"
        if (!(o instanceof List)) {
            throw new Exception("命令必须是 Array 类型");
        }

        List<Object> list = (List<Object>)o;
        if (list.size() < 1) {
            throw new Exception("命令元素个数必须大于 1");
        }

        Object o2 = list.remove(0);
        if (!(o2 instanceof byte[])) {
            throw new Exception("错误的命令类型");
        }

        byte[] array = (byte[])o2;
        String commandName = new String(array);
        String className = String.format("com.hhy.commands.%sCommand", commandName.toUpperCase());
        Class<?> cls = Class.forName(className);
        //不属于这个接口
        if (!Command.class.isAssignableFrom(cls)) {
            throw new Exception("错误的命令");
        }
        Command command = (Command)cls.newInstance();
        command.setArgs(list);

        return command;
    }

    private static String processSimpleString(InputStream is) throws IOException {
        return readLine(is);
    }

    private static String processError(InputStream is) throws IOException {
        return readLine(is);
    }

    private static long processInteger(InputStream is) throws IOException {
        return readInteger(is);
    }

    private static byte[] processBulkString(InputStream is) throws IOException {
        int len = (int)readInteger(is);
        if (len == -1) {
            // "$-1
"    ==> null
            return null;
        }

        byte[] r = new byte[len];
        is.read(r, 0, len);
        /*
        for (int i = 0; i < len; i++) {
            int b = is.read();
            r[i] = (byte)b;
        }
        */

        // "$5
hello
";
        is.read();
        is.read();

        return r;
    }

    private static List<Object> processArray(InputStream is) throws IOException {
        int len = (int)readInteger(is);
        if (len == -1) {
            // "*-1
"        ==> null
            return null;
        }

        List<Object> list = new ArrayList<>(len);
        for (int i = 0; i < len; i++) {
            try {
                list.add(process(is));
            } catch (RemoteException e) {
                list.add(e);
            }
        }

        return list;
    }
    private static Object process(InputStream is) throws IOException {
        int b = is.read();
        if (b == -1) {
            throw new RuntimeException("不应该读到结尾的");
        }

        switch (b) {
            case '+':
                return processSimpleString(is);
            case '-':
                //考虑异常被当做对象写进数组,所以定义一个额外的对象类,否则就会抛出异常,不会吧异常当做对象
                throw new RemoteException(processError(is));
            case ':':
                return processInteger(is);
            case '$':
                return processBulkString(is);
            case '*':
                return processArray(is);
            default:

                throw new RuntimeException("不识别的类型");
        }
    }

    private static String readLine(InputStream is) throws IOException {
        boolean needRead = true;
        StringBuilder sb = new StringBuilder();
        int b = -1;
        while (true) {
            if (needRead == true) {
                b = is.read();
                if (b == -1) {
                    throw new RuntimeException("不应该读到结尾的");
                }
            } else {
                needRead = true;
            }

            if (b == '
') {
                int c = is.read();
                if (c == -1) {
                    throw new RuntimeException("不应该读到结尾的");
                }

                if (c == '
') {
                    break;
                }

                if (c == '
') {
                    sb.append((char) b);
                    b = c;
                    needRead = false;
                } else {
                    sb.append((char) b);
                    sb.append((char) c);
                }
            } else {
                sb.append((char)b);
            }
        }
        return sb.toString();
    }

    public static long readInteger(InputStream is) throws IOException {
        boolean isNegative = false;
        StringBuilder sb = new StringBuilder();
        int b = is.read();
        if (b == -1) {
            throw new RuntimeException("不应该读到结尾");
        }

        if (b == '-') {
            isNegative = true;
        } else {
            sb.append((char)b);
        }

        while (true) {
            b = is.read();
            if (b == -1) {
                throw new RuntimeException("不应该读到结尾的");
            }

            if (b == '
') {
                int c = is.read();
                if (c == -1) {
                    throw new RuntimeException("不应该读到结尾的");
                }

                if (c == '
') {
                    break;
                }

                throw new RuntimeException("没有读到\r\n");
            } else {
                sb.append((char)b);
            }
        }

        long v = Long.parseLong(sb.toString());
        if (isNegative) {
            v = -v;
        }

        return v;
    }

    public static void writeError(OutputStream os, String message) throws IOException {
        os.write('-');
        os.write(message.getBytes());
        os.write("
".getBytes());
    }

    public static void writeInteger(OutputStream os, long v) throws IOException {
        // v = 10
        //:10


        // v = -1
        //:-1


        os.write(':');
        os.write(String.valueOf(v).getBytes());
        os.write("
".getBytes());
    }

    public static void writeArray(OutputStream os, List<?> list) throws Exception {
        os.write('*');
        os.write(String.valueOf(list.size()).getBytes());
        os.write("
".getBytes());
        for (Object o : list) {
            if (o instanceof String) {
                writeBulkString(os, (String)o);
            } else if (o instanceof Integer) {
                writeInteger(os, (Integer)o);
            } else if (o instanceof Long) {
                writeInteger(os, (Long)o);
            } else {
                throw new Exception("错误的类型");
            }
        }
    }

    public static void writeBulkString(OutputStream os, String s) throws IOException {
        byte[] buf = s.getBytes();
        os.write('$');
        os.write(String.valueOf(buf.length).getBytes());
        os.write("
".getBytes());
        os.write(buf);
        os.write("
".getBytes());
    }

    public static void writeNull(OutputStream os) throws IOException {
        os.write('$');
        os.write('-');
        os.write('1');
        os.write('
');
        os.write('
');
    }
}

命令的接口与实现类(只实现了两类,List与HashMap的增加与查询):

package com.hhy;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

public interface Command {
    void setArgs(List<Object> args);

    void run(OutputStream os) throws IOException;
}

lLPUSH:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

public class LPUSHCommand implements Command {
    private static final Logger logger = LoggerFactory.getLogger(LPUSHCommand.class);
    private List<Object> args;

    @Override
    public void setArgs(List<Object> args) {
        this.args = args;
    }

    @Override
    public void run(OutputStream os) throws IOException {
        if (args.size() != 2) {
            Protocol.writeError(os, "命令至少需要两个参数");
            return;
        }
        String key = new String((byte[])args.get(0));
        String value = new String((byte[])args.get(1));
        logger.debug("运行的是 lpush 命令: {} {}", key, value);

        // 这种方式不是一个很好的线程同步的方式
        List<String> list = Database.getList(key);
        list.add(0, value);

        logger.debug("插入后数据共有 {} 个", list.size());

        Protocol.writeInteger(os, list.size());
    }
}

LRANGE:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

public class LRANGECommand implements Command {
    private List<Object> args;

    @Override
    public void setArgs(List<Object> args) {
        this.args = args;
    }

    @Override
    public void run(OutputStream os) throws IOException {
        String key = new String((byte[])args.get(0));
        int start = Integer.parseInt(new String((byte[])args.get(1)));
        int end = Integer.parseInt(new String((byte[])args.get(2)));

        List<String> list = Database.getList(key);
        if (end < 0) {
            end = list.size() + end;
        }
        List<String> result = list.subList(start, end + 1);
        try {
            Protocol.writeArray(os, result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

HGET:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;

public class HGETCommand implements Command {
    private List<Object> args;

    @Override
    public void setArgs(List<Object> args) {
        this.args = args;
    }

    @Override
    public void run(OutputStream os) throws IOException {
        String key = new String((byte[])args.get(0));
        String field = new String((byte[])args.get(1));

        Map<String, String> hash = Database.getHashes(key);
        String value = hash.get(field);
        if (value != null) {
            Protocol.writeBulkString(os, value);
        } else {
            Protocol.writeNull(os);
        }
    }
}

HSET:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;

public class HSETCommand implements Command {
    private List<Object> args;

    @Override
    public void setArgs(List<Object> args) {
        this.args = args;
    }

    @Override
    public void run(OutputStream os) throws IOException {
        String key = new String((byte[])args.get(0));
        String field = new String((byte[])args.get(1));
        String value = new String((byte[])args.get(2));
        Map<String, String> hash =Database.getHashes(key);
        boolean isUpdate = hash.containsKey(field);
        hash.put(field, value);
        if (isUpdate) {
            Protocol.writeInteger(os, 0);
        } else {
            Protocol.writeInteger(os, 1);
        }
    }
}

自定义异常类:

package com.hhy.exceptions;

public class RemoteException extends Exception {
    public RemoteException() {
    }

    public RemoteException(String message) {
        super(message);
    }

    public RemoteException(String message, Throwable cause) {
        super(message, cause);
    }

    public RemoteException(Throwable cause) {
        super(cause);
    }

    public RemoteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

写到内存中去存储:

package com.hhy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Database {
    private static Map<String, List<String>> lists = new HashMap<>();
    private static Map<String, Map<String, String>> hashes = new HashMap<>();


    public static List<String> getList(String key) {
        /*
        List<String> list = lists.computeIfAbsent(key, k -> {
            return new ArrayList<>();
        });
         */

        List<String> list =  lists.get(key);
        if (list == null) {
            list = new ArrayList<>();
            lists.put(key, list);
        }

        return list;
    }

    public static Map<String, String> getHashes(String key) {
        Map<String, String> hash =  hashes.get(key);
        if (hash == null) {
            hash = new HashMap<>();
            hashes.put(key, hash);
        }

        return hash;
    }
}

多线程编程:

package com.hhy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;


public class MutliThread implements Runnable {

    private Socket client;

    public MutliThread(Socket client) {
        this.client = client;
    }

    @Override
    public void run() {
        while (true) {
            try {
                InputStream inputStream = client.getInputStream();
                OutputStream outputStream = client.getOutputStream();

                while (true) {
                    Command command = Protocol.readCommand(inputStream);
                    command.run(outputStream);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

server服务端:

package com.hhy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
    private static final Logger logger = LoggerFactory.getLogger(Server.class);
    

    public static void main(String[] args) throws IOException{
        int port = 6379;

        ServerSocket serverSocket = new ServerSocket(port);//ServerScoket监听端口

        System.out.println("服务器等待连接..."+serverSocket.getLocalSocketAddress());

        ExecutorService executorService = Executors.newFixedThreadPool(20);

        while(true){

            Socket client = serverSocket.accept();

            System.out.println("有客户端连接到服务器..."+client.getRemoteSocketAddress());

            executorService.execute(new MutliThread(client));
        }
    }
}

如何根据二进制字节流解析出命令名称?
项目中实现了一个 Protocol 类,专门用于协议解析,将二进制流转为Java对象。

如何根据命令名称获取到命令所对应的对象?
根据命令名称找到对应的类名称,这里采用了约定俗成的办法,把类名称和命令名称起成一样的。
根据类名称获取指定的对象,很容易想到通过反射。具体做法是每次命令创建一个新的对象,相同的命令共用同一个对象(通过单例模式实现)。

原文地址:https://www.cnblogs.com/hetaoyuan/p/11331589.html