适合高并发情况下使用的分片map

toc

原始的解决方案

go自带的map不是goroutine安全的,为解决这个问题,最简单的方法是在map上挂把锁,每个goroutine操作map前上锁,操作完成解锁。在数据量很低,或并发度很低的情况下,这个把大锁导致的性能问题不足以放在心上,但是当数据量很大,导致map的操作非常耗时,或并发量很大,大量goroutine都争抢那把锁时,性能问题就值得关注了

解决办法

因此,为了提高上述场景的性能,应该减小锁的粒度,降低同一时间访问锁的goroutine的数量。
分片思想刚好就能同时处理好上述两个要求,分片需要:

  1. 将原来存储在一个数据结构的全部数据,通过一定的方法尽量均匀的分配到多个子数据结构片中
  2. 将原来控制数据结构的一个锁,增加到子数据结构个数的锁,每个锁只负责控制自己所在片的数据结构

经过分片处理后有以下好处:

  1. 锁的控制范围就由原来的整个大数据结构,缩小到自己所在的片所在的子数据结构,降低了锁的粒度
  2. 由于数据被分散到多个片中,每个锁只需管理自己的片,因此和原来相比,锁同时被多个执行流访问到的概率也相应降低

通过对数据进行分片来提高并发访问的性能,是一种思想,运用此思想的其他例子还有数据库的分、分库、负载均衡等

代码

结构的定义

type shard struct{
    mapItem map[string]interface{}
    rwLock sync.RWMutex
}

type ShardMap []*shard

每个分片的底层数据结构是map,每个片有自己的rwmutex保障并行安全
ShardMap结构是切片,底层为内存连续的数组,可更好的利用缓存提高性能

创建

const sliceCount = 256        //必须为2的N次方

func NewShardMap() ShardMap{
    shards := make([]*shard, sliceCount, sliceCount)
    for i := 0; i < sliceCount; i++{
        shards[i] = &shard{
            mapItem:make(map[string]interface{}),
        }
    }
    return shards
}

创建切片,并初始化每一个分片

Get、Set、Del

func (shardmap *ShardMap)SetVal(key string, val interface{}) {
    shard := shardmap.getShard(key)
    shard.rwLock.Lock()
    defer shard.rwLock.Unlock()
    shard.mapItem[key] = val
}

func (shardmap *ShardMap)GetVal(key string) (interface{}, bool){
    shard := shardmap.getShard(key)
    shard.rwLock.RLock()
    defer shard.rwLock.RUnlock()
    val, ok := shard.mapItem[key]
    return val, ok
}

func (shardmap *ShardMap)Del(key string){
    shard := shardmap.getShard(key)
    shard.rwLock.RLock()
    defer shard.rwLock.RUnlock()
    delete(shard.mapItem, key)
}

这三个方法的实现都是先根据key选定具体的分片,随后对片加锁后进行操作,随后解锁

计数

func (shardmap *ShardMap)Counts() int{
    sum := 0
    for _, val := range *shardmap{
        val.rwLock.RLock()
        sum += len(val.mapItem)
        val.rwLock.RUnlock()
    }
    return sum
}

一个goroutine直接遍历所有的分片,累加各片中元素个数
注:

  • len(map)本身是O(1), 虽然可以开启多个goroutine分别计数,但那样需要额外的同步操作,增加了代码复杂度与运行时间,不值得
  • 与C++的析构函数不同,defer在函数返回时才执行,前期只是入栈,因此在for循环内需要手动解锁,降低锁粒度

获取全部键值对

type ShardMapKVPair struct{
    Key string
    Value interface{}
}

func (shardmap *ShardMap)KeyValues() <-chan ShardMapKVPair{
    count := shardmap.Counts()
    ch := make(chan ShardMapKVPair, count)
    go func(){
        wg := sync.WaitGroup{}
        wg.Add(sliceCount)
        for _, elem := range *shardmap{
            go func(ele *shard){
                ele.rwLock.RLock()
                for key,val := range ele.mapItem{
                    ch <- ShardMapKVPair{
                        Key: key,
                        Value: val,
                    }
                }
                ele.rwLock.RUnlock()
                wg.Done()
            }(elem)
        }
        wg.Wait()
        close(ch)
    }()
    return ch
}
  • 使用了通道工厂,定义了一个用于外界读取的通道,并立即返回给调用者供其遍历,由内部的goroutine等待遍历各片goroutine结束,随后close通道,以向正在遍历通道的goroutine发出写完成消息,来结束通道的遍历
  • 各分片内map里数据众多,所以需要多个goroutine分别获取键值对以提高效率
    注:
  • 由于调度的关系,在循环内启动goroutine时,需要通过函数传参的方式将遍历参数绑定到goroutine中(C/C++种循环内创建线程时也需要注意此问题),否则可能预期不同

清空

func (shardmap *ShardMap)Clear(){
    cpus := runtime.NumCPU()
    var taskLenPerCPU = 0
    if sliceCount % cpus == 0{
        taskLenPerCPU = sliceCount / cpus        //均分
    }else{
        taskLenPerCPU = sliceCount / cpus + 1    //除最后的g外均分,最后的g少分或没有
    }
    wg := sync.WaitGroup{}
    wg.Add(cpus)
    for i := 0; i < cpus; i++{
        var tasks []*shard
        if i == cpus - 1{
            tasks = (*shardmap)[(i) * taskLenPerCPU :]
        }else{
            tasks = (*shardmap)[i * taskLenPerCPU : (i + 1) * taskLenPerCPU]
        }
        go func(mapItems []*shard){    //对切片分段处理
            for _, val := range mapItems{
                val.rwLock.Lock()
                val.mapItem = make(map[string]interface{})    //清空map
                val.rwLock.Unlock()                
            }
            wg.Done()
        }(tasks)
    }
    wg.Wait()
}

清空函数中我采取了和获取键值对时不一样的方法,这种方法我一般在C/C++中用得比较多
通过创建Cpu核心数量的任务goroutine可以获取最好的性能,每个goroutine处理均分或尽量均分后的一段数据

分片选取

func (shardmap *ShardMap)getShard(key string)*shard{
   hashVal := bkdrHash(key)
   index := hashVal & (sliceCount - 1)    //sliceCount为2的N次方成立
   return (*shardmap)[index]
}

func bkdrHash(key string) uint32{
   seed := uint32(131) // the magic number, 31, 131, 1313, 13131, etc.. orz..
   hash := uint32(0)
   for i := 0; i < len(key); i++ {
      hash = hash * seed + uint32(key[i])
   }
   return hash & 0x7FFFFFFF
}

对key计算hash,并根据子片数取余得到对应子片的下标
注:

  • 计算hash时使用的是bkdr,此hash有较好的性能,也简单易实现,参考各种字符串Hash函数比较
  • index := hashVal & (sliceCount - 1)使用位操作提高性能,当sliceCount为2的N次方时,等价于取余计算

性能测试

测试环境



9年前的老本了,性能测试只供参考,主要是比较与sync.map的性能

并行情况下getset

先来shardmap

const testCount = 1000000

func GenTestCase(count int) ([]string, []interface{}){
   arr1 := make([]string, count)
   arr2 := make([]interface{}, count)
   for i := 0; i < count; i++{
      arr1[i] = strconv.Itoa(i)
      arr2[i] = i
   }
   return arr1, arr2
}

func BenchmarkShardMap_GetSet(b *testing.B) {
   arr1, arr2 := GenTestCase(testCount)
   shardMap := NewShardMap()
   for i := 0; i < testCount; i++{
      shardMap.SetVal(arr1[i], arr2[i])
   }
   b.N = testCount * 10
   b.ResetTimer()

   for i := 0; i < 10; i++{
      rand.Seed(time.Now().UnixNano())
      b.Run(fmt.Sprintf("times %d:", i), func(b *testing.B){
         wg := sync.WaitGroup{}
         wg.Add(2 *b.N)
         for j := 0; j < b.N; j++{
            go func(){
               shardMap.SetVal(arr1[rand.Intn(testCount)], arr2[rand.Intn(testCount)])
               wg.Done()
            }()

            go func(){
               shardMap.GetVal(arr1[rand.Intn(testCount)])
               wg.Done()
            }()
         }
         wg.Wait()
      })
   }
}


再来sync.map

func BenchmarkSyncMap_GetSet(b *testing.B) {
   arr1, arr2 := GenTestCase(testCount)
   syncMap := sync.Map{}
   for i := 0; i < testCount; i++{
      syncMap.Store(arr1[i], arr2[i])
   }
   b.N = testCount * 10
   b.ResetTimer()

   for i := 0; i < 10; i++{
      rand.Seed(time.Now().UnixNano())
      b.Run(fmt.Sprintf("times %d:", i), func(b *testing.B){
         wg := sync.WaitGroup{}
         wg.Add(2 *b.N)
         for j := 0; j < b.N; j++{
            go func(){
               syncMap.Store(arr1[rand.Intn(testCount)], arr2[rand.Intn(testCount)])
               wg.Done()
            }()

            go func(){
               syncMap.Load(arr1[rand.Intn(testCount)])
               wg.Done()
            }()
         }
         wg.Wait()
      })
   }
}


好像shardmap性能比sync.map稍好

非并行获取键值对

func BenchmarkShardMap_KeyValues(b *testing.B) {
   arr1, arr2 := GenTestCase(testCount)
   shardMap := NewShardMap()
   for i := 0; i < testCount; i++{
      shardMap.SetVal(arr1[i], arr2[i])
   }
   b.ResetTimer()

   for i := 0; i < 10; i++{
      b.Run(fmt.Sprintf("times %d:", i), func(b *testing.B){
         keyVals := make([]ShardMapKVPair, 0)
         for val := range shardMap.KeyValues(){
            keyVals = append(keyVals, val)
         }
      })
   }
}

type SyncMapKVPair struct {
   key interface{}
   val interface{}
}

func BenchmarkSyncMap_KeyValues(b *testing.B) {
   arr1, arr2 := GenTestCase(testCount)
   syncMap := sync.Map{}
   for i := 0; i < testCount; i++{
      syncMap.Store(arr1[i], arr2[i])
   }
   b.ResetTimer()

   for i := 0; i < 10; i++{
      b.Run(fmt.Sprintf("times %d:", i), func(b *testing.B){
         keyVals := make([]SyncMapKVPair, 0)
         syncMap.Range(func(key, value interface{}) bool {
            keyVals = append(keyVals, SyncMapKVPair{
               key,
               value,
            })
            return true
         })

      })
   }
}

单独测试清空

func BenchmarkShardMap_CLear(b *testing.B){
   arr1, arr2 := GenTestCase(testCount)
   shardMap := NewShardMap()
   for i := 0; i < testCount; i++{
      shardMap.SetVal(arr1[i], arr2[i])
   }

   b.ResetTimer()
   for i := 0; i < 10; i++{
      b.Run(fmt.Sprintf("times %d:", i), func(b *testing.B){
         shardMap.Clear()
         if shardMap.Counts() != 0{
            b.Fatalf("Clear Fail")
         }
      })
   }
}

不足

根据鸽巢原理,哈希算法必然存在冲突,可能导致随着时间的增长,每个分片间的数据个数不均匀,应该要有相应的扩容操作并将数据rehash来保证分片数据的均衡,为保证吞吐率,可以效仿redis的dict结构,实行渐进式rehash

参考

Shard Your Hash Table To Reduce Write Locks
各种字符串Hash函数比较





附件列表

    原创不易,转载请注明出处,谢谢
    原文地址:https://www.cnblogs.com/Keeping-Fit/p/14458546.html