对于amqplib的使用心得

     最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

   

 1 var amqp = require('amqplib/callback_api');
 2 var config=require('../config/config');
 3 var log=require('../util/loghelp');
 4 function fail(err, conn) {
 5     log.error(err);
 6     if (conn) conn.close();
 7 }
 8 exports.StartConsumer=function (action,qname) {
 9     function on_connect(err, conn) {
10         if (err !== null) return fail(err);
11         function on_channel_open(err, ch) {
12             ch.assertQueue(qname, {durable: true}, function(err, ok) {
13                 if (err !== null) return bail(err, conn);
14                 ch.consume(qname, function(msg) {
15 
16                     log.info(`Received ${msg.content.toString()},start process`);
17                     action(JSON.parse(msg.content))
18                         .then(d=> {
19                             log.info("mq 处理成功,确认");ch.ack(msg)
20                         }
21                            )
22                         .catch(err=>
23                         ch.nack(msg));
24                 }, {noAck: false} );
25             });
26         }
27         conn.createChannel(on_channel_open);
28     }
29     amqp.connect(config.amqp.url,on_connect);
30 };
31 
32 exports.enqueue=function (data,qname) {
33     function on_connect(err, conn) {
34         if (err !== null) return bail(err);
35 
36         function on_channel_open(err, ch) {
37             if (err !== null) return bail(err, conn);
38             ch.assertQueue(qname, {durable: true}, function(err, ok) {
39                 if (err !== null) return bail(err, conn);
40                 var msg=JSON.stringify(data);
41                 ch.sendToQueue(qname, new Buffer(msg));
42                 log.info(`mq send ${msg}`);
43                 ch.close(function() { conn.close(); });
44             });
45         }
46         conn.createChannel(on_channel_open);
47     }
48     amqp.connect(config.amqp.url,on_connect);
49 };

    其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

   而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。

本人全手工打造的dotnetcore webapi 框架,可实现快速开发。地址:https://github.com/ryansecret/WebApiCore.git。 1 采用DDD模式开发,充血模型 2 添加Dapper扩展,默认实现增删改查基本操作。利用AutoMapper 做实体转换,减少重复劳动。 3 依赖注入融合Autofac,仓储层和应用层自动注入 4 实现JWT验证 5 加入swagger 文档 6 单元测试添加了xunit,MyMvc 可以方便对webapi测试 7 数据库版本控制
原文地址:https://www.cnblogs.com/ryansecreat/p/6030246.html