分布式事务10_最大努力通知形-copy

实现
业务活动的主动发,在完成业务处理后,向业务活动的被动方发送消息允许消息丢失
业务活动的被动方根据定时策略,向业务活动的主动发查询,恢复丢失的业务消息
约束
被动方的处理结果不影响主动方的处理结果
成本
业务查询与校对系统的建设成本
适用范围
对业务最终一致性的时间敏感度低
跨企业的业务活动
用到的服务模式
可查询操作
方案特点
业务活动的主动发在完成业务处理后,向业务活动被动方发送通知消息(允许消息丢失)
主动发可以设置时间梯度通知规则,在通知失败后按照规则重复通知,直到通知N次后不再通知
主动发提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息
行业应用案例
银行通知、商户通知等(各大交易业务平台的商户通知:多次通知、查询校对、对账文件)
设计
数据库
rp_notify_record
DROP TABLE IF EXISTS `rp_notify_record`;
CREATE TABLE `rp_notify_record` (
`id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '版本事情',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`notify_rule` varchar(255) DEFAULT NULL COMMENT '通知规则(单位:分钟)',
`notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '已通知次数',
`limit_notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '最大通知次数限制',
`url` varchar(2000) NOT NULL DEFAULT '' COMMENT '通知请求链接(包含通知内容)',
`merchant_order_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户订单号',
`merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
`status` varchar(50) NOT NULL DEFAULT '' COMMENT '通知状态(对应枚举值)',
`notify_type` varchar(30) DEFAULT NULL COMMENT '通知类型',
PRIMARY KEY (`id`),
KEY `AK_KEY_2` (`merchant_order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录表 RP_NOTIFY_RECORD';
rp_notify_record_log
DROP TABLE IF EXISTS `rp_notify_record_log`;
CREATE TABLE `rp_notify_record_log` (
`id` varchar(50) NOT NULL DEFAULT '' COMMENT 'ID',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`notify_id` varchar(50) NOT NULL DEFAULT '' COMMENT '通知记录ID',
`request` varchar(2000) NOT NULL DEFAULT '' COMMENT '请求内容',
`response` varchar(2000) NOT NULL DEFAULT '' COMMENT '响应内容',
`merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
`merchant_order_no` varchar(50) NOT NULL COMMENT '商户订单号',
`http_status` varchar(50) NOT NULL COMMENT 'HTTP状态',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录日志表 RP_NOTIFY_RECORD_LOG';
接口
RpNotifyService
public interface RpNotifyService {

/**
* 创建消息通知
* @param rpNotifyRecord
*/
public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

/**
* 修改消息通知
* @param rpNotifyRecord
*/
public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

/**
* 创建消息通知记录
* @param rpNotifyRecordLog
* @return
*/
public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) throws NotifyBizException;

/**
* 发送消息通知
* @param notifyUrl 通知地址
* @param merchantOrderNo 商户订单号
* @param merchantNo 商户编号
*/
public void notifySend(String notifyUrl,String merchantOrderNo,String merchantNo) throws NotifyBizException;


/**
* 通过ID获取通知记录
* @param id
* @return
*/
public RpNotifyRecord getNotifyRecordById(String id) throws NotifyBizException;

/**
* 根据商户编号,商户订单号,通知类型获取通知记录
* @param merchantNo 商户编号
* @param merchantOrderNo 商户订单号
* @param notifyType 消息类型
* @return
*/
public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo , String merchantOrderNo , String notifyType) throws NotifyBizException;

/**
* 按条件分页查询通知记录.
*/
public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam , Map<String, Object> paramMap) throws NotifyBizException;


}
RpNotifyServiceImpl
public class RpNotifyServiceImpl implements RpNotifyService {

@Autowired
private JmsTemplate notifyJmsTemplate;

@Autowired
private RpNotifyRecordDao rpNotifyRecordDao;

@Autowired
private RpNotifyRecordLogDao rpNotifyRecordLogDao;

/**
* 创建消息通知
*
* @param rpNotifyRecord
*/
@Override
public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) {
return rpNotifyRecordDao.insert(rpNotifyRecord);
}

/**
* 修改消息通知
*
* @param rpNotifyRecord
*/
@Override
public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) {
rpNotifyRecordDao.update(rpNotifyRecord);
}

/**
* 创建消息通知记录
*
* @param rpNotifyRecordLog
* @return
*/
@Override
public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) {
return rpNotifyRecordLogDao.insert(rpNotifyRecordLog);
}



/**
* 发送消息通知
*
* @param notifyUrl 通知地址
* @param merchantOrderNo 商户订单号
* @param merchantNo 商户编号
*/
@Override
public void notifySend(String notifyUrl, String merchantOrderNo, String merchantNo) {

RpNotifyRecord record = new RpNotifyRecord();
record.setNotifyTimes(0);
record.setLimitNotifyTimes(5);
record.setStatus(NotifyStatusEnum.CREATED.name());
record.setUrl(notifyUrl);
record.setMerchantOrderNo(merchantOrderNo);
record.setMerchantNo(merchantNo);
record.setNotifyType(NotifyTypeEnum.MERCHANT.name());

Object toJSON = JSONObject.toJSON(record);
final String str = toJSON.toString();

notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(str);
}
});
}

/**
* 通过ID获取通知记录
*
* @param id
* @return
*/
@Override
public RpNotifyRecord getNotifyRecordById(String id) {
return rpNotifyRecordDao.getById(id);
}

/**
* 根据商户编号,商户订单号,通知类型获取通知记录
*
* @param merchantNo 商户编号
* @param merchantOrderNo 商户订单号
* @param notifyType 消息类型
* @return
*/
@Override
public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo, String merchantOrderNo, String notifyType) {
return rpNotifyRecordDao.getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(merchantNo,merchantOrderNo,notifyType);
}

@SuppressWarnings("unchecked")
@Override
public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam, Map<String, Object> paramMap) {
return rpNotifyRecordDao.listPage(pageParam,paramMap);
}


}
延时执行
import java.util.concurrent.DelayQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.roncoo.pay.app.notify.core.NotifyPersist;
import com.roncoo.pay.app.notify.core.NotifyTask;
import com.roncoo.pay.service.notify.aip.RpNotifyService;

/**
* 商户通知应用启动类.
*
*/
public class App
{
private static final Log LOG = LogFactory.getLog(App.class);

/**
* 通知任务延时队列,对象只能在其到期时才能从队列中取走。
*/
public static DelayQueue<NotifyTask> tasks = new DelayQueue<NotifyTask>();

private static ClassPathXmlApplicationContext context;

private static ThreadPoolTaskExecutor threadPool;

public static RpNotifyService rpNotifyService;

public static NotifyPersist notifyPersist;

public static void main(String[] args) {
try {
context = new ClassPathXmlApplicationContext(new String[] { "spring/spring-context.xml" });
context.start();
threadPool = (ThreadPoolTaskExecutor) context.getBean("threadPool");
rpNotifyService = (RpNotifyService) context.getBean("rpNotifyService");

// 从数据库中取一次数据用来当系统启动时初始化
notifyPersist = (NotifyPersist) context.getBean("notifyPersist");
.initNotifyDataFromDB();

startThread(); // 启动任务处理线程

LOG.info("== context start");
} catch (Exception e) {
LOG.error("== application start error:", e);
return;
}
synchronized (App.class) {
while (true) {
try {
App.class.wait();
} catch (InterruptedException e) {
LOG.error("== synchronized error:", e);
}
}
}
}

private static void startThread() {
LOG.info("==>startThread");

threadPool.execute(new Runnable() {
public void run() {
try {
while (true) {
LOG.info("==>threadPool.getActiveCount():" + threadPool.getActiveCount());
LOG.info("==>threadPool.getMaxPoolSize():" + threadPool.getMaxPoolSize());
// 如果当前活动线程等于最大线程,那么不执行
if (threadPool.getActiveCount() < threadPool.getMaxPoolSize()) {
LOG.info("==>tasks.size():" + tasks.size());
//使用take方法获取过期任务,如果获取不到,就一直等待,知道获取到数据
final NotifyTask task = tasks.take();
if (task != null) {
threadPool.execute(new Runnable() {
public void run() {
tasks.remove(task);
task.run(); // 执行通知处理
LOG.info("==>tasks.size():" + tasks.size());
}
});
}
}
}
} catch (Exception e) {
LOG.error("系统异常;",e);
}
}
});
}

}
————————————————
版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_27384769/article/details/79331027

原文地址:https://www.cnblogs.com/hanease/p/14471449.html