聊聊 golang 的 map

1、哈希表

哈希表是一个很常见的数据结构,用来存储无序的 key/value 对,给定的 key 可以在 O(1) 时间复杂度内查找、更新或删除对应的 value

设计一个好的哈希表,需要着重关注两个关键点:哈希函数、冲突处理。

在这里插入图片描述

1.1 哈希函数

理想情况下,哈希函数既要有优异的性能,又要让结果在映射范围内均匀的分布,使得增删改查的时间复杂度为 𝑂(1)

不过实际情况是,key 的数量往往会大于映射的范围,这势必产生哈希冲突,以及更差的读写性能。最坏的结果可能会是所有操作的时间复杂度可能会达到 𝑂(𝑛),比如结果都映射到一个 bucket(桶)上,最后变成线性查找了。

因此好的哈希函数对于哈希表来说至关重要。

1.2 冲突处理

当多个不同的 key 通过哈希函数计算得到同一个结果,这就是哈希冲突(或叫哈希碰撞)。解决的哈希冲突的办法,常见的有开放地址法和拉链法。

1.2.1 开放地址法

开放地址法是一种线性探测的方法,当发生哈希冲突时,它会按照某种规律(如线性探测、二次探测或双散列等)在哈希表中寻找下一个可用的空位来存储冲突的元素。

具体来说,当计算出的哈希值对应的槽位已被占用时,可以按照探测序列依次检查相邻的槽位,直到找到一个空槽位为止。

example image

如上图,底层数组有四个元素,key3 经过计算

index := hash("key3") % len(len)

得到 key1 一样的索引,这时发现索引所在的槽位不为空,那么就向右侧依次探测不为空的槽位,直到找到空槽位那么就把 key/value 写进去。

当需要查找某个 key 对应的 value 时,会从索引的位置开始线性探测数组

  • 步骤一:若对比索引所在位置的 key 相等,停止查找,取出 value
  • 步骤二:若对比索引所在位置的 key 不相等,则依次向右探测
    • 重复步骤一,找到则取出 value
    • 若最终没有找到则返回空

删除 key 和查找的方法同理。

最后,来看看开放地址发的优缺点:

  • 优点:
    • 不需要额外的存储空间来存储链表节点。
  • 缺点:
    • 容易产生聚集现象,即连续的槽位被占用,导致查找效率降低。
    • 当哈希表越来越满时,查找、插入和删除操作的效率会下降。

1.2.2 拉链法

拉链法是在每个哈希表的槽位上链接一个链表。当发生哈希冲突时,将具有相同哈希值的元素添加到对应槽位的链表中。查找、插入和删除操作都在相应的链表中进行。

拉链法一般会使用数组加上链表,是哈希表最常见的实现方法,像 GolangJavaPHP等编程言都用拉链法实现哈希表,另外 Redis 中字典也是使用拉链法实现哈希表的,详见拙作 Redis 之字典。

同样的,也来看看拉链法的优缺点:

  • 优点:
    • 解决了聚集问题,提高了查找、插入和删除操作的效率。
    • 动态地分配链表空间,适应于元素数量不确定的场景。
  • 缺点:
    • 需要额外的存储空间来存储链表节点。
    • 如果链表过长,查找、插入和删除操作的效率仍然会受到影响。

1.2.3 装载因子

装载因子 = 元素数量 ÷ 桶数量

哈希表中的装载因子是衡量哈希表使用程度的一个重要参数,它表示哈希表中已存储的元素数量与哈希表总槽位数量之间的比例。装载因子越大,哈希的读写性能就越差。

拿开放地址法来比方,底层数组长度为 8,目前已写入 6 个,那么装载因子为 6/8 也就是 0.75,快接近 1(为1时说明已经满了),这时再写入会导致碰撞的次数变多,哈希表的性能变得越差。

对比拉链法来说,也是一样。因为哈希表读写操作主要耗时在计算哈希定位桶遍历链表这三个过程。

对于装载因子越大,哈希的读写性能就越差,解决的办法是动态的增大哈希表的长度,当装载因子超过某个阈值时增加哈希表的长度,自动扩容。大致步骤是:

  • 创建新的哈希表,容量为原来的两倍
  • 遍历原有哈希表的元素,并重新计算索引,写入新的哈希表中
  • 遍历过程中,对于写操作只在新的哈希表中进行,查询和删除在新旧两个哈希表中分别进行
  • 遍历完成后,释放原来的哈希表

2. Golang 的 map

注意: Golang 版本为 1.19.12

2.1 数据结构

先来看看 map 相关的常量

const (
	// 一个桶里面最多可以装的键值对的个数,8对。
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits // 1 << 3 ,2的三次方,也就是 8

	// 触发扩容操作的最大装载因子的临界值
	loadFactor = 6.5

	// 为了保持内联,键 和 值 的最大长度都是128字节,如果超过了128个字节,就存储它的指针
	maxKeySize   = 128
	maxValueSize = 128

	// 数据偏移应该是 bmap 的整数倍,但是需要正确的对齐。
	dataOffset = unsafe.Offsetof(struct {
		b bmap
		v int64
	}{}.v)

	// tophash 的一些值
	empty          = 0 // 没有键值对
	evacuatedEmpty = 1 // 没有键值对,并且桶内的键值被迁移走了。
	evacuatedX     = 2 // 键值对有效,并且已经迁移了一个表的前半段
	evacuatedY     = 3 // 键值对有效,并且已经迁移了一个表的后半段
	minTopHash     = 4 // 最小的 tophash

	// 标记
	iterator     = 1 // 当前桶的迭代子
	oldIterator  = 2 // 旧桶的迭代子
	hashWriting  = 4 // 一个goroutine正在写入map
	sameSizeGrow = 8 // 当前字典增长到新字典并且保持相同的大小

	// 迭代子检查桶ID的哨兵
	noCheck = 1<<(8*sys.PtrSize) - 1
)

通过上面字段可以得出,Golangmap 的每个桶中可以存 8key/value 对。最大装载因子为 6.5,这个是官方统计出来的最优值,源码中(src/runtime/map.go)有介绍这个值是如何得来的,这里就不做过多介绍。若初始化有个 4 个桶,那么在存储 4*6.5 也就是在 26 个键值对后,就需要进行扩容了,不然查询效率就会降低,当然了,map 里元素越多,感觉越明显。

type hmap struct {
	count     int // map 的长度
	flags     uint8 // 标识,比如正在写数据
	B         uint8  // log以2为底,桶个数的对数 (总共能存 6.5 * 2^B 个元素)
	noverflow uint16 // 近似溢出桶的个数,当B小于16时是准确值,大于等于16时是大概的值。
	hash0     uint32 // 哈希种子

	buckets    unsafe.Pointer // 有 2^B 个桶的数组. count==0 的时候,这个数组为 nil.
	oldbuckets unsafe.Pointer // 旧的桶数组一半的元素
	nevacuate  uintptr        // 扩容增长过程中的计数器

	extra *mapextra // 可选字段
}

type mapextra struct {
    // 如果 key 和 value 都不包含指针,并且可以被 inline(<=128 字节)
    // 使用 extra 来存储 overflow bucket,这样可以避免 GC 扫描整个 map
    // 然而 bmap.overflow 也是个指针。这时候我们只能把这些 overflow 的指针
    // 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了
    // overflow 包含的是 hmap.buckets 的 overflow 的 bucket
    // oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow 的 bucket
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // 指向空闲的 overflow bucket 的指针
    nextOverflow *bmap
}

整个 hmap 结构占 48 个字节内存

fmt.Println(unsafe.Sizeof(hmap{})) // 48

其中

  • count ,代表 map 当前元素的个数,通过 len(m) 获取
  • B, 是 log2 为底,桶个数的对数,比如 B 为 3,那么此时 mapbucket 的个数为 2^3 那就是 8
  • hash0,哈希种子,可以增加哈希分布的随机性,从而有效地防止哈希冲突攻击
  • buckets 指向具体 bmap 数组数组的指针
  • mapextra 主要存储溢出桶的,通过 nextOverflow 进行相连

再来看看具体存放数据桶的数据结构

// bucket 本体
type bmap struct {
    topbits  [8]uint8    // 用于存储每个键的哈希值的高位,用于快速比较和查找
    keys     [8]keytype  // 存储键的数组
    values   [8]valuetype// 存储值的数组
    pad      uintptr     // 用于内存对齐的填充字段
    overflow uintptr     // 指向溢出桶的指针,如果有的话
}

前面介绍过,桶的元素个数为 8,所以 bmap 中 前三个字段数组长度都是 8

  • topbits 主要定位 key 具体在哪个桶
  • overflow 指向溢出桶的指针,用于存储溢出元素,形成一个溢出链表。当一个桶中的元素超过 8 个时,多余的元素会存储在溢出桶中。
  • keys/values 这里放一起介绍,和其他哈希表实现不同的是,Golang 中分别把 keyvalue 单独聚合存储,主要是为了节省内存
    • 比如 map[int64]int8key8 个字节,而 value 仅占 1 个字节,内存对齐的要则要 16 个字节,但若是分开存储,8 个 value 才占 8 个字节,这样就可以起到节省内存的作用
    • pad 主要作用为了内存对齐进行填充字段用的,比如上面哈希表中只有两个 value,那么 pad 就可以填充 6 个字节

2.2 初始化

// make(map[k]v, hint)
// 如果编译器认为 map 和第一个 bucket 可以直接创建在栈上,h 和 bucket 可能都是非空
// h != nil,可以直接在 h 内创建 map
// 如果 h.buckets != nil,其指向的 bucket 可以作为第一个 bucket 来使用
func makemap(t *maptype, hint int, h *hmap) *hmap {

	// 计算 hint 个元素所需的内存大小,并检查是否会溢出或超过允许的最大分配内存 maxAlloc
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	// 如果发生溢出或内存大小超过限制,则将 hint 设置为 0
	if overflow || mem > maxAlloc {
		hint = 0
	}

    // 初始化 hmap
    if h == nil {
        h = new(hmap)
    }
    h.hash0 = fastrand()

    // 按照提供的元素个数,计算初始桶的数量
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    // 分配初始的 hash table
    // 如果 B == 0,buckets 字段会由 mapassign 来 lazily 分配
    // 因为如果 hint 很大的话,对这部分内存归零会花比较长时间
    if h.B != 0 {
		var nextOverflow *bmap
		// 初始化 bucket 和 nextOverflow 
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

    return h
}

这里先看下 B 的计算逻辑,这里为了更好认清计算逻辑,把相关代码抽取出来,可以直接运行的。

package _0240626

import (
	"fmt"
	"testing"
)

const (
	// Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits

	// Maximum average load of a bucket that triggers growth is 6.5.
	// Represent as loadFactorNum/loadFactorDen, to allow integer math.
	loadFactorNum = 13
	loadFactorDen = 2

	PtrSize = 4 << (^uintptr(0) >> 63)
)

func TestB(t *testing.T) {
	B := uint8(0)
	hint := 8
	for overLoadFactor(hint, B) {
		B++
	}
	fmt.Println(B, 1<<B)
}

// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {
	// Masking the shift amount allows overflow checks to be elided.
	return uintptr(1) << (b & (PtrSize*8 - 1))
}

还是得强调一点 B 是以 2 为底桶个数的对数,比如 B0,说只需一个桶。另外还需记住每个桶中存放的 key/value 对的个数是 8

好,接下来主要看看 overLoadFactor 判断逻辑

  • count > bucketCnt ,其中 bucketCnt 为上面提到的一个桶存 8 个元素
  • uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen),这里主要为了计算提供的桶能否存的下给的元素数量
    • 装载因子为 6.5,比如 B0,那么桶的个数就是 1<<0 就是 1 个桶,可以最多存 2*6.5,也就是 13 个元素
    • 少于 13 元素,就不用累加 B,否则就需要累加 B,并重复计算,直到提供的桶可以存的下 count 个元素

OK,确定了桶的数量,接下来就可以初始化桶了

// makeBucketArray 为 map buckets 初始化底层数组
// 1<<b 是需要分配的最小数量 buckets
// dirtyalloc 要么是 nil,要么就是之前由 makeBucketArray 使用同样的 t 和 b 参数
// 分配的数组
// 如果 dirtyalloc 是 nil,那么就会分配一个新数组,否则会清空掉原来的 dirtyalloc
// 然后重用这部分内存作为新的底层数组
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    // base = 1 << b 也就是桶的数量
    base := bucketShift(b)
    nbuckets := base
    // 对于比较小的 b 来说,不太可能有 overflow buckets
    // 这里省掉一些计算消耗
    if b >= 4 {
        // Add on the estimated number of overflow buckets
        // required to insert the median number of elements
        // used with this value of b.
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        // dirtyalloc 之前就是用上面的 newarray(t.bucket, int(nbuckets))
        // 生成的,所以是非空
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.kind&kindNoPointers == 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
    }

    if base != nbuckets {
        // 我们预分配了一些 overflow buckets
        // 为了让追踪这些 overflow buckets 的成本最低
        // 我们这里约定,如果预分配的 overflow bucket 的 overflow 指针是 nil
        // 那么通过增加指针可以找到更多可用的溢出桶.
        // 我们需要一个安全的非空指针来作为 last overflow bucket,直接用 buckets 就行了
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}

这里需着重看下 b 的值,以 4 为界线

  • 小于 4,不创建溢出桶 ,比如 3,那么就初始化 1<<3 也就是 8 个正常桶(hmap 结构中 buckets 指向的 bmap 数组)
  • 大于或等于 4,创建溢出捅(此时正常桶的个数为 1<<416 个)
    • 溢出桶数量规则,比如 b4,在不考虑内存对齐的情况下,1<<(4-4) 也就是 1
    • 溢出桶和正常桶在内存上是连续的
    • 最后一个溢出桶的溢出指针设置为桶数组的起始地址,可以明确地表示这是链表的末端,从而避免无限循环

仔细阅读 makemap 函数的源码实现,发现在有溢出捅的情况下,在初始化 mapextra 后,仅仅用 nextOverflow 指向第一个空闲的溢出桶,但是 overflow 字段却没有提及。

if h.B != 0 {
	var nextOverflow *bmap
	// 初始化 bucket 和 nextOverflow 
	h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
	if nextOverflow != nil {
		h.extra = new(mapextra)
		h.extra.nextOverflow = nextOverflow
	}
}

这里就稍微向后跳下进度,因为在 mapextraoverflow 是在写入 key/value 时生成的。

func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
	var ovf *bmap
	if h.extra != nil && h.extra.nextOverflow != nil {
		// We have preallocated overflow buckets available.
		// See makeBucketArray for more details.
		ovf = h.extra.nextOverflow
		if ovf.overflow(t) == nil {
			// We're not at the end of the preallocated overflow buckets. Bump the pointer.
			h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
		} else {
			// This is the last preallocated overflow bucket.
			// Reset the overflow pointer on this bucket,
			// which was set to a non-nil sentinel value.
			ovf.setoverflow(t, nil)
			h.extra.nextOverflow = nil
		}
	} else {
		ovf = (*bmap)(newobject(t.bucket))
	}
	h.incrnoverflow()
	if t.bucket.ptrdata == 0 {
		h.createOverflow()
		*h.extra.overflow = append(*h.extra.overflow, ovf)
	}
	b.setoverflow(t, ovf)
	return ovf
}

也就是在 makemap 初始化的时候,若有溢出捅,正常桶和溢出桶是一块分配的,同时nextOverflow 指向第一个溢出捅,同时把最后一个溢出桶的 overflow 指针指向第一个正常桶。接着在写入的时候,会在正常桶都满的情况下,会去往溢出桶里写:

  • 若有空闲的溢出捅(h.extra.nextOverflow)
    • 那么获取此空闲的溢出桶 ovf = h.extra.nextOverflow
    • 同时还需判断此溢出捅的 overflow 是否为 nil
      • nil, 说明还存在空闲的溢出捅,那么 h.extra.nextOverflow 指向此空闲的溢出捅
      • 不为 nil,说明是最后一个溢出桶(上面提到过在分配溢出捅的时候会把最后一个溢出桶的 oveflow 指向第一个正常桶),同时把当前溢出桶的 overflowh.extra.nextOverflow 都置为 nil
  • 若没有空闲的溢出捅
    • 那么就生成一个新的桶 ovf = (*bmap)(newobject(t.bucket))
  • 更新溢出桶的数量(当 B 小于 16 时是准确值,大于等于 16 时是大概的值)
  • map 中的 key/value 类型都不是指针(map[int64]int8),那么就会把溢出捅的指针放入 extraoverflow 切片数组中,这么做主要是方便 GC,提升性能
    • 当对一个长度 100万key/value 对的 map 中进行 GC 时,若按照传统方法,会依次扫描 bmap 中的 overflow,这会相当耗时
    • 而一旦对这些 overflow 进行管理,那么当 GC 扫描时,只需把 extra.overflow 置黑即可,这可是相当大的性能提升(有兴趣的读者可以写个 map[int64]int8map[string]int8 测试用例,先往里写入 100万key/value 对,手动开启 GC,然后统计下耗时)
  • 最后把待写入的桶的 overflow 指针指向 ovf 这个溢出捅

ok,接下来看看 hmap 在不同类型下的内存布局吧

2.2.1 没有分配预分配溢出桶

B 小于 4,是不会预分配溢出捅

在这里插入图片描述

2.2.2 有分配预分配溢出桶

B 大于或等于 4,则会预分配溢出捅

在这里插入图片描述

2.3 写入

// t 是 map 的类型信息,h 是 map 的实际数据结构,key 是要插入或更新的键。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if h == nil {
		// 如果 map 是 nil,则抛出错误。
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		// 如果启用了竞态检测,获取调用者的程序计数器 (PC)。
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapassign)
		// 检测写操作是否会引起竞态。
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		// 检测对 key 的读操作。
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled {
		// MemorySanitizer 检测对 key 的读取。
		msanread(key, t.key.size)
	}
	if asanenabled {
		// AddressSanitizer 检测对 key 的读取。
		asanread(key, t.key.size)
	}
	if h.flags&hashWriting != 0 {
		// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。
		fatal("concurrent map writes")
	}
	// 计算键的哈希值。
	hash := t.hasher(key, uintptr(h.hash0))

	// 在调用 t.hasher 之后设置 hashWriting 标志,
	// 因为 t.hasher 可能会 panic,如果发生 panic,我们并没有进行写操作。
	h.flags ^= hashWriting

	if h.buckets == nil {
		// 如果 buckets 是 nil,则初始化一个新的 bucket。
		h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
	}

again:
	// 计算哈希值对应的 bucket 索引。
	bucket := hash & bucketMask(h.B)
	if h.growing() {
		// 如果 map 正在扩容,则执行扩容任务。
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
	top := tophash(hash)

	var inserti *uint8      // 指向要插入位置的 tophash
	var insertk unsafe.Pointer // 指向要插入的 key 位置
	var elem unsafe.Pointer    // 指向要插入的元素位置

bucketloop:
	for {
		for i := uintptr(0); i < bucketCnt; i++ {
			// 检查每个槽位的 tophash。
			if b.tophash[i] != top {
				// 如果槽位为空,记录第一个空槽的位置。
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				}
				// 如果遇到 emptyRest,说明后续的槽位都是空的,退出循环。
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			// 计算当前槽位的 key 的地址。
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				// 如果是间接存储,获取实际的 key 地址。
				k = *((*unsafe.Pointer)(k))
			}
			// 比较键是否相等。
			if !t.key.equal(key, k) {
				continue
			}
			// 找到已存在的键,更新值。
			if t.needkeyupdate() {
				// 如果需要更新键,则复制新的键到当前位置。
				typedmemmove(t.key, k, key)
			}
			// 获取元素的地址。
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			goto done
		}
		// 检查溢出桶。
		ovf := b.overflow(t)
		if ovf == nil {
			break
		}
		b = ovf
	}

	// 没有找到键,分配新的插槽并添加条目。

	// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again // 扩容会使所有内容失效,因此需要重试。
	}

	if inserti == nil {
		// 当前桶及所有溢出桶已满,分配一个新的溢出桶。
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.keysize))
	}

	// 在插入位置存储新键和值。
	if t.indirectkey() {
		// 如果键是间接存储,分配新内存并存储指针。
		kmem := newobject(t.key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.indirectelem() {
		// 如果值是间接存储,分配新内存并存储指针。
		vmem := newobject(t.elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	// 复制键到插入位置。
	typedmemmove(t.key, insertk, key)
	// 设置插入位置的 tophash。
	*inserti = top
	// 更新元素计数。
	h.count++

done:
	if h.flags&hashWriting == 0 {
		// 确保 hashWriting 标志已设置。
		fatal("concurrent map writes")
	}
	// 清除 hashWriting 标志。
	h.flags &^= hashWriting
	if t.indirectelem() {
		// 如果值是间接存储,返回实际的值指针。
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem
}

阅读完 map 写入的源码后,终于知道 concurrent map writes 这个报错的出处了。因为 map 是非线程安全的,也就是一个 map 是不能同时有写入、读取的。

if h.flags&hashWriting != 0 {
	// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。
	fatal("concurrent map writes")
}

同时,也补充了在 makemap 初始化中,B0 也就是 1<<01 个桶时的生成之处了。

if h.buckets == nil {
	h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

接下来着重介绍下如何定位桶以及对应得 key 的写入位置。

hash := t.hasher(key, uintptr(h.hash0))
bucket := hash & bucketMask(h.B)
if h.growing() {
	growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
  • 先计算 key 对应的哈希值,这里会把初始化的 hash0 加进去,使其更加分布均匀
  • 定位具体的桶,也就是比较低 1<<B - 1
    • 比如 B3,也就是 1<<38 个桶
    • 那么 hash & (1<<3 -1 ) 也就是获取 hash 低七位是哪个值,就能定位到哪个桶了
  • 定位具体的 key
    • 先求 hash 的高八位的值
    • 再遍历对应 tophash 数组的第几个位置,进而知道 key 位于哪个槽了(若槽中有值,直接更新,否则就写入)
      • 若是 8tophash 都没有,说明满了,那么就遍历 overflow
      • 如果当前桶及所有溢出桶已满,那么就分配一个新的溢出桶

在这里插入图片描述

在写入的时候,还得考虑到是否扩容

// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
	hashGrow(t, h)
	goto again // 扩容会使所有内容失效,因此需要重试。
}
  1. 溢出桶的数量超过桶数量
  2. 装载因子超过 6.5

具体的扩容过程,留在本篇博文的最后来讲。

2.4 查询

// mapaccess2 用于在 map 中查找键,返回值和是否找到的布尔值。
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
	// 如果启用了竞态检测且 map 非空,获取调用者 PC 和当前函数 PC。
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapaccess2)
		// 检测 map 的读操作是否会引起竞态。
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	// 如果启用了 MemorySanitizer 且 map 非空,读取键的内存。
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	// 如果启用了 AddressSanitizer 且 map 非空,读取键的内存。
	if asanenabled && h != nil {
		asanread(key, t.key.size)
	}
	// 如果 map 是 nil 或者元素个数为 0。
	if h == nil || h.count == 0 {
		// 如果哈希函数可能 panic,则计算哈希以检查。
		if t.hashMightPanic() {
			t.hasher(key, 0) // 见 issue 23734
		}
		// 返回零值指针和 false,表示未找到。
		return unsafe.Pointer(&zeroVal[0]), false
	}
	// 检查是否有并发读写。
	if h.flags&hashWriting != 0 {
		fatal("concurrent map read and map write")
	}
	// 计算键的哈希值。
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	// 找到对应的 bucket。
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		// 如果存在旧的 buckets,检查是否需要缩小掩码。
		if !h.sameSizeGrow() {
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
		// 如果旧的 bucket 未被迁移,则使用旧的。
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	// 遍历 bucket 和溢出桶。
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			// 检查 tophash 是否匹配。
			if b.tophash[i] != top {
				// 如果遇到 emptyRest,说明后续无有效数据,退出循环。
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			// 获取当前键的地址。
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			// 比较键是否相等。
			if t.key.equal(key, k) {
				// 获取元素的地址。
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				// 返回元素指针和 true。
				return e, true
			}
		}
	}
	// 未找到键,返回零值指针和 false。
	return unsafe.Pointer(&zeroVal[0]), false
}

发现和写入的逻辑差不多

  • 计算 key 的哈希值 hash
  • 依据 hashB 位定位所在桶
  • 依据 hash8 位定位在桶中的存储位置
  • 若当前桶未找到则查找对应的溢出捅
  • 若对应位置有数据,则对比此位置上 key,以确定是否是要查找的数据
  • map 处于扩容阶段,那么优先从 oldbuckets 查找
  • 若未找到键,则返回零值指针和 false

等等,查询的时候还有个知识点没介绍到,那就是遍历 map 无序性,也就是说遍历 map 时顺序打印出来的 key/value 是和写入的顺序不一致的,而且每次打印的都不一样,顺序是随机的

func TestRandMap(t *testing.T) {
	m := map[int]int{
		0: 0,
		1: 1,
		2: 2,
		3: 3,
		4: 4,
		5: 5,
		6: 6,
	}
	for i := 0; i < len(m); i++ {
		for k, v := range m {
			fmt.Printf("key=%d,value=%d ", k, v)
		}
		fmt.Println()
	}
}

输出如下

key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 
key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 
key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 

Golang 是如何实现的呢?

type hiter struct {
    key         unsafe.Pointer // 当前键的指针,必须在第一个位置。写入 nil 表示迭代结束。
    elem        unsafe.Pointer // 当前值的指针,必须在第二个位置。
    t           *maptype       // `map` 类型的描述信息。
    h           *hmap          // `map` 的内部结构指针。
    buckets     unsafe.Pointer // 迭代器初始化时的 bucket 指针。
    bptr        *bmap          // 当前 bucket 的指针。
    overflow    *[]*bmap       // 用于保持 hmap.buckets 的溢出桶存活。
    oldoverflow *[]*bmap       // 用于保持 hmap.oldbuckets 的溢出桶存活。
    startBucket uintptr        // 迭代开始时的桶索引。
    offset      uint8          // 桶内的起始偏移量(随机化)。
    wrapped     bool           // 是否已经从桶数组的末尾绕回到开头。
    B           uint8          // 当前 `map` 的 bucket 数量的对数(`B` 表示 `2^B` 个桶)。
    i           uint8          // 当前桶内的偏移量(键值对索引)。
    bucket      uintptr        // 当前正在迭代的桶索引。
    checkBucket uintptr        // 用于检查的桶索引。
}

每次在遍历 map 时,会维护一个迭代器 hiter,着重关注下 startBucketoffset 这两个字段

  • 遍历 map 时从哪个桶开始遍历
  • 遍历桶时,从哪个位置开始遍历

接下来在看看如何初始化这个字段

func mapiterinit(t *maptype, h *hmap, it *hiter) {
    // 如果启用了数据竞争检测,并且哈希表不为空
    if raceenabled && h != nil {
        // 获取调用者的程序计数器
        callerpc := getcallerpc()
        // 记录读取操作
        racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))
    }

    // 设置迭代器的类型信息
    it.t = t
    // 如果哈希表为空或计数为零,则直接返回
    if h == nil || h.count == 0 {
        return
    }

    // 验证迭代器的大小是否正确
    if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
        throw("hash_iter size incorrect") // 参见 cmd/compile/internal/reflectdata/reflect.go
    }
    // 设置迭代器的哈希表指针
    it.h = h

    // 获取当前哈希表的桶状态
    it.B = h.B
    it.buckets = h.buckets
    if t.bucket.ptrdata == 0 {
        // 如果桶没有指针数据,则分配当前溢出桶数组并记住当前和旧的指针
        // 这确保即使在迭代过程中哈希表扩展和/或增加溢出桶时,
        // 也能保留所有相关的溢出桶。
        h.createOverflow()
        it.overflow = h.extra.overflow
        it.oldoverflow = h.extra.oldoverflow
    }

    // 决定从哪里开始迭代
    var r uintptr
    if h.B > 31-bucketCntBits {
        // 如果桶的数量大于31减去桶数位数,则使用64位随机数
        r = uintptr(fastrand64())
    } else {
        // 否则使用32位随机数
        r = uintptr(fastrand())
    }
    // 随机选择一个起始桶
    it.startBucket = r & bucketMask(h.B)
    // 随机选择桶内的起始偏移量
    it.offset = uint8(r >> h.B & (bucketCnt - 1))

    // 初始化迭代器状态
    it.bucket = it.startBucket

    // 标记有一个迭代器存在
    // 可以与另一个 mapiterinit() 并发运行。
    if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
        // 原子操作设置标志位,表明存在迭代器
        atomic.Or8(&h.flags, iterator|oldIterator)
    }

    // 预先执行一次迭代,初始化迭代器的 key 和 elem 指针
    mapiternext(it)
}

重点是这处

// 随机选择一个起始桶
it.startBucket = r & bucketMask(h.B)
// 随机选择桶内的起始偏移量
it.offset = uint8(r >> h.B & (bucketCnt - 1))

也就是每次遍历的时候,起始桶的选择是随机的,同时桶内的起始偏移量也是随机的,这样就实现了无序性。

2.5 删除

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    // 如果启用了数据竞争检测,并且哈希表不为空
    if raceenabled && h != nil {
        // 获取调用者的程序计数器
        callerpc := getcallerpc()
        pc := abi.FuncPCABIInternal(mapdelete)
        // 记录写操作
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        // 记录对 key 的读操作
        raceReadObjectPC(t.key, key, callerpc, pc)
    }

    // 如果启用了 MemorySanitizer(MSAN)(MSAN)检测,并且哈希表不为空
    if msanenabled && h != nil {
        // 记录对 key 的读操作
        msanread(key, t.key.size)
    }

    // 如果启用了地址清毒(ASAN)检测,并且哈希表不为空
    if asanenabled && h != nil {
        // 记录对 key 的读操作
        asanread(key, t.key.size)
    }

    // 如果哈希表为空或计数为零,则直接返回
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            // 计算哈希值,处理可能的 panic
            t.hasher(key, 0) // 参见 issue 23734
        }
        return
    }

    // 如果哈希表标志位中有 hashWriting,则表示并发写操作,抛出异常
    if h.flags&hashWriting != 0 {
        fatal("concurrent map writes")
    }

    // 计算 key 的哈希值
    hash := t.hasher(key, uintptr(h.hash0))

    // 设置 hashWriting 标志位,在调用 t.hasher 之后设置,因为 t.hasher 可能会 panic
    h.flags ^= hashWriting

    // 计算 key 对应的桶索引
    bucket := hash & bucketMask(h.B)
    // 如果正在扩展哈希表,执行扩展操作
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 获取对应桶的指针
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
    bOrig := b
    // 计算 tophash 值
    top := tophash(hash)
search:
    // 遍历桶及其溢出桶
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            // 如果 tophash 不匹配,继续下一个
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break search
                }
                continue
            }
            // 获取 key 的指针
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            k2 := k
            if t.indirectkey() {
                k2 = *((*unsafe.Pointer)(k2))
            }
            // 如果 key 不匹配,继续下一个
            if !t.key.equal(key, k2) {
                continue
            }
            // 清空 key 的指针
            if t.indirectkey() {
                *(*unsafe.Pointer)(k) = nil
            } else if t.key.ptrdata != 0 {
                memclrHasPointers(k, t.key.size)
            }
            // 清空 elem 的指针
            e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            if t.indirectelem() {
                *(*unsafe.Pointer)(e) = nil
            } else if t.elem.ptrdata != 0 {
                memclrHasPointers(e, t.elem.size)
            } else {
                memclrNoHeapPointers(e, t.elem.size)
            }
            // 标记为已删除
            b.tophash[i] = emptyOne
            // 如果桶末尾连续出现 emptyOne 状态,将其改为 emptyRest 状态
            if i == bucketCnt-1 {
                if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
                    goto notLast
                }
            } else {
                if b.tophash[i+1] != emptyRest {
                    goto notLast
                }
            }
            for {
                b.tophash[i] = emptyRest
                if i == 0 {
                    if b == bOrig {
                        break // 初始桶的开始,迭代结束
                    }
                    // 找到前一个桶,从其最后一个条目继续
                    c := b
                    for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
                    }
                    i = bucketCnt - 1
                } else {
                    i--
                }
                if b.tophash[i] != emptyOne {
                    break
                }
            }
        notLast:
            // 减少哈希表计数
            h.count--
            // 如果哈希表为空,重置哈希种子
            if h.count == 0 {
                h.hash0 = fastrand()
            }
            break search
        }
    }

    // 检查 hashWriting 标志位,确保其被清除
    if h.flags&hashWriting == 0 {
        fatal("concurrent map writes")
    }
    // 清除 hashWriting 标志位
    h.flags &^= hashWriting
}

删除 key 的操作,在定位桶和 key 的流程上和写入及查询差不多。

找到对应的 key 以后,如果此位置上存储的是指针,那么就把指针置为 nil。如果是值就清空它所在的内存。还要清理 tophash 里面的值最后把 mapkey 总个数计数器 count 减1 。

若是处在扩容过程中,那么删除操作会在扩容以后在新的 bmap 里面删除。

2.6 扩容

前面在 2.3 写入章节里介绍过,扩容满足以下两个条件的任意一个即可

  1. 溢出桶的数量超过桶数量
  2. 装载因子超过 6.5

针对这两种情况,实现的扩容机制是不一样的

2.6.1 等量扩容

若是溢出桶的数量超过桶数量,那么就会创建新桶保存数据,垃圾回收会清理老的溢出桶并释放内存。

什么情况会触发等量扩容(sameSizeGrow)呢?

如果在写入后又删除了大量的数据,这样始终不满足条件 2,且在之后的写入过程中,由于之前的桶已满,那么只能新增溢出桶并写入。

2.6.2 增量扩容

触发扩容的时机是在写入的时候

if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
	hashGrow(t, h)
	goto again // Growing the table invalidates everything, so try again
}

也就是当前 hmap 没有在扩容,并且满足上面条件任意一个,接下来看看 hasGrow 逻辑

func hashGrow(t *maptype, h *hmap) {
    // 如果哈希表的负载因子已经达到临界值,则增加桶的数量。
    // 否则,如果有太多的溢出桶,则保持相同数量的桶,并等量扩展。
    bigger := uint8(1)
    if !overLoadFactor(h.count+1, h.B) {
        // 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。
        bigger = 0
        h.flags |= sameSizeGrow // 设置同大小增长标志
    }
    
    oldbuckets := h.buckets // 保存当前的桶数组
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组

    // 设置新的哈希表标志
    flags := h.flags &^ (iterator | oldIterator) // 清除迭代器标志
    if h.flags&iterator != 0 {
        flags |= oldIterator // 如果当前存在迭代器,则设置旧迭代器标志
    }
    
    // 提交扩展(与GC原子操作)
    h.B += bigger // 增加桶数量的指数
    h.flags = flags // 更新标志
    h.oldbuckets = oldbuckets // 保存旧的桶数组
    h.buckets = newbuckets // 设置新的桶数组
    h.nevacuate = 0 // 重置迁移计数
    h.noverflow = 0 // 重置溢出桶计数

    // 如果有溢出桶,将当前溢出桶提升到旧桶中
    if h.extra != nil && h.extra.overflow != nil {
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil") // 如果旧溢出桶不为空,抛出异常
        }
        h.extra.oldoverflow = h.extra.overflow // 提升当前溢出桶到旧溢出桶
        h.extra.overflow = nil // 清空当前溢出桶
    }
    
    // 如果有下一个溢出桶,设置到额外信息中
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra) // 如果额外信息为空,则创建一个新的
        }
        h.extra.nextOverflow = nextOverflow // 设置下一个溢出桶
    }

    // 实际的数据复制由 growWork() 和 evacuate() 逐步完成
}

函数的开头也交代了何时增量扩容、何时等量扩容

bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
    // 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。
    bigger = 0
    h.flags |= sameSizeGrow // 设置同大小增长标志
}

增量扩容是指新的正常桶比原来扩大一倍

bigger := uint8(1)
h.B += bigger // 增加桶数量的指数

比如原来 B3 共计 8 个桶,那么增量扩容后正常桶的数量为 1<<(3+1)16

同时,h.oldbuckets 保存旧的 h.buckets 桶数组,h.buckets 保存新的桶数组

oldbuckets := h.buckets  // 保存当前的桶数组
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组
h.oldbuckets = oldbuckets // 保存旧的桶数组
h.buckets = newbuckets // 设置新的桶数组

在这里插入图片描述
在此函数的最后,也阐明了实际的数据迁移是由 growWork()evacuate() 逐步完成。

2.6.3 扩容机制

接下来看看 growWork()

在前面介绍写入删除的时候,都会检查当前是否在扩容

if h.growing() {
  // 如果 map 正在扩容,则执行扩容任务。
  growWork(t, h, bucket)
}

// growing reports whether h is growing. The growth may be to the same size or bigger.
func (h *hmap) growing() bool {
	return h.oldbuckets != nil
}

也就是通过 h.oldbuckets 是否不为 nil 来判断当前 map 是否在扩容,若 map 正在扩容,则执行扩容。

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// 确保我们移动的 oldbucket 对应的是我们马上就要用到的那一个
	evacuate(t, h, bucket&h.oldbucketmask())

	// 如果还在 growing 状态,再多移动一个 oldbucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

这里可以看到每次迁移两个桶,一个是当前的桶,另外一个是 hmap 里指向的 nevacuate 的桶,这是增量迁移。这个和 Redisrehash 惰性迁移一致,只是 Golang 中在写入和删除的时候迁移。这样就既不影响写入、删除,又进行了扩容,也算是一种取舍吧。

真正的迁移是在 evacuate() 函数中

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    // 获取旧桶的指针
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    
    // 计算新桶的数量
    newbit := h.noldbuckets()
    
    // 如果旧桶还没有被迁移
    if !evacuated(b)) {
        // TODO: 如果没有迭代器使用旧桶,可以复用溢出桶而不是使用新桶。
        
        // xy 包含 x 和 y(低位和高位)的迁移目标。
        var xy [2]evacDst
        x := &xy[0]
        x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        x.k = add(unsafe.Pointer(x.b), dataOffset)
        x.e = add(x.k, bucketCnt*uintptr(t.keysize))

        if !h.sameSizeGrow() {
            // 仅在增长时计算 y 指针。
            // 否则 GC 可能会看到错误的指针。
            y := &xy[1]
            y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            y.k = add(unsafe.Pointer(y.b), dataOffset)
            y.e = add(y.k, bucketCnt*uintptr(t.keysize))
        }

        // 处理旧桶中的每个溢出桶
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            e := add(k, bucketCnt*uintptr(t.keysize))
            for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
                top := b.tophash[i]
                if isEmpty(top) {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                if t.indirectkey() {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                var useY uint8
                if !h.sameSizeGrow() {
                    // 计算哈希以决定是将此键/元素发送到桶 x 还是桶 y。
                    hash := t.hasher(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
                        // 如果 key != key (NaNs),那么哈希可能会完全不同于旧哈希。
                        // 而且,它不是可重现的。在存在迭代器的情况下,重现性是必须的,
                        // 因为我们的迁移决策必须与迭代器做出的决策相匹配。
                        // 幸运的是,我们可以自由地将这些键任意发送。
                        // 我们让 tophash 的低位驱动迁移决策。
                        // 我们为下一级重新计算一个新的随机 tophash,
                        // 以便这些键在多次扩展后均匀分布在所有桶中。
                        useY = top & 1
                        top = tophash(hash)
                    } else {
                        if hash&newbit != 0 {
                            useY = 1
                        }
                    }
                }

                if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
                    throw("bad evacuatedN")
                }

                b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
                dst := &xy[useY]                 // 迁移目标

                if dst.i == bucketCnt {
                    dst.b = h.newoverflow(t, dst.b)
                    dst.i = 0
                    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
                    dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
                }
                dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查
                if t.indirectkey() {
                    *(*unsafe.Pointer)(dst.k) = k2 // 复制指针
                } else {
                    typedmemmove(t.key, dst.k, k) // 复制元素
                }
                if t.indirectelem() {
                    *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
                } else {
                    typedmemmove(t.elem, dst.e, e)
                }
                dst.i++
                // 这些更新可能会将这些指针越过键或元素数组的末尾。
                // 不过我们在桶的末尾有溢出指针来防止指向桶的末尾之外。
                dst.k = add(dst.k, uintptr(t.keysize))
                dst.e = add(dst.e, uintptr(t.elemsize))
            }
        }
        // 取消链接溢出桶并清除键/元素以帮助 GC。
        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
            // 保留 b.tophash,因为迁移状态在此处维护。
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }

    // 更新迁移标记
    if oldbucket == h.nevacuate {
        advanceEvacuationMark(h, t, newbit)
    }
}

这里需着重关注下这个 evacDst 数据结构

// evacDst 结构体表示在调整大小过程中,数据迁移的目的地
// 它包含了当前目标桶、索引以及指向键/元素存储的指针信息
type evacDst struct {
    b *bmap          // 当前目标桶(指向 bmap 结构的指针)
    i int            // 键/元素在 b 中的索引(在桶中存储键/元素的位置)
    k unsafe.Pointer // 指向当前键存储的指针(指向键内存位置的原始指针)
    e unsafe.Pointer // 指向当前元素存储的指针(指向元素内存位置的原始指针)
}

这个 evacDst 的主要作用是 rehash 和数据迁移。

// xy 包含 x 和 y(低位和高位)的迁移目标。
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))

if !h.sameSizeGrow() {
    // 仅在增长时计算 y 指针。
    // 否则 GC 可能会看到错误的指针。
    y := &xy[1]
    y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
    y.k = add(unsafe.Pointer(y.b), dataOffset)
    y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}

这里的 xy 分别代表扩容后新桶的前半段与后半段,若是等量扩容的话,那么只有 x

因为在等量扩容下,旧桶与新桶之间是一对一的关系,因此这里不过做就做过多介绍。

这里说下增量扩容,需要重新计算 keyhash 哈希值,然后确认在低位桶还是在高位桶。

newbit := h.noldbuckets()
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {
  useY = 1
}

这里我把相关代码给摘了出来,先计算 newbit,也就是水位值。为何这么说呢,看看这个值是如何计算的。

func (h *hmap) noldbuckets() uintptr {
    oldB := h.B
    if !h.sameSizeGrow() {
        oldB--
    }
    return bucketShift(oldB)
}

// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {
    return uintptr(1) << (b & (goarch.PtrSize*8 - 1))
}

比如增量扩容后 B4newbit 就是 1<<(4-1) 也就是 8,二进制为 1000。我们知道二进制与 & 操作是相同位的值都为 1 那么结果为 1,否则为 0,而 Golang 是采用位运算来获取桶的编号的。

先获取 key 的哈希值,然后拿低 B 位再和 newbit 做与运算,一旦结果为 0 说明桶是小于水位值的,也就是低位桶,否则就是高位桶。

举个示例:旧桶 B2,也就是 4 个桶,其中在第二个桶中有 2key 的哈希值低 3 位分别为 010110。增量扩容后, B 变成了 3,所以 010110 便分别落入 26 号桶。不过还得告诉程序扩容后落在地位桶 x 还是高位桶 y。这里把上面的算法套用一下就知道了,010&100 结果为 0 落入低位桶 x110&100 结果为 1 落入高位桶 y

在这里插入图片描述

b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY]                 // 迁移目标

useY 的作用就是这里了,目标桶依据 useY 是落入的地位桶还是高位桶。之前介绍过 xy 数组的第一位为低位桶(新桶的前半段),第二位为高位桶(新桶的后半段)。

const(
	evacuatedX     = 2 // key/elem is valid.  Entry has been evacuated to first half of larger table.
	evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
)

这里就是标记旧桶中 b.tophash[i] 已被迁移,至于迁移到低位桶还是高位桶就依据 evacuatedX + useY 来确定了

  • evacuatedX,低位桶,也就是新桶的前半段
  • evacuatedY,高位桶,也就是新桶的后半段,当 userY1 时,evacuatedX+userY 就是高位桶
if dst.i == bucketCnt {
    dst.b = h.newoverflow(t, dst.b)
    dst.i = 0
    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
    dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}

如果迁移的桶都满了,那么新建个溢出捅,重置索引,并把 dst.kdst.e 指向新的键和值存储位置。

 dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查
 if t.indirectkey() {
     *(*unsafe.Pointer)(dst.k) = k2 // 复制指针
 } else {
     typedmemmove(t.key, dst.k, k) // 复制元素
 }
 if t.indirectelem() {
     *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
 } else {
     typedmemmove(t.elem, dst.e, e)
 }
 dst.i++

 dst.k = add(dst.k, uintptr(t.keysize))
 dst.e = add(dst.e, uintptr(t.elemsize))

前面准备工作都做好了,这里就是迁移数据了。

if oldbucket == h.nevacuate {
	advanceEvacuationMark(h, t, newbit)
}

这里主要是更新哈希表的迁移进度标记 (nevacuate),以便逐步完成哈希表的增长 rehash 过程。

前面在介绍 growWork() 函数提到过,扩容时每次迁移两个值,一个是当前操作的值,另一个就是按照迁移进度正常迁移的值。这里就是对比下,是否刚好这两个值是一样的,是的话则更新下进度标记。

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
    // 将当前迁移标记推进一个桶
    h.nevacuate++
    // 实验表明1024是一个过大的值,但作为保障以确保O(1)行为
    stop := h.nevacuate + 1024
    // 确保停止标记不超过新桶数量
    if stop > newbit {
        stop = newbit
    }
    // 遍历并推进迁移标记,直到到达停止标记或遇到尚未迁移的桶
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
        h.nevacuate++
    }
    // 如果迁移标记到达了新桶数量,表示所有旧桶都已被迁移
    if h.nevacuate == newbit { // newbit 表示旧桶的数量
        // 迁移完成,释放旧的主桶数组
        h.oldbuckets = nil
        // 同时可以丢弃旧的溢出桶,如果它们没有被迭代器引用
        if h.extra != nil {
            h.extra.oldoverflow = nil
        }
        // 清除 sameSizeGrow 标志,表示增长过程已完成
        h.flags &^= sameSizeGrow
    }
}

这个函数的作用是增加哈希的 nevacuate 计数器,并在所有的旧桶都被迁移后清空 bmapoldbucketsoldoverflow

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/763793.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

文件上传漏洞---Pyload

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 本文重点从靶场案例分析文件上传漏洞常见的Pylod&#xff0c;本文演示靶场upload-labs 一.文件类型---Pyload 不同的文件对应不同的文件类型&#xff0c;后端代码通过限制特定的文件类型…

【C++】C++指针在线程中调用与受保护内存空间读取方法

引言 在C的多线程编程中&#xff0c;正确地管理内存和同步访问是确保程序稳定性和安全性的关键。特别是当涉及到指针在线程中的调用时&#xff0c;对受保护内存空间的访问必须谨慎处理&#xff0c;以防止数据竞争、死锁和内存损坏等问题。本文将详细探讨C指针在线程中调用时如何…

提升入住率|智慧酒店解决方案,打造有温度的居住体验!

近年来&#xff0c;智慧酒店被越来越多的人关注&#xff0c;由生物识别、物联网技术和互联网技术融合产生的智慧酒店解决方案&#xff0c;不仅可以提升顾客在酒店的入住体验&#xff0c;还可以帮助酒店降低运营成本&#xff0c;这也让越来越的酒店选择了智慧酒店的赛道&#xf…

c++读取文件时出现中文乱码

原因&#xff1a;UTF-8格式不支持汉字编码 解决&#xff1a;改成ANSI&#xff0c;因为ANSI编码支持汉字编码

新款奔驰GLE350升级原厂空气悬挂系统有哪些功能

奔驰 GLE350 升级原厂空气悬挂带来了一系列显著的优势和功能&#xff1a; 1. 舒适性提升 • 能够根据不同的路况和驾驶模式自动调节悬挂硬度和高度&#xff0c;有效过滤路面颠簸&#xff0c;为驾乘者提供更加平稳、舒适的行驶体验。 2. 行驶高度调节 • 驾驶者可以手动或自…

明日周刊-第14期

不好意思又拖更了哈哈哈。不过赶在7月的第一天&#xff0c;打算更新一下。建党节&#xff0c;值得纪念的一天。 文章目录 一周热点资源分享言论歌曲推荐 一周热点 国内科技新闻 深中通道建成通车 时间&#xff1a;2024年6月30日 内容&#xff1a;深圳至中山跨江通道正式建成开…

【06】SpringBoot与Web开发

1、基于Restful风格的接口 RestController RequestMapping("/demo") public class DemoController {GetMapping("/hello")public String getHello(){return "SpringBoot HelloWorld! 123";}GetMapping("/{id}")public User getUser(P…

【支撑文档】系统安全保证措施(word原件)

软件安全保证措施word 软件所有全套资料获取进主页或者本文末个人名片直接。

图形的搭建

例一&#xff1a; 输入描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜线和正斜线的长度。 输出描述&#xff1a; 针对每行输入&#xff0c;输出用“*”组成的X形图案。 示例一&…

【C语言】19.预处理详解

文章目录 1.预定义符号2.#define定义常量3.#define定义宏4.带有副作用的宏参数5.宏替换的规则6.宏函数的对比7.#和##7.1 #运算符7.2 ## 运算符 8.命名约定9.#undef10.命令行定义11.条件编译12.头文件的包含12.1 头⽂件被包含的⽅式12.1.1 本地⽂件包含12.1.2 库⽂件包含 12.2 嵌…

基于协同过滤的航空票务推荐系统的设计与实现(飞机票推荐系统)

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

鸿蒙OS开发者高级学习第2课:自由流转(含习题答案)

自由流转两种形态&#xff1a;相继使用&#xff08;跨端迁移&#xff09;&#xff1b;同时使用&#xff08; 多端协同&#xff09; 习题&#xff1a;

【云原生】服务网格(Istio)如何简化微服务通信

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《未来已来&#xff1a;云原生之旅》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、微服务架构的兴起 2、Istio&#xff1a;服务网格的佼…

《昇思25天学习打卡营第27天 | 昇思MindSporeShuffleNet图像分类》

27天 本节学习了ShuffleNet图像分类 ShuffleNetV1是旷视科技提出的一种计算高效的CNN模型&#xff0c;和MobileNet, SqueezeNet等一样主要应用在移动端&#xff0c;模型的设计目标就是利用有限的计算资源来达到最好的模型精度。设计核心是引入了两种操作&#xff1a;Pointwis…

Fanuc DPRNT宏程序串口采集

此种方式可用于设备没有网口的情形 该方式的核心原理是利用设备串口的输出能力&#xff0c;进行串口输出。但这里有一点需要注意&#xff0c;这种方式串口输出不具备实时性。因为串口输出是设备主动输出&#xff0c;采集程序只是被动接收而已&#xff0c;所以没有办法做到实时…

网络爬虫(二) 哔哩哔哩热榜高频词按照图片形状排列

我们有时候需要爬取结果生成为自定义的词云图 生成自定义的词云图通常需要以下步骤&#xff1a; 1. 爬取数据&#xff1a;使用爬虫工具或库&#xff0c;如requests、BeautifulSoup等&#xff0c;可以爬取网页、论坛、社交媒体等平台上的文本数据。 2. 数据预处理&#xff1a…

自动驾驶---Motion Planning之多段五次多项式

1 前言 在之前的博客系列文章中和读者朋友们聊过Apollo的 Motion Planning方案: 《自动驾驶---Motion Planning之LaneChange》 《自动驾驶---Motion Planning之Path Boundary》 《自动驾驶---Motion Planning之Speed Boundary》 《自动驾驶---Motion Planning之轨迹Path优化》…

20240701在飞凌的OK3588-C开发板的Android12系统下使用i2cdetect确认I2C总线

console:/ # i2cdetect -y -r 0 console:/ # i2cdetect -l console:/ # i2cdetect -F 0 20240701在飞凌的OK3588-C开发板的Android12系统下使用i2cdetect确认I2C总线 2024/7/1 11:30 在CAM1、CAM2挂载OV13850。 在CAM3、CAM4和CAM5挂载OV5645了。 console:/ # i2 i2cdetect i2…

音乐:触动心灵的艺术语言

Enjoy your music 音 乐 作为一种跨越时空和文化的艺术形式&#xff0c;拥有着无穷的魅力和力量。 它不仅能够带给我们愉悦的听觉享受&#xff0c;还对我们的身心健康、认知发展和社会交往产生着深远的影响。 一、音乐的基本元素 音乐由多个基本元素构成&#xff0c;包括…

【黑龙江等保测评具体是怎样做的?】

实现等保测评一般包括下列步骤&#xff1a; 1.黑龙江等保测评的目标&#xff1a;要明确评价的对象&#xff0c;即评价的范围和重点&#xff0c;以及要达到的层次。这样就可以保证评估工作是根据企业的实际需要来开展的。 2.黑龙江等保测评的现场测评&#xff1a;搜集有关的安…