异步编程解决方案

  • 事件发布/订阅模式
  • promise/deferrd模式
  • 流程控制模式

事件发布/订阅模式

事件监听器模式是异步回调的事件化,又称发布订阅/模式

node核心模块events

方法

  • addListener/on
  • once
  • removeListener
  • removeAllListeners
  • emit

简单操作

let events = require('events')
let request = require('http').request
let emit = new events.EventEmitter()
emit.on('message', function (msg) {
    console.log(msg)
})
emit.once('message1', function (msg) {
    console.log(msg)
})
emit.emit('message','message')
emit.emit('message','message')
emit.emit('message1','message1')
emit.emit('message1','message1')


此示例中,因为message事件通过on来监听,message1通过once来监听,所以message事件的回调可以执行多次,message1事件的回调只能执行一次

结果:

message
message
message1

事件分发/订阅也可以用来做一个功能黑盒,分发者只需要触发这个事件,不需要知道有多少订阅者,这些订阅者都在处理什么业务。事件侦听器模式也是一种钩子机制,node中的封装很多是一个黑盒,不使用事件监听我们很难知道内部状态变化的中间数据。使用事件侦听器可以不必知道内部如何实现,只关注特定的事件点,比如下面的http请求

let request = require('http').request
let options = {
    host: 'www.google.com',
    port: 80,
    path: '/upload',
    method: 'POST'
}
let req = request(options, function(res) {
    console.log('STATUS', res.statusCode)
    console.log('header', JSON.stringify(res.headers))
    res.setEncoding('utf-8')
    res.on('data', function (chunck) {
        console.log('Chunck', chunck)
    })
    res.on('end', function() {
        console.log('end')
    })
})
req.on('error', function(err) {
    console.log(err)
})
req.write('data
')
req.write('data
')
req.end()

只需要监听data  end  error事件就可以完成功能,不需要知道request究竟如何实现

细节:

  1. 如果对一个事件设置了10个以上的监听器,会有警告。可通过emitter.setMaxListeners(0)将限制去掉
  2. 异常处理,如果有监听error事件,则会执行error中的代码否则将会抛出错误,整个进程结束

利用队列解决雪崩问题

雪崩问题:在高并发高访问量的情况下缓存失效的情境下,大量请求同时发向数据库,数据库无法承受如此多的查询请求,进而影响网站整体性能

var proxy = new events.EventEmitter();
var status = "ready";
var select = function (callback) {
  proxy.once("selected", callback);
  if (status === "ready") {
    status = "pending";
    db.select("SQL", function (results) {
      proxy.emit("selected", results);
      status = "ready";
    });
  }
};


短时间内,如果执行多个select,不会执行多次查询请求,而是会用此次的callback来监听select事件,只执行一次查询请求,当这个请求返回时,触发select事件,执行之前的所有callback,由于用的once来监听,所以每个callback只会执行一次。

多异步之间的协作方案

const fs = require('fs')
const after = function (times = 0, callback) {
    let time = 0,datas = {}
    return function (key, value) {
        datas[key] = value
        if(++ time >= times) {
            return callback(datas)
        }
    }
}
const done = after(2, function(data) {
    console.log(data)
})
const readFile = function(path,key, callback) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw(err)
        }
        callback(key,data)
    })
}
readFile('../a.json', 'a', done)
readFile('../b.json', 'b', done)


通过总数和当前执行的次数来计数并执行最终的calback

const done = after(2, function(data) {
    console.log(data)
})
let emit = new events.EventEmitter()
emit.on('done', done)
emit.on('done', other)
function readFile(path, key, emit, name) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw err
        }
        emit.emit(name, key, data)
    })
}
readFile('../a.json', 'a', emit, 'done')
readFile('../b.json', 'b', emit, 'done')

通过事件监听来实现多对多异步

let EventProxy = require('eventproxy')
let fs = require('fs')
let proxy = new EventProxy()
proxy.all('a', 'b', function(a, b) {
    console.log('a',a,'b',b)
})
function readFile(path, name) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw err
        }
        proxy.emit(name, data)
    })
}
readFile('../a.json', 'a')
readFile('../b.json', 'b')
readFile('../a.json', 'a')
readFile('../b.json', 'b')

通过eventproxy来实现一对多异步

promise/deffered模式

用event实现一个request的promise

var Deferred = function() {
    this.promise = new Promise()
    this.status = 'unfullfiled'
}
Deferred.prototype.resolve = function(obj) {
    this.status = 'fullfiled'
    this.promise.emit('success', obj)
}
Deferred.prototype.reject = function(err) {
    this.status = 'failed'
    this.promise.emit('error', err)
}
Deferred.prototype.process = function(data) {
    this.promise.emit('process', data)
}

var promisify = function(res) {
    var deffer = new Deferred()
    let result = ''
    res.setEncoding('utf-8')
    res.on('data', function(chunck) {
        result += chunck
        deffer.process(chunck)
    })
    res.on('end', function() {
        deffer.resolve(result) 
    })
    res.on('error', function(err) {
        deffer.reject(err)
    })
    return deffer.promise
}

let option = {
    host: 'www.baidu.com',
    port: 80,
    method: 'POST',
    path: 'upload'
}
let req = http.request(option, function(res) {
    promisify(res).then(function(data) {
        console.log('success', data)
    }, function(err) {
        console.log('error', err)
    }, function(chunck) {
        console.log('Chunck', chunck)
    })
})
req.write('data')
req.write('data')
req.end()

defferd为内部封装promise为外部调用

有一些库来帮助定制化promise

流程控制库

  1. 尾触发和next(connect)
  2. 流程控制模块async
    1. 无依赖串行

      let async = require('async')
      let fs = require('fs')
      async.series([
          function(callback) {
              fs.readFile('../a.json', 'utf-8', callback)
          },
          function(callback) {
              fs.readFile('../b.json', 'utf-8', callback)
          },
      ], function(err, datas) {
          console.log(datas)
      })


      相当于 

      let fs = require('fs')
      let callback = function(err, results) {
          console.log(results)
      }
      fs.readFile('../a.json', 'utf-8', function(err, content) {
          if(err) {
              return callback(err)
          }
          fs.readFile('../b.json', 'utf-8', function(err, data) {
              if(err) {
                  return callback(err)
              }
              callback(null, [content, data])
          })
      })

       callback传递由async完成

    2. 并行

      let fs = require('fs')
      let async = require('async')
      async.parallel([
          function(callback) {
              fs.readFile('../a.json', 'utf-8', callback)
          },
          function(callback) {
              fs.readFile('../b.json', 'utf-8', callback)
          },
      ], function(err, results) {
          console.log(results)
      })

      相当于

      let fs = require('fs')
      
      let counter = 2
      let results = []
      let failError = null
      let callback = function(err, data) {
          console.log(data)
      }
      
      let done = function(index, data) {
          results[index] = data
          if(-- counter === 0) {
              callback(null, results)
          }
      }
      
      let fail = function(err) {
          if(!failError) {
              failError = err
              callback(failError)
          }
      }
      
      fs.readFile('../a.json', 'utf-8', function(err, data) {
          if(err) {
              fail()
          }
          done(0, data)
      })
      fs.readFile('../b.json', 'utf-8', function(err, data) {
          if(err) {
              fail()
          }
          done(1, data)
      })
    3. 前一个输出为后一个输入async.waterfall
    4. 自动化执行

      let async = require('async')
      let fs = require('fs')
      
      let dep = {
          b: function(callback) {
              fs.readFile('../b.json', 'utf-8', function(err, data) {
                  console.log(data)
                  callback()
              })
          },
          a: ['b', function(callback) {
              fs.readFile('../a.json', 'utf-8', function(err, data) {
                  console.log(data)
              })
          }]
      }
      
      async.auto(dep)

       使a在b执行完后执行

原文地址:https://www.cnblogs.com/ranjianxi/p/8404541.html