RTMP协议视频平台EasyDSS编译过程中Go语言异步信息处理设计与实现

在EasyDSS开发过程中,有此种场景:Go模块通过http请求获取C模块的信息,然后将信息保存到数据库中。基本流程如下:

71.png

该种模式一般称为同步处理,将收到的结果写入到数据库完毕后才会进行下一次的http请求。但是实际情况下,下一次的http请求,和上一次的结果是可以并发进行的。即以上模型可以优化为:

1.第一步:发送第一次 http 请求 A ,获取 A 的结果
2.第二步:并发执行以下两个步骤:
1)将 A 的结果写入到数据库中
2)发出第二次的 http 请求 B

因为第二步是并发执行,就节省了一段时间,属于异步处理,比最开始的同步执行速度更快,以下介绍如何实现该功能。

首先全部创建一个通道,用于处理 http 请求的结果。

var (
   // 处理响应的进程
   gProcessSpaceChan = make(chan vs.Group, 20)
)

然后编写处理请求结果的函数,该函数是阻塞读取 gProcessSpaceChan 通道中的数据,只要有数据则写入到数据库中。

// 处理收到http结果
func progressSpacesResponse() {
   // 收到 group 信息, 写入到数据库中
   for group := range gProcessSpaceChan {
      space := &table.Space{}
      space.CopyData(&group)
 
      dbSpace := &table.Space{}
      dbSpace.ID = group.Id
      err := gSpaceDao.Get(dbSpace)
      // 如果未查找到数据
      if err == gorm.ErrRecordNotFound {
         err = gSpaceDao.Add(space)
      } else if err == nil {
         space.CreateAt = dbSpace.CreateAt
         err = gSpaceDao.Save(space)
      }
 
      if err != nil {
         gErrorLog.Error("写入到数据库中失败", zap.Error(err))
      }
   }
}

还有一个是发送 http 请求的函数,其中 gProcessSpaceChan <- group就是将数据写入到通道中。

// 获取信息
func updateSpaces() {
 
   var i int64 = 1
   for {
      req := vs.CreateDescribeGroupsRequest()
      req.PageNum = requests.NewInteger64(i)
      res, err := gClientVS.DescribeGroups(vs.CreateDescribeGroupsRequest())
 
      if err != nil {
         gErrorLog.Error("更新信息失败", zap.Error(err))
         gLog.Error("获取信息失败")
         return
      }
    // 将结果直接写入到通道中,在通道中异步处理数据
      for _, group := range res.Groups {
         gProcessSpaceChan <- group
      }
 
      if i >= res.PageCount {
         break
      }
      i++
   }
}
 

在初始化函数的时候调用以上函数:

go progressSpacesResponse()
go updateSpaces()
 

至此,设计与实现完毕,整个程序采用异步处理所有数据,处理速度更快。

原文地址:https://www.cnblogs.com/easydss/p/14377640.html