websocket即时通讯

单体环境

jar

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
</dependency>

java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;

@ServerEndpoint(value="/shopWebsocket/{shopId}")
public class ShopWebSocket {
    private static final String SESSION_KEY_PRE = "websocketId";
    public static Map<String, Session> sessions = new HashMap<>();
    private static String shopId;

    private Logger log = LoggerFactory.getLogger(ShopWebSocket.class);

    //连接时执行
    @OnOpen
    public void onOpen(@PathParam("shopId") String shopId, Session session) throws IOException {
        this.shopId = shopId;
        log.info("session id: " + session.getId());
        log.info("新连接:{}", shopId);
        sessions.put(shopId, session);
    }

    //关闭时执行
    @OnClose
    public void onClose(){
        log.info("连接:{} 关闭", this.shopId);
        sessions.remove(shopId);
    }

    //收到消息时执行
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("收到用户{}的消息{}", this.shopId, message);
        if (message.equals("ping")) {
            session.getBasicRemote().sendText("heartCheck"); //回复用户
            return;
        }
        session.getBasicRemote().sendText("收到 " + this.shopId + " 的消息: " + message); //回复用户
    }

    //连接错误时执行
    @OnError
    public void onError(Session session, Throwable error){
        log.info("用户id为:{}的连接发送错误", this.shopId);
        error.printStackTrace();
    }

    /**
     * 发送消息
     * @author zhuxiang
     * @date 3:52 下午 2021/7/19
     * @param: shopId
     * @param: msg
     * @return void
     */
    public static boolean sendMsg(String shopId, String msg)
        throws IOException {
        Session session = sessions.get(shopId);
        if (session == null) {
            return false;
        }
        session.getBasicRemote().sendText(msg);  // 发送消息
        return true;
    }

    /**
     * 获取药店ID
     * @author zhuxiang
     * @date 10:54 上午 2021/7/20
     * @param:
     * @return java.util.List<java.lang.String>
     */
    public synchronized static List<String> shopIdList() {
        List<String> list = new ArrayList<>();
        for (String shopId : ShopWebSocket.sessions.keySet()) {
            list.add(shopId);
        }
        return list;
    }

}

js

<!DOCTYPE html>
<html>
<head>

</head>
<body>
<script src="http://cdn.jsdelivr.net/sockjs/1/sockjs.min.js"></script>

<script type="text/javascript">
	var ws = null;
	function openWebSocket(){
	    //判断当前浏览器是否支持WebSocket
	    if ('WebSocket' in window) {
	        ws = new WebSocket("ws://192.168.0.122:8080/taodoctor/shopWebsocket/user000");
	    } else {
	        ws = new SockJS("http://localhost:8080/taodoctor/sockjs/myWebSocket/info?type=mall");
	    }
	    ws.onopen = function () {

	    };  //这个事件是接受后端传过来的数据
	    ws.onmessage = function (event) {
	        //根据业务逻辑解析数据
	        console.log(event);
	    };
	    ws.onclose = function (event) {

	    };
	}
	openWebSocket();
</script>
</body>
</html>

分布式环境

使用redis订阅模式,推送消息

redis配置

channel绑定

@Configuration
public class ZhRedisMessageListenerContainer {
    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new ShopMessageSubscriber());
    }

    @Bean
    RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(messageListener(), new ChannelTopic("shopMsg"));
        return container;
    }
}

消息监听

@Service
public class ShopMessageSubscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        ZhMedicineShopMessageMapper zhMedicineShopMessageMapper = SpringUtil.getBean(ZhMedicineShopMessageMapper.class);
        String msg = message.toString();
        String channel = new String(message.getChannel());
        if ("shopMsg".equals(channel)) {
            try {
                String msgId = msg;
                ZhMedicineShopMessage zhMedicineShopMessage = zhMedicineShopMessageMapper.selectByPrimaryKey(msgId);
                if (ShopWebSocket.sendMsg(zhMedicineShopMessage.getShopId(), JSONObject.toJSONString(zhMedicineShopMessage))) {
                    List<String> ids = new ArrayList<>();
                    ids.add(zhMedicineShopMessage.getId());
                    zhMedicineShopMessageMapper.updateStatus(ids, 1);
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

消息推送

redisTemplate.convertAndSend("shopMsg", message.getId());

redisTemplate

@Configuration
public class JedisConfig {
	
	@Value("${redis.host}")
	private String redisHost;
	
	@Value("${redis.port}")
	private int redisPort;
	
	@Value("${redis.password}")
	private String redisPassword;
	
	@Value("${redis.database}")
	private int redisDatabase;
	
	@Bean
	public JedisConnectionFactory jedisConnectionFactory() {
		JedisShardInfo poolConfig = new JedisShardInfo(redisHost, redisPort);
		if (StringUtils.isNoneBlank(redisPassword)) {
			poolConfig.setPassword(redisPassword);
		}
		JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(poolConfig );
		jedisConnectionFactory.setDatabase(redisDatabase);
		return jedisConnectionFactory;
	}
	
	@Bean(name = {"redisTemplate"})
	public RedisTemplate<String, Serializable> redisTemplate() {
		RedisTemplate<String, Serializable> tem = new RedisTemplate<>();
		tem.setConnectionFactory(jedisConnectionFactory());
		tem.setKeySerializer(new StringRedisSerializer());
		tem.setValueSerializer(new JdkSerializationRedisSerializer());
		return tem;
	}

	@Bean(name = {"redisStringTemplate"})
	public RedisTemplate<String, String> redisStringTemplate() {
		RedisTemplate<String, String> tem = new RedisTemplate<>();
		tem.setConnectionFactory(jedisConnectionFactory());
		tem.setKeySerializer(new StringRedisSerializer());
		tem.setValueSerializer(new StringRedisSerializer());
		return tem;
	}
	
	@Bean(name = {"redisTemplateMap"})
	public RedisTemplate<String, HashMap<String, String>> redisTemplateMap() {
		RedisTemplate<String, HashMap<String, String>> tem = new RedisTemplate<>();
		tem.setConnectionFactory(jedisConnectionFactory());
		tem.setKeySerializer(new StringRedisSerializer());
		tem.setValueSerializer(new JdkSerializationRedisSerializer());
		return tem;
	}
	
}

原文地址:https://www.cnblogs.com/zhuxiang1633/p/15041563.html