java-Redis集合

引用包:jedis-3.0.1.jar、commons-pool2-2.6.0.jar

一、从Redis集合中实时获取数据:

连接Redis

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public static Lock lock = new ReentrantLock();

/**
     * 连接Redis
     * @param conferenceId
     * @return
     */
    public String startRedis(String topicId) {
        textMessage = "";
        String result = "";
        try {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setTimeBetweenEvictionRunsMillis(30000);
            config.setMaxWaitMillis(10 * 1000);
            config.setMaxIdle(1000);
            config.setTestOnBorrow(true);
            JedisPool jedisPool = new JedisPool(config, redisIpYJ, Integer.parseInt(redisPortYJ),10000);// 连接redis服务端
            result = "连接Redis成功";
            lock.lock();
            try {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        getRecordTextNew_Redis(jedisPool,topicId);
                    }
                });
                thread.start();
            } finally {
                lock.unlock();
            }        
        } catch (Exception e) {
            result = "连接Redis失败:" + e.getMessage();
        }
        return result;
    }

实时获取数据

public static Boolean isSelectRedis = false;//是否继续查询Redis

    /**
     * 从Redis实时获取语音记录文本(党组会)
     * @param topicId
     */
    public void getRecordTextNew_Redis(JedisPool jedisPool,String topicId) {
        Jedis jedis = null;
        while (isSelectRedis) {
        try {
                jedis = jedisPool.getResource();   //取出一个连接
                Set<String> results = jedis.zrange("asr:text:"+topicId,0,-1);
                for (String result: results) {
                    //TODO消费result
                    if (StringUtils.isNotEmpty(result)) {
                        JSONObject resultMsg = JSONObject.parseObject(result);
                        String text = resultMsg.getString("result");
                        System.out.println("消息text:"+text);
                        String pgs = "1";
                        String micName = resultMsg.getString("roleName");
                        String micId = resultMsg.getString("role");
                        String uId = resultMsg.getString("uid");//段落ID
                        if (StringUtils.isNotEmpty(text)) {
                            String dataText = "<b>" + micName + ":</b>" + text;
                            String dataText2 = "<div id=""+ uId +""><b>" + micName + ":</b>" + text+"</div>";
                            textMap.put(uId, dataText2);
                            System.out.println("消息dataText:"+dataText);
                            JSONObject textObj = new JSONObject();
                            textObj.put("dataText", dataText);
                            textObj.put("dataPgs", pgs);
                            textObj.put("dataUId", uId);
                            try {
                                Thread.sleep(400);
                            } catch (InterruptedException e) {
                                // TODO 自动生成的 catch 块
                                e.printStackTrace();
                            }
                            ConfWebSocketService.sendMessage(textObj.toJSONString(), "2");//向页面发送消息
                        }
                    }
                }
                String[] strResults = (String[])(results.toArray(new String[results.size()]));
                if (strResults.length > 0) {
                    //TODO 移除消费掉的数据
                    jedis.zrem("asr:text:"+topicId, strResults);
                }
                Thread.sleep(300);            
        } catch (Exception e) {
            if (jedis != null) {
                jedis.close();
            }
            e.printStackTrace();
        }finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        }
    }

二、通过Redis订阅消息:

package net.nblh.utils.common;

import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * 建立订阅者,订阅者去订阅频道(mychannel)
 * @author lijd
 *
 */
public class GetSpeechRecognition_YJ_Sub extends Thread{
    private final JedisPool jedisPool;
    private final GetSpeechRecognition_YJ_Msg msgListener = new GetSpeechRecognition_YJ_Msg();
    private final String channel = "db0";//"mychannel";
    
    public GetSpeechRecognition_YJ_Sub(JedisPool jedisPool) {
        super("GetSpeechRecognition_YJ_Sub");
        this.jedisPool = jedisPool;
    }
    
    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            Set<String> result = jedis.zrange("asr:text:1112",0,-1);
            //jedis.subscribe(msgListener, channel); //通过subscribe的api去订阅,参数是订阅者和频道名
            //注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码
            //这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行
            System.out.println("继续执行后续代码。。。");
            
        } catch (Exception e) {
            if (jedis != null) {
                jedis.close();
            }
            e.printStackTrace();
        }finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
package net.nblh.utils.common;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * //建立发布者,通过频道(mychannel)发布消息
 * @author lijd
 *
 */
public class GetSpeechRecognition_YJ_Pub extends Thread{
    private final JedisPool jedisPool;
    
    public GetSpeechRecognition_YJ_Pub(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    
    @Override
    public void run() {
        while (true) {
            Jedis jedis = null;
            try {
                Thread.sleep(1000);
                jedis = jedisPool.getResource();//连接池中取出一个连接
                String line = "fabuxiaoxi:";
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);//从通过mychannel 频道发布消息
                    System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line));
                }else {
                    break;
                }
                if (jedis != null) {
                    jedis.close();
                }
            }catch (Exception e) {
                e.printStackTrace();
            }            
        }
    }
}
package net.nblh.utils.common;

import redis.clients.jedis.JedisPubSub;

/**
 * 建立消息监听类,并重写了JedisPubSub的一些相关方法
 * @author lijd
 *
 */
public class GetSpeechRecognition_YJ_Msg extends JedisPubSub{
    public GetSpeechRecognition_YJ_Msg(){}
    
    @Override
    public void onMessage(String channel, String message) {       
        //收到消息会调用
        System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
        //this.unsubscribe();
    }
 
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    
        //订阅频道会调用
        System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d",
        channel, subscribedChannels));
    }
 
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   
        //取消订阅会调用
        System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d",
                channel, subscribedChannels));
    }
}
原文地址:https://www.cnblogs.com/lijianda/p/10371377.html