单体环境
jar
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
</dependency>
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
@ServerEndpoint(value="/shopWebsocket/{shopId}")
public class ShopWebSocket {
private static final String SESSION_KEY_PRE = "websocketId";
public static Map<String, Session> sessions = new HashMap<>();
private static String shopId;
private Logger log = LoggerFactory.getLogger(ShopWebSocket.class);
//连接时执行
@OnOpen
public void onOpen(@PathParam("shopId") String shopId, Session session) throws IOException {
this.shopId = shopId;
log.info("session id: " + session.getId());
log.info("新连接:{}", shopId);
sessions.put(shopId, session);
}
//关闭时执行
@OnClose
public void onClose(){
log.info("连接:{} 关闭", this.shopId);
sessions.remove(shopId);
}
//收到消息时执行
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("收到用户{}的消息{}", this.shopId, message);
if (message.equals("ping")) {
session.getBasicRemote().sendText("heartCheck"); //回复用户
return;
}
session.getBasicRemote().sendText("收到 " + this.shopId + " 的消息: " + message); //回复用户
}
//连接错误时执行
@OnError
public void onError(Session session, Throwable error){
log.info("用户id为:{}的连接发送错误", this.shopId);
error.printStackTrace();
}
/**
* 发送消息
* @author zhuxiang
* @date 3:52 下午 2021/7/19
* @param: shopId
* @param: msg
* @return void
*/
public static boolean sendMsg(String shopId, String msg)
throws IOException {
Session session = sessions.get(shopId);
if (session == null) {
return false;
}
session.getBasicRemote().sendText(msg); // 发送消息
return true;
}
/**
* 获取药店ID
* @author zhuxiang
* @date 10:54 上午 2021/7/20
* @param:
* @return java.util.List<java.lang.String>
*/
public synchronized static List<String> shopIdList() {
List<String> list = new ArrayList<>();
for (String shopId : ShopWebSocket.sessions.keySet()) {
list.add(shopId);
}
return list;
}
}
js
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<script src="http://cdn.jsdelivr.net/sockjs/1/sockjs.min.js"></script>
<script type="text/javascript">
var ws = null;
function openWebSocket(){
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
ws = new WebSocket("ws://192.168.0.122:8080/taodoctor/shopWebsocket/user000");
} else {
ws = new SockJS("http://localhost:8080/taodoctor/sockjs/myWebSocket/info?type=mall");
}
ws.onopen = function () {
}; //这个事件是接受后端传过来的数据
ws.onmessage = function (event) {
//根据业务逻辑解析数据
console.log(event);
};
ws.onclose = function (event) {
};
}
openWebSocket();
</script>
</body>
</html>
分布式环境
使用redis订阅模式,推送消息
redis配置
channel绑定
@Configuration
public class ZhRedisMessageListenerContainer {
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new ShopMessageSubscriber());
}
@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(messageListener(), new ChannelTopic("shopMsg"));
return container;
}
}
消息监听
@Service
public class ShopMessageSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
ZhMedicineShopMessageMapper zhMedicineShopMessageMapper = SpringUtil.getBean(ZhMedicineShopMessageMapper.class);
String msg = message.toString();
String channel = new String(message.getChannel());
if ("shopMsg".equals(channel)) {
try {
String msgId = msg;
ZhMedicineShopMessage zhMedicineShopMessage = zhMedicineShopMessageMapper.selectByPrimaryKey(msgId);
if (ShopWebSocket.sendMsg(zhMedicineShopMessage.getShopId(), JSONObject.toJSONString(zhMedicineShopMessage))) {
List<String> ids = new ArrayList<>();
ids.add(zhMedicineShopMessage.getId());
zhMedicineShopMessageMapper.updateStatus(ids, 1);
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}
消息推送
redisTemplate.convertAndSend("shopMsg", message.getId());
redisTemplate
@Configuration
public class JedisConfig {
@Value("${redis.host}")
private String redisHost;
@Value("${redis.port}")
private int redisPort;
@Value("${redis.password}")
private String redisPassword;
@Value("${redis.database}")
private int redisDatabase;
@Bean
public JedisConnectionFactory jedisConnectionFactory() {
JedisShardInfo poolConfig = new JedisShardInfo(redisHost, redisPort);
if (StringUtils.isNoneBlank(redisPassword)) {
poolConfig.setPassword(redisPassword);
}
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(poolConfig );
jedisConnectionFactory.setDatabase(redisDatabase);
return jedisConnectionFactory;
}
@Bean(name = {"redisTemplate"})
public RedisTemplate<String, Serializable> redisTemplate() {
RedisTemplate<String, Serializable> tem = new RedisTemplate<>();
tem.setConnectionFactory(jedisConnectionFactory());
tem.setKeySerializer(new StringRedisSerializer());
tem.setValueSerializer(new JdkSerializationRedisSerializer());
return tem;
}
@Bean(name = {"redisStringTemplate"})
public RedisTemplate<String, String> redisStringTemplate() {
RedisTemplate<String, String> tem = new RedisTemplate<>();
tem.setConnectionFactory(jedisConnectionFactory());
tem.setKeySerializer(new StringRedisSerializer());
tem.setValueSerializer(new StringRedisSerializer());
return tem;
}
@Bean(name = {"redisTemplateMap"})
public RedisTemplate<String, HashMap<String, String>> redisTemplateMap() {
RedisTemplate<String, HashMap<String, String>> tem = new RedisTemplate<>();
tem.setConnectionFactory(jedisConnectionFactory());
tem.setKeySerializer(new StringRedisSerializer());
tem.setValueSerializer(new JdkSerializationRedisSerializer());
return tem;
}
}