坐席排队功能

 需求:坐席排队  五秒之内没请求接口认为用户不在排队 从内存移除

redis用作消息队列

  1)实现过程:用户请求接口时,先查询是否有空闲坐席。没有空闲坐席则继续排队;有空闲坐席时,再查询 redis 中是否已有排队的用户:如果有,判断当前用户是否是队列中第一个请求的用户,是则分配坐席,不是则更新其在队列中的时间;如果队列为空,则直接分配坐席。

  2)添加一个定时任务类 每三百毫秒刷新一次  查询队列中有没有超时的用户 超时之后将其移除

检测坐席是否为空闲状态

  如果有空闲坐席 通过websocket 发送给客户端消息 将坐席状态修改为2,请查看 ocatedseat 方法

比较麻烦的几点:

  1)添加用户到队列时,lpush 会返回添加后队列的长度;把这个返回值和用户请求带来的唯一 id 一起存到 redis 中,以备刷新时间时使用

      long userindex = redisTemplate.opsForList().leftPush("GROUP_SEQUEUE_" + agent_group,JSONObject.toJSONString(map));

      redisTemplate.opsForValue().set(uniqueid, String.valueOf(userindex));

  2)刷新用户时间:取出队列的 size,减去第一步存到 redis 中的值,得到该用户在队列中的下标,然后按这个下标修改队列中用户的时间

      long size = redisTemplate.opsForList().size("GROUP_SEQUEUE_" + agent_group);
      String index = redisTemplate.opsForValue().get(uniqueid);
      long sub = size - Integer.parseInt(index);

      redisTemplate.opsForList().set("GROUP_SEQUEUE_" + agent_group,sub, JSONObject.toJSONString(map));

  3)坐席状态同步问题

     websocket 心跳必须与状态分开发送,否则会出现状态不一致的问题:服务端把“分配中”状态发送给客户端后,客户端要隔一段时间才能接收到;在这个间隔内,客户端会把之前的状态发送给服务端,造成状态不一致

/**
 * Handles one message sent by an agent seat over its WebSocket session.
 *
 * <p>The payload is parsed as a JSON object whose {@code type} field selects the action:
 * <ul>
 *   <li>{@code "0"} - status change: the seat's serialized entry in the Redis list
 *       {@code GROUP_<groupid>} is replaced by one carrying the new state and routing
 *       fields; a status of 0 also removes the seat from the {@code OCATED} set.</li>
 *   <li>{@code "1"} - heartbeat: only records the current time for the seat in {@code hart}.</li>
 *   <li>{@code "2"} - conversation record: pushes a record onto the Redis list
 *       {@code conversation}; if a record with the same car number and report number
 *       already exists, its end time is set, otherwise a new record is started.</li>
 *   <li>anything else - the client is told the parameters are wrong.</li>
 * </ul>
 *
 * <p>Every exception is caught and logged; nothing is propagated to the caller.
 *
 * @param session the seat's WebSocket session; its {@code "user"} attribute is read as the seat
 * @param message the received frame; its payload is expected to be a JSON object string
 */
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
    try {
        User user = (User) session.getAttributes().get("user");
        JSONObject json = JSONObject.parseObject(message.getPayload().toString());
        // getString/getLong convert the underlying value instead of casting it, so a
        // numeric "type" or an Integer-sized timestamp no longer throws ClassCastException.
        String type = json.getString("type");

        if ("0".equals(type)) {
            Object rawStatus = json.get("status");
            if (rawStatus == null) {
                // A status change without a status is a malformed request, not a server error.
                session.sendMessage(new TextMessage("参数错误!"));
                return;
            }
            int status = Integer.parseInt(rawStatus.toString());
            String name = json.getString("name");
            String seatListKey = "GROUP_" + user.getGroupid();

            if (status == 0) {
                // Seat reports idle again: release the allocation marker.
                redisTemplate.opsForSet().remove("OCATED", user.getUserid().toString());
            }
            // Replace the stored snapshot: remove the old serialized form first, then push the
            // updated one. The removal only matches if the session's user still serializes
            // to exactly what was stored.
            redisTemplate.opsForList().remove(seatListKey, 0, JSONObject.toJSONString(user));
            user.setState(status);
            user.setNext_hop("123.57.144.26:9060/udp");
            user.setTo("<sip:" + name + "@123.57.144.26:9060>");
            redisTemplate.opsForList().leftPush(seatListKey, JSONObject.toJSONString(user));
            // Third placeholder added: the timestamp argument was previously dropped silently.
            logger.info("收到坐席{}发来得消息!当前状态为:{}   {}", user.getUserid(), rawStatus,
                    DateUtil.formateDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
        } else if ("1".equals(type)) {
            // Heartbeat only: used to detect seats that dropped off.
            this.hart.put(user, new Date());
        } else if ("2".equals(type)) {
            String reportnum = json.getString("reportnum");
            String carnum = json.getString("carnum");

            // Find the record that actually matches. Each element is parsed into its own
            // variable so a non-matching element can never be mistaken for the match.
            Conversation con = null;
            List<String> stored = redisTemplate.opsForList().range("conversation", 0, -1);
            for (String s : stored) {
                Conversation candidate = JSONObject.toJavaObject(JSONObject.parseObject(s), Conversation.class);
                boolean sameCar = carnum != null && carnum.equals(candidate.getCarnum());
                boolean sameReport = reportnum != null && reportnum.equals(candidate.getReportnum());
                if (sameCar && sameReport) {
                    con = candidate;
                    break;
                }
            }

            if (con != null) {
                // Already saved once: this message closes the conversation.
                Long endtime = json.getLong("endtime");
                con.setEndtime(DateUtil.timestampToDate(endtime.toString()));
            } else {
                Long starttime = json.getLong("starttime");
                con = new Conversation();
                con.setUserid(user.getUserid());
                con.setCarnum(carnum);
                con.setStarttime(DateUtil.timestampToDate(starttime.toString()));
                con.setReportnum(reportnum);
            }
            // NOTE(review): the closed record is pushed as an additional element; the earlier
            // start-only element stays in the list. Confirm the consumer expects both.
            redisTemplate.opsForList().leftPush("conversation", JSONObject.toJSONString(con));
        } else {
            session.sendMessage(new TextMessage("参数错误!"));
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

  

下面奉上代码

1、controller   

package com.cloudunicomm.controller;

import java.io.IOException;
import java.io.OutputStream;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.socket.WebSocketSession;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cloudunicomm.interceptor.MySocketHandler;
import com.cloudunicomm.utils.DateUtil;
import com.cloudunicomm.vo.User;

@Controller
public class FreeController {
	
	private static final Logger logger = LoggerFactory.getLogger(FreeController.class);
	@Autowired
	private RedisTemplate<String, String> redisTemplate;
	@Bean
	public MySocketHandler mySorketHandle() {
		return new MySocketHandler();
	}
	@RequestMapping(value = "agent_service", method = RequestMethod.GET)
	@ResponseBody
	public void agentservice(HttpServletRequest request, HttpServletResponse response,
			@RequestParam(name = "customer_id", required = false) String customer_id,
			@RequestParam(name = "agent_group", required = false) String agent_group,
			@RequestParam(name = "uniqueid", required = false) String uniqueid) throws IOException, ParseException {
		//logger.info("组:{}---用户:{}----唯一id:{}过来请求接口!",agent_group,customer_id,uniqueid);
		StringBuffer sb = new StringBuffer();
		OutputStream out = response.getOutputStream();
		response.setHeader("Content-Disposition", "attachment; filename=" + DateUtil.getDate() + ".ini");
		sb.append("reply_code=200
").append("reply_reason='OK'
");
		//查询有没有空闲坐席
		List<String> users = redisTemplate.opsForList().range("GROUP_" + agent_group, 0, -1);
		for (String s : users) {
			User user = (User) JSONObject.toJavaObject(JSONObject.parseObject(s), User.class);
			Boolean aa = redisTemplate.opsForSet().isMember("OCATED", user.getUserid().toString());
			if (user.getState() == 0  && aa==false) {
				//查询有没有排队用户
				List<String> lists = redisTemplate.opsForList().range("GROUP_SEQUEUE_" + agent_group, 0, -1);
				if (lists.size() > 0) {
					String str = lists.get(lists.size() - 1);
					Set<Entry<String, Object>> obj = JSON.parseObject(str).entrySet();
					for (Entry<String, Object> s1 : obj) {
						if (s1.getKey().split("_")[1].equals(uniqueid)) {
							redisTemplate.opsForList().remove("GROUP_SEQUEUE_"+agent_group, 0, JSONObject.toJSONString(s1));
							this.ocatedseat(agent_group, user, customer_id, uniqueid);
							sb.append("next_hop="+user.getNext_hop()+"
");
							sb.append("To="+user.getTo());
							out.write(sb.toString().getBytes());
							out.flush();
							out.close();
							return ;
							//return ResultMessage.getSuccess().setMessage("已分配作息!");
						}
					}
				}else{
					this.ocatedseat(agent_group, user, customer_id, uniqueid);
					sb.append("next_hop="+user.getNext_hop()+"
");
					sb.append("To="+user.getTo());
					out.write(sb.toString().getBytes());
					out.flush();
					out.close();
					return ;
				}
			}
		}
		String exit = redisTemplate.opsForValue().get(agent_group+"_"+uniqueid);
		if(null == exit){
			 //没有
			this.adduserqueue(agent_group, uniqueid);
			//return ResultMessage.getSuccess().setMessage("请等待!");
			sb.append("retry_wait=5s");
			out.write(sb.toString().getBytes());
			out.flush();
			out.close();
			return;
		}else{
			//return ResultMessage.getSuccess().setMessage("继续等待");
			this.resettime(agent_group, uniqueid);
			sb.append("retry_wait=5s");
			out.write(sb.toString().getBytes());
			out.flush();
			out.close();
			return;
		}
	}
	
	/**
	 * 重置时间
	 * @param agent_group
	 * @param uniqueid
	 */
	public void resettime(String agent_group,String uniqueid){
		Map<String, Date> map = new HashMap<>();
		map.put(agent_group + "_" + uniqueid, new Date());
		long size = redisTemplate.opsForList().size("GROUP_SEQUEUE_" + agent_group);
		String index = redisTemplate.opsForValue().get(agent_group+"_"+uniqueid);
		long sub = size - Integer.parseInt(index);
		redisTemplate.opsForList().set("GROUP_SEQUEUE_" + agent_group,sub, JSONObject.toJSONString(map));
	}
	/**
	 * 添加用户到队列中
	 * @param map
	 * @param agent_group
	 * @param uniqueid
	 */
	public void adduserqueue(String agent_group,String uniqueid){
		logger.info("组{}唯一{} 加入队列",agent_group,uniqueid);
		Map<String, Date> map = new HashMap<>();
		map.put(agent_group + "_" + uniqueid, new Date());
		long userindex = redisTemplate.opsForList().leftPush("GROUP_SEQUEUE_" + agent_group,JSONObject.toJSONString(map));
		redisTemplate.opsForValue().set(agent_group+"_"+uniqueid, String.valueOf(userindex));
		
	}
	public  void ocatedseat(String agent_group,User user ,String customer_id,String uniqueid) throws ParseException{
		redisTemplate.opsForSet().add("OCATED", user.getUserid().toString());
		user.setState(2);
		WebSocketSession session = MySocketHandler.sessionid.get(user.getUserid().toString());
		mySorketHandle().pushMsg(session.getId(), "2");
		updateIndex(agent_group,uniqueid);
		//sb.append(user.getNext_hop() + "
").append(user.getTo());
		logger.info("坐席{}正在被分配给{}  !  {}", user.getUserid(),uniqueid,DateUtil.formateDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
	}
	/**
	 * 更新索引值
	 * @param agent_group
	 * @param uniqueid
	 */
	private void updateIndex(String agent_group, String uniqueid) {
		Set<String> index = redisTemplate.keys(agent_group+"_*");
		//比较的value
		String comvalue = redisTemplate.opsForValue().get(agent_group+"_"+uniqueid);
		for(String s : index ){
			 int l = Integer.parseInt(redisTemplate.opsForValue().get(s));
			 if(l>Integer.parseInt(comvalue)){
				 int reslu = l-1;
				redisTemplate.opsForValue().set(s,String.valueOf(reslu));
			 }
			
		}
		redisTemplate.delete(agent_group+"_"+uniqueid);
	}
}

  2、定时任务

package com.cloudunicomm.task;

import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

/**
 * Scheduled task that evicts callers from the Redis waiting queues once their queue
 * timestamp is older than {@link #QUEUE_TIMEOUT_MILLIS}.
 * @Title: CheckRedisKeyExpireTask.java
 * @date: 2018-06-29
 * @author:  zhangdong
 * @version: v1.0
 * @Description
 */
@Component
public class CheckRedisKeyExpireTask {

	private static final Logger logger = LoggerFactory.getLogger(CheckRedisKeyExpireTask.class);

	private static final String QUEUE_PREFIX = "GROUP_SEQUEUE_";

	/**
	 * Age after which a queue entry is evicted.
	 *
	 * <p>NOTE(review): the value is kept at the original 500000 ms, but the original log line
	 * reported 50 seconds and the accompanying write-up speaks of 5 seconds - confirm which
	 * timeout is intended. The log line now reports whatever this constant says.
	 */
	private static final long QUEUE_TIMEOUT_MILLIS = 500000L;

	// Typed as <String, String> (was <String, ?>) so index values can be written back.
	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	/**
	 * Scans every {@code GROUP_SEQUEUE_*} list and removes entries that have timed out.
	 *
	 * <p>Each list element is read as a JSON object mapping {@code <group>_<uniqueid>} to a
	 * numeric timestamp. For a timed-out entry the element is removed from its list and the
	 * caller's stored index key is released.
	 *
	 * @throws ParseException declared for compatibility; not thrown by the visible code
	 */
	@Scheduled(cron="0/2 * *  * * ? ")   // fires every 2 seconds
    public void job() throws ParseException {
		long now = System.currentTimeMillis();
		for (String queueKey : redisTemplate.keys(QUEUE_PREFIX + "*")) {
			// Derive the group from the list key itself; splitting the entry key on '_'
			// picks the wrong list when the group name contains an underscore.
			String agentGroup = queueKey.substring(QUEUE_PREFIX.length());
			List<String> queued = redisTemplate.opsForList().range(queueKey, 0, -1);
			for (String element : queued) {
				Map<String, Object> entries = JSON.parseObject(element);
				for (Entry<String, Object> entry : entries.entrySet()) {
					// Number, not Long: small values may be parsed as Integer.
					long queuedAt = ((Number) entry.getValue()).longValue();
					if (now - queuedAt > QUEUE_TIMEOUT_MILLIS) {
						// Remove the exact stored string so the match cannot depend on how
						// the parsed map is rendered back to text.
						redisTemplate.opsForList().remove(queueKey, 0, element);
						releaseIndex(agentGroup, entry.getKey());
						logger.info("组{}中的值{}已超过{}秒,将其移除!", queueKey, element, QUEUE_TIMEOUT_MILLIS / 1000);
					}
				}
			}
		}
    }

	/**
	 * Deletes an evicted caller's index key and shifts the stored index of every caller that
	 * queued after it down by one, mirroring {@code FreeController.updateIndex}. Without this,
	 * later callers would refresh their timestamp at a stale list position.
	 *
	 * <p>NOTE(review): relies on {@code KEYS <group>_*} and assumes every matching key holds a
	 * numeric string.
	 *
	 * @param agentGroup group whose index keys are adjusted
	 * @param waitKey    index key ({@code <group>_<uniqueid>}) of the evicted caller
	 */
	private void releaseIndex(String agentGroup, String waitKey) {
		String removed = redisTemplate.opsForValue().get(waitKey);
		if (removed != null) {
			long removedIndex = Long.parseLong(removed);
			for (String key : redisTemplate.keys(agentGroup + "_*")) {
				String value = redisTemplate.opsForValue().get(key);
				if (value == null) {
					// Key vanished between KEYS and GET.
					continue;
				}
				long index = Long.parseLong(value);
				if (index > removedIndex) {
					redisTemplate.opsForValue().set(key, String.valueOf(index - 1));
				}
			}
		}
		redisTemplate.delete(waitKey);
	}
}

  

原文地址:https://www.cnblogs.com/xdcr/p/9258497.html