springboot+websocket+vue 服务端向前端推送消息

最近项目中需要进行在线用户管理,故采用了websocket来实现消息推送至前端

pom依赖

<!-- WebSocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
后端
@Service
public class UserOnlineInfoServiceImpl implements UserOnlineInfoService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private RoleDao roleDao;
    @Autowired
    private UserOnlineInfoDao userOnlineInfoDao;

    /**
     * 根据条件查询
     * @param userOnlineQuery
     * @return
     */
    @Override
    public List<UserOnlineInfoDTO> query(UserOnlineQuery userOnlineQuery) {
        try {
            UserOnlineInfo onlineInfo = new UserOnlineInfo();
            if (userOnlineQuery != null) {
                if (StrUtil.isBlank(userOnlineQuery.getName())) {
                    userOnlineQuery.setName(null);
                }
                if (StrUtil.isBlank(userOnlineQuery.getCode())) {
                    userOnlineQuery.setCode(null);
                }
                BeanUtils.copyProperties(userOnlineQuery, onlineInfo);
            }
            List<UserOnlineInfo> onlineInfos = userOnlineInfoDao.queryByCondition(onlineInfo);
            List<UserOnlineInfoDTO> onlineInfoDTOs = new ArrayList<>(onlineInfos.size());
            onlineInfos.forEach(info -> {
                UserOnlineInfoDTO infoDTO = new UserOnlineInfoDTO();
                BeanUtils.copyProperties(info, infoDTO);
                onlineInfoDTOs.add(infoDTO);
            });
            return onlineInfoDTOs;
        } catch (Exception e) {
            throw new ServiceException(SystemManageExceptionEnum.ONLINEUSER_MANAGE_QUERY_FALSE, e);
        }
    }

    /**
     * 类型转换
     *
     * @Param: [dto]
     * @Return: com.boss.bes.user.permission.dao.entity.UserOnlineInfoPo
     * @Date: 2020/9/2 15:27
     */
    public UserOnlineInfo doObjectTransfer(UserOnlineInfoDTO dto) {
        if (dto == null) {
            return null;
        }
        UserOnlineInfo userOnlineInfoPo = new UserOnlineInfo();
        BeanUtil.copyProperties(dto, userOnlineInfoPo);
        return userOnlineInfoPo;
    }

    /**
     * 类型转换
     *
     * @Param: [entity]
     * @Return: com.boss.bes.user.permission.pojo.dto.UserOnlineInfoDto
     * @Date: 2020/9/2 15:27
     */
    public UserOnlineInfoDTO doObjectTransfer(UserOnlineInfo entity) {
        if (entity == null) {
            return null;
        }
        UserOnlineInfoDTO userOnlineInfoDto = new UserOnlineInfoDTO();
        BeanUtil.copyProperties(entity, userOnlineInfoDto);
        return userOnlineInfoDto;
    }


    @Override
    public UserOnlineInfoDTO userLogin(UserOnlineInfoDTO userOnlineInfoDTO) {
        if (userOnlineInfoDTO == null || userOnlineInfoDTO.getUserId() == null) {
            return null;
        }
        try {
            User user = userDao.get(userOnlineInfoDTO.getUserId());
            if (user == null) {
                return null;
            }
            UserOnlineInfo userOnlineInfo = new UserOnlineInfo();
            userOnlineInfo.setUserId(user.getId());
            userOnlineInfo.setCode(user.getCode());
            userOnlineInfo.setName(user.getName());
            userOnlineInfo.setIp(userOnlineInfoDTO.getIp());
            userOnlineInfo.setOnlineTime(new Date());
            userOnlineInfoDao.save(userOnlineInfo);
            BeanUtils.copyProperties(userOnlineInfo, userOnlineInfoDTO);
            return userOnlineInfoDTO;
        } catch (Exception e) {
            throw new ServiceException(SystemManageExceptionEnum.ONLINEUSER_MANAGE_LOGIN_FALSE, e);
        }
    }

    @Override
    public List<UserOnlineInfoDTO> forceUserLogout(List<UserOnlineInfoDTO> infoDTOs) {
        if (CollectionUtil.isEmpty(infoDTOs)) {
            return new ArrayList<>();
        }
        //过滤重复的,或为null的用户id
        Set<Long> userIdSet = infoDTOs.stream()
            .filter(dto -> dto != null && dto.getUserId() != null)
            .map(UserOnlineInfoDTO::getUserId)
            .collect(Collectors.toSet());
        try {
            if (CollectionUtil.isEmpty(userIdSet)) {
                return new ArrayList<>();
            }
            //过滤非本公司或者本机构的用户
            Set<Long> safeUserIdSet = roleDao.queryUserIdIn(userIdSet).stream()
                .map(BaseEntity::getId)
                .collect(Collectors.toSet());
            if (CollectionUtil.isEmpty(safeUserIdSet)) {
                return new ArrayList<>();
            }
            infoDTOs.removeIf(infoDTO -> !safeUserIdSet.contains(infoDTO.getUserId()));
            if (CollectionUtil.isEmpty(infoDTOs)) {
                return new ArrayList<>();
            }
            List<UserOnlineInfo> infos = new ArrayList<>(infoDTOs.size());
            Date currentDate = new Date();
            infoDTOs.forEach(infoDTO -> {
                UserOnlineInfo onlineInfo = new UserOnlineInfo();
                infoDTO.setOfflineTime(currentDate);
                //下线状态
                infoDTO.setStatus((byte) 1);
                BeanUtils.copyProperties(infoDTO, onlineInfo);
                long duration = (currentDate.getTime() - infoDTO.getOnlineTime().getTime()) / (1000);
                onlineInfo.setStopTime((int) duration);
                infos.add(onlineInfo);
            });
            userOnlineInfoDao.batchUpdate(infos);
            return infoDTOs;
        } catch (Exception e) {
            throw new ServiceException(SystemManageExceptionEnum.ONLINEUSER_MANAGE_LOGOUT_FALSE, e);
        }
    }

    @Override
    public boolean userLogout(UserOnlineInfoDTO infoDTO) {
        if (infoDTO == null) {
            return false;
        }
        try {
            infoDTO.setOfflineTime(new Date());
            long duration = (infoDTO.getOfflineTime().getTime() - infoDTO.getOnlineTime().getTime()) / (1000);
            infoDTO.setStopTime((int) duration);
            //下线状态
            infoDTO.setStatus((byte) 1);
            UserOnlineInfo onlineInfo = new UserOnlineInfo();
            BeanUtils.copyProperties(infoDTO, onlineInfo);
            return userOnlineInfoDao.unsafeUpdate(onlineInfo) > 0;
        } catch (Exception e) {
            throw new ServiceException(SystemManageExceptionEnum.ONLINEUSER_MANAGE_LOGOUT_FALSE, e);
        }
    }
}

前端
 Vuex中在用户登录调用的方法中加入建立websocket的代码,在用户登录的时候就发送请求,与服务器进行连接,全程一直保持连接,接受服务器的信息。
     // 与服务器建立连接
          if (WebSocket) {
            const socket = new WebSocket(`${BASE_URL}/permission/websocket/${user.id}`)
            commit('SET_SOCKET', socket)
            socket.onopen = (e) => {
              heartCheck.start()
            }
            socket.onmessage = (e) => {
              console.log(e)
              try {
                const res = JSON.parse(e.data)
                if (res && res.body) {
                  // 收到强制下线请求
                  if (res.body === 'logout') {
                    if (state.socket) {
                      const socket = state.socket
                      commit('SET_SOCKET', null)
                      socket.close()
                    }
                    MessageBox.confirm('你已经被管理员强制下线了', '强制下线通知', {
                      confirmButtonText: '重新登录',
                      cancelButtonText: '停留在此页',
                      type: 'warning'
                    }).then(() => {
                      // 登出
                      dispatch('logout').then(() => {
                        location.reload()
                      })
                    }).catch(() => {
                      // 清除token信息
                      dispatch('resetToken').then(() => {
                      })
                    })
                  }
                  // 收到被顶下线请求
                  if (res.body === 'replaceLogout') {
                    if (state.socket) {
                      const socket = state.socket
                      commit('SET_SOCKET', null)
                      socket.close()
                    }
                    MessageBox.confirm('你已经其他地方登录', '强制下线通知', {
                      confirmButtonText: '重新登录',
                      cancelButtonText: '停留在此页',
                      type: 'warning'
                    }).then(() => {
                      // 登出
                      dispatch('logout').then(() => {
                        location.reload()
                      })
                    }).catch(() => {
                      // 清除token信息
                      dispatch('resetToken').then(() => {
                      })
                    })
                  }
                }
              } catch (err) {
                console.log(err)
              }
            }
            socket.onclose = (e) => {
              console.log(e)
              heartCheck.clear()
              dispatch('resetToken').then(() => {})
            }
            socket.onerror = (e) => {
              console.log(e)
              Message({
                message: '连接服务器失败',
                type: 'error',
                duration: 5 * 1000
              })
            }
          }
原文地址:https://www.cnblogs.com/venb/p/13792086.html