8_1_异步设计和站内邮件通知系统

一、需求描述

1. 利用Redis做消息队列,实现一个异步化服务框架;如图:

2. 利用搭建好的框架实现异步化发送点赞信息和登录异常信息 。

 

二、具体diamante实现

首先搭建应用Redis做消息队列的异步化框架

1.准备

JedisAdapter.java

类中加上lpush 和 bpop的代码用来实现消息队列;加上setObject 和 getObject实现序列化与反序列的过程(将事件存入消息队列的时候要序列化,从队列中取出事件的时候需要反序列化):

 public long lpush(String key, String value){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.lpush(key, value);
        }catch (Exception e){
                logger.error("Jedis lpush 发生异常 " + e.getMessage());
                return 0;
        }finally {
            if(jedis != null){
                try {
                    jedis.close();
                }catch (Exception e){
                    logger.error("Jedis 关闭异常 " + e.getMessage());
                }
            }
        }

    }

    public List<String> brpop(int timeout, String key){

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.brpop(timeout, key);
        }catch (Exception e){
            logger.error("Jedis brpop发生异常 " + e.getMessage());
            return null;
        }finally {
            if (jedis != null){
                try {
                    jedis.close();
                }catch (Exception e){
                    logger.error("Jedis 关闭异常" + e.getMessage());
                }
            }
        }
    }

    //序列化
    public void setObject(String key, Object object){
        set(key, JSON.toJSONString(object));
    }

    //反序列化
    public <T> T getObject(String key, Class<T> clazz){

        String value = get(key);
        if(value != null){
            return JSON.parseObject(value, clazz);
        }
        return null;

    }
View Code

RedisKeyUtil.java

类中加上一个生成事件key的方法,以后的事件都存入这个key对应的set集合中。

private static String BIZ_EVENT = "DISLIKE";


    /**
     * 事件发生的时候,生成key
     * @return
     */
    public static String getEventQueueKey(){
        return BIZ_EVENT;
    }
View Code

2. 异步化框架

EventType.java :事件类型

package com.nowcoder.async;

/**
 * Created by Administrator on 2017/5/7.
 */
public enum EventType {
    LIKE(0),
    COMMENT(1),
    LOGIN(1),
    MAIL(3);

    private int value;
    public int getValue() {
        return value;
    }
    EventType(int value) {
        this.value = value;
    }
}
View Code

EventModel.java : 发生的事件的数据都打包成一个Model(然后对这个model中数据进行序列化)

package com.nowcoder.async;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Administrator on 2017/5/7.
 */
public class EventModel {

    private EventType type;
    //事件触发者
    private int actorId;
    //表示一个触发事件的对象
    private int entityId;
    private int entityType;
    //事件对象的拥有者
    private int entityOwnerId;

    //存放触发的事件数据
    Map<String, String> exts = new HashMap<>();


    public EventModel(EventType type){
        this.type = type;
    }

    public String getExt(String key) {
        return exts.get(key);
    }

    public EventModel setExt(String key, String value) {
        exts.put(key, value);
        return this;
    }

    public EventModel(){

    }

    public EventType getType() {
        return type;
    }

    public EventModel setType(EventType type) {
        this.type = type;
        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityOwnerId() {
        return entityOwnerId;
    }

    public EventModel setEntityOwnerId(int entityOwnerId) {
        this.entityOwnerId = entityOwnerId;
        return this;
    }

    public Map<String, String> getExts() {
        return exts;
    }

    public void setExts(Map<String, String> exts) {
        this.exts = exts;
    }
}
View Code

EventProducer.java : 将发生的事件推送到消息队列。

package com.nowcoder.async;

import com.alibaba.fastjson.JSONObject;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Created by Administrator on 2017/5/7.
 */
@Service
public class EventProducer {

    private static final Logger logger = LoggerFactory.getLogger(EventProducer.class);
    @Autowired
    JedisAdapter jedisAdapter;

    /**
     * 将产生的事件model推送到redis的工作队列中
     * @param model
     * @return
     */
    public boolean fireEvent(EventModel model){

        try {
            //序列化
            String json = JSONObject.toJSONString(model);
            //产生key
            String eventkey = RedisKeyUtil.getEventQueueKey();
            //放入工作队列
            jedisAdapter.lpush(eventkey, json);
            return true;
        }catch (Exception e){
            logger.error("EventProducer fireEvent 发生异常 : " + e.getMessage());
            return false;
        }
    }




}
View Code

 EventConsumer.java : 从消息队列中获取事件交给Handler类进行处理。

package com.nowcoder.async;

import com.alibaba.fastjson.JSON;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import jdk.nashorn.api.scripting.JSObject;
import org.apache.commons.collections.map.HashedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2017/5/7.
 */
@Service
public class EventConsumer implements InitializingBean, ApplicationContextAware{

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    //用来存储各种type事件的Handler
    private Map<EventType, List<EventHandler>> config = new HashMap<EventType, List<EventHandler>>();
    private ApplicationContext applicationContext;

    @Autowired
    JedisAdapter jedisAdapter;

    @Override
    public void afterPropertiesSet() throws Exception {
        //获取上下文所有实现EventHandler的类
        //使用BeanFatory的getBeansOfType()方法,该方法返回一个Map类型的实例,Map中的key为Bean的名,key对应的内容为Bean的实例。
        Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
        if (beans != null){
            for (Map.Entry<String, EventHandler> entry : beans.entrySet()){
                List<EventType> eventTypes = entry.getValue().getSupportEventType();
                for (EventType type : eventTypes){
                   //初始化的时候,若没有type,就加入
                    if(!config.containsKey(type)){
                        config.put(type, new ArrayList<EventHandler>());
                    }
                    config.get(type).add(entry.getValue());
                }
            }
        }

        //启动线程从工作队列中取出事件进行处理
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    String key = RedisKeyUtil.getEventQueueKey();
                    //从Redis数据库的键为key的set集合中获取存储的事件(事件Event为序列化过的,String类型)
                    List<String> events = jedisAdapter.brpop(0, key);
                    for (String message : events){
                        if (message.equals(key)){
                            continue;
                        }
                        EventModel eventModel = JSON.parseObject(message, EventModel.class);
                        //若事件没有注册过
                        if (!config.containsKey(eventModel.getType())){
                            logger.error("不能识别的事件 ");
                            continue;
                        }
                        //获取关注过该事件的handler,一一进行处理事件
                        for (EventHandler handler : config.get(eventModel.getType())){
                            handler.doHandle(eventModel);
                        }
                    }
                }
            }
        });
        thread.start();

    }

    /**
     * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
View Code

EventHandler.java: 接口,可以从消费者中获取事件交给对应的Handler实现类去处理:

package com.nowcoder.async;

import java.util.List;

/**
 * Created by Administrator on 2017/5/7.
 */
public interface EventHandler {

    //对EventConsumer中的event事件进行处理
    void doHandle(EventModel model);
    //获取哪些关注事件类型
    List<EventType> getSupportEventType();
}
View Code

LikeHandler.java: 实现点赞通知的类

package com.nowcoder.async.handler;

import com.nowcoder.async.EventHandler;
import com.nowcoder.async.EventModel;
import com.nowcoder.async.EventType;
import com.nowcoder.model.HostHolder;
import com.nowcoder.model.Message;
import com.nowcoder.model.User;
import com.nowcoder.service.MessageService;
import com.nowcoder.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Created by Administrator on 2017/5/7.
 */
@Component
public class LikeHandler implements EventHandler{

    @Autowired
    MessageService messageService;
    @Autowired
    UserService userService;
    @Autowired
    HostHolder hostHolder;

    @Override
    public void doHandle(EventModel model) {

        System.out.print("有人点赞了");
       Message message = new Message();

        //测试方便查看:就是自己发送给自己站内信
        //int fromId = model.getActorId();
       // int toId = fromId;

        //正常情况下fromId是当前点赞用户id,toId是点赞的咨询news所在的id
        // actorId = hostHolder.getUser().getId();
        int fromId = model.getActorId();
        //entityOwnerId = news.getId()
        int toId = model.getEntityOwnerId();

        message.setHasRead(0);// 0 代表未读 1 代表已读
        message.setFromId(fromId);
        message.setToId(toId);
        message.setConversationId(fromId < toId ? String.format("%d_$d", fromId, toId) : String.format("%d_%d", toId, fromId));

        User user = userService.getUser(model.getActorId());
        message.setContent("用户" + user.getName()
                + "赞了你的资讯,http://127.0.0.1:8080/news/" + model.getEntityId());
        message.setCreatedDate(new Date());
        messageService.addMessage(message);
    }

    @Override
    public List<EventType> getSupportEventType() {
        return Arrays.asList(EventType.LIKE);
    }
}
View Code
LoginExceptionHandler:登录时发生登录异常到对应的站内信,以及实现邮件发送
package com.nowcoder.async.handler;

import com.nowcoder.async.EventHandler;
import com.nowcoder.async.EventModel;
import com.nowcoder.async.EventType;
import com.nowcoder.model.Message;
import com.nowcoder.service.MessageService;
import com.nowcoder.util.MailSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;

/**
 * Created by Administrator on 2017/5/7.
 */
@Service
public class LoginExceptionHandler implements EventHandler{

    @Autowired
    MessageService messageService;

    @Autowired
    MailSender mailSender;

    @Override
    public void doHandle(EventModel model) {
        // 判断是否有异常登陆
        Message message = new Message();
        message.setToId(model.getActorId());
        message.setContent("你上次的登陆ip异常");
        message.setFromId(17);
        message.setCreatedDate(new Date());
        messageService.addMessage(message);

        //邮件发送
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("username", model.getExt("username"));
        mailSender.sendWithHTMLTemplate(model.getExt("email"), "登陆异常", "mails/welcome.html",
                map);
    }

    @Override
    public List<EventType> getSupportEventType() {
        return Arrays.asList(EventType.LOGIN);
    }
}
View Code

3. 邮件发送

引入jar包:

<dependency>
            <groupId>com.sun.mail</groupId>
            <artifactId>javax.mail</artifactId>
            <version>1.5.5</version>
        </dependency>
package com.nowcoder.util;

import org.apache.velocity.app.VelocityEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Service;

import javax.mail.internet.MimeUtility;

import org.springframework.ui.velocity.VelocityEngineUtils;

import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Administrator on 2017/5/7.
 */
@Service
public class MailSender implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(MailSender.class);
    private JavaMailSenderImpl mailSender;

    @Autowired
    private VelocityEngine velocityEngine;

    public boolean sendWithHTMLTemplate(String to, String subject,
                                        String template, Map<String, Object> model) {
        try {
            String nick = MimeUtility.encodeText("阮宏宝");
            InternetAddress from = new InternetAddress(nick + "<1032335358@qq.com>");
            MimeMessage mimeMessage = mailSender.createMimeMessage();
            MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
            String result = VelocityEngineUtils
                    .mergeTemplateIntoString(velocityEngine, template, "UTF-8", model);
            mimeMessageHelper.setTo(to);
            mimeMessageHelper.setFrom(from);
            mimeMessageHelper.setSubject(subject);
            mimeMessageHelper.setText(result, true);
            mailSender.send(mimeMessage);
            return true;
        } catch (Exception e) {
            logger.error("发送邮件失败" + e.getMessage());
            return false;
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        mailSender = new JavaMailSenderImpl();
        mailSender.setUsername("1032335358@qq.com");
        mailSender.setPassword("***********");
        mailSender.setHost("smtp.qq.com");
        mailSender.setPort(465);
        mailSender.setProtocol("smtps");
        mailSender.setDefaultEncoding("utf8");
        Properties javaMailProperties = new Properties();
        javaMailProperties.put("mail.smtp.ssl.enable", true);
        mailSender.setJavaMailProperties(javaMailProperties);
    }
}
View Code

4. 测试

LikeController.java:

点赞的时候加入异步点赞通知:

//异步发送
        eventProducer.fireEvent(new EventModel(EventType.LIKE)
                    .setActorId(hostHolder.getUser().getId())
                    .setEntityId(newsId)
                    .setEntityType(EntityType.ENTITY_NEWS)
                    .setEntityOwnerId(news.getUserId()));

LoginController.java

登录时加上登录异常的通知:

 eventProducer.fireEvent(new EventModel(EventType.LOGIN)
                        .setActorId(18)
                        .setExt("username", username).setExt("email", "1032335358   @qq.com"));

5 相关代码

LoginController.java

package com.nowcoder.controller;

import com.nowcoder.async.EventModel;
import com.nowcoder.async.EventProducer;
import com.nowcoder.async.EventType;
import com.nowcoder.service.UserService;
import com.nowcoder.util.ToutiaoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;

/**
 * Created by Administrator on 2017/4/8.
 */
@Controller
public class LoginController {

    private static final Logger logger = LoggerFactory.getLogger(LoginController.class);

    @Autowired
    UserService userService;
    @Autowired
    EventProducer eventProducer;

    @RequestMapping(path = {"/reg/"}, method = {RequestMethod.GET, RequestMethod.POST})
    @ResponseBody
    public String reg(Model model,
                      @RequestParam("username") String username,
                      @RequestParam("password") String password,
                      @RequestParam(value = "rember", defaultValue = "0") int rember,
                      HttpServletResponse response){
        try {
           Map<String, Object> map = userService.register(username, password);
           if(map.containsKey("ticket")){
               Cookie cookie = new Cookie("ticket", map.get("ticket").toString());
               cookie.setPath("/");
                //有记住我,就设置时间长一点
               if(rember > 0){
                   cookie.setMaxAge(3600 * 24 * 5);
               }
               response.addCookie(cookie);
               return  ToutiaoUtil.getJSONString(0, "注册成功");
           }else {
               return  ToutiaoUtil.getJSONString(1, map);
           }
        }catch (Exception e){
            logger.error("注册异常" + e.getMessage());
            return ToutiaoUtil.getJSONString(1, "注册异常");
        }
    }


    @RequestMapping(path = {"/login/"}, method = {RequestMethod.GET, RequestMethod.POST})
    @ResponseBody
    public String login(Model model,
                        @RequestParam("username") String username,
                        @RequestParam("password") String password,
                        @RequestParam(value = "rember", defaultValue = "0") int rememberme,
                        HttpServletResponse response){

        try {
            Map<String, Object> map = userService.login(username, password);
            if (map.containsKey("ticket")) {
                Cookie cookie = new Cookie("ticket", map.get("ticket").toString());
                cookie.setPath("/");
                if (rememberme > 0) {
                    cookie.setMaxAge(3600*24*5);
                }
                response.addCookie(cookie);
                eventProducer.fireEvent(new EventModel(EventType.LOGIN)
                        .setActorId(18)
                        .setExt("username", username).setExt("email", "1032335358   @qq.com"));
                return ToutiaoUtil.getJSONString(0, "登录成功");
            } else {
                return ToutiaoUtil.getJSONString(1, map);
            }

        } catch (Exception e) {
            logger.error("登录异常" + e.getMessage());
            return ToutiaoUtil.getJSONString(1, "登录异常");
        }

    }

    @RequestMapping(path = {"/logout/"}, method = {RequestMethod.POST, RequestMethod.GET})
    public String logout(@CookieValue("ticket") String ticket){
        userService.logout(ticket);
        return "redirect:/";
    }

}

LikeController.java

package com.nowcoder.controller;

import com.nowcoder.async.EventModel;
import com.nowcoder.async.EventProducer;
import com.nowcoder.async.EventType;
import com.nowcoder.model.EntityType;
import com.nowcoder.model.HostHolder;
import com.nowcoder.model.News;
import com.nowcoder.service.LikeService;
import com.nowcoder.service.NewsService;
import com.nowcoder.util.ToutiaoUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Created by Administrator on 2017/5/1.
 */
@Controller
public class LikeController {

    @Autowired
    LikeService likeService;
    @Autowired
    NewsService newsService;
    @Autowired
    HostHolder hostHolder;
    @Autowired
    EventProducer eventProducer;

    @RequestMapping(path = {"/like"}, method = {RequestMethod.GET, RequestMethod.POST})
    @ResponseBody
    public String like(@RequestParam("newsId") int newsId){
        //在likeKey对应的集合中加入当前用户
        long likeCount = likeService.like(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId);

        //资讯上更新点赞数
        News news = newsService.getById(newsId);
        newsService.updateLikeCount(newsId, (int)likeCount);

        //异步发送
        eventProducer.fireEvent(new EventModel(EventType.LIKE)
                    .setActorId(hostHolder.getUser().getId())
                    .setEntityId(newsId)
                    .setEntityType(EntityType.ENTITY_NEWS)
                    .setEntityOwnerId(news.getUserId()));

        return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount));
    }

    @RequestMapping(path = {"/dislike"}, method = {RequestMethod.POST, RequestMethod.GET})
    @ResponseBody
    public String disLike(@RequestParam("newsId") int newsId){

        //在disLikeKey对应的集合中加入当前用户
        long likeCount = likeService.disLike(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId);
        if(likeCount <= 0){
            likeCount = 0;
        }

        //资讯上更新喜欢数
        newsService.updateLikeCount(newsId, (int)likeCount);
        return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount));
    }


}

  

原文地址:https://www.cnblogs.com/noaman/p/6822979.html