LocalCache
缓存可以是单机缓存(egs: LocalCache),也可以是分布式缓存(egs: Redis),缓存存在的意义在于缓解后端数据库的压力
例如下面的场景:
如果Redis发生了缓存雪崩或其它异常,那么一瞬间,所有的请求都会直接打到Mysql数据库,很容易把Mysql打挂
针对这样的场景,一种比较方便的解决方案是使用LocalCache,也就是给每个Service实例加上LocalCache,服务可以使用单机的内存来缓存不易于变更的数据,从而使绝大多数的重复请求不会直接到Redis,如下图所示
知识点汇总
想要读懂FreeCache的代码,我们需要以下的一些知识储备,我列在这里,如果对文中的部分名词不太清楚的同学可以点到相应的链接自行学习
- 32位系统上使用
atomic
访问64位字,如果该64位字的数据地址非8字节对齐,则会出现panic
,这个在freecahce
中的segment
结构体上有所体现,segment
结构体中的下划线的作用就是保证segment
结构体中使用atomic
操作字段的安全性,详细请参考issue freecache
中使用内存对齐技术提高CPU访问数据的速度,具体参考- 在
Go
中,对于数组和切片的GC
扫描可以近似看作O(1)
的时间具体参考
FreeCache的特别之处
- 可以存储数以百万的数据
- 几乎为0的
GC
压力 - 并发安全
- 纯
Go
实现 - 近似
LRU
算法 - 严格限制内存使用
- 支持迭代
FreeCache整体框架
根据上面的数据结构,先概述下freecache
查询数据的流程,对于某个key
,首先用Sum64
算法hash出一个数字num1
,从而得到该key
要存入的segment
,每一个segment
对应256
个slot
,取num1
的低8
位得到slotId
后,再根据num1
的低16
位(hash16
)得到写入数据在该slotId
下的entryPtr
,从而找到数据在ringBuf
中的存储起始位置offset
,进而就可以进行一些数据方面的操作了
源码分析
对于流程有了一个大概的熟悉之后,就可以顺着这条链路来看freecache
的每一个模块了
const segmentCount = 256
type Cache struct {
locks [segmentCount]sync.Mutex
segments [segmentCount]segment
}
上面这个结构体是freecache
组件的入口,可以看出有256
个segment
及依附在这256
个segment
上的锁,因为每一个segment
底层都会使用[]byte
类型的切片存储数据,为了防止并发造成的数据错乱问题,必须使用锁来保护
freecache
源码比较难懂的地方一个是evacuate
方法,另一个是底层的RingBuf
(下标的变化比较难懂),下面也会着重讲解下这块的内容
- 核心方法
func Set(key, value []byte, expireSeconds int) (err error)
func Get(key []byte) (value []byte, err error)
Set
Set
方法主要的逻辑分为以下几块:
- 判断参数是否合法
freecache
对于底层存储的数据有大小的限制,key
的长度不能超过65535
,且key
和value
的总长度不能超过freecache
预设大小的1/1024
,代码如下
if len(key) > 65535 {
return ErrLargeKey
}
maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
if len(key)+len(value) > maxKeyValLen {
// Do not accept large entry.
return ErrLargeEntry
}
- 寻找当前的
key
存入的slot
及在该slot
中的位置
寻找key
存入的slot
使用的是偏移量的方式,什么意思呢?可以先看下segment
的结构,标*为重要字段,其他字段都是为打点(Metrics)服务的字段
type segment struct {
*rb RingBuf // 存储byte数据的ringBuf
segId int // 当前segment的id
_ uint32 // 为了保障atomic在32位系统上访问64位字的安全性
missCount int64 // 当前segment没有找到key的次数
hitCount int64 // 当前segment找到key的次数
entryCount int64 // 当前segment存入(key, value)对的个数
totalCount int64 // 当前segment存过的所有(key, value)对,包括已经删除的
totalTime int64 // 存储所有的(key, value)对访问时间的总和,便于近似LRU操作
timer Timer // 当前segment的计时组件
totalEvacuate int64 // 执行近似LRU策略的次数
totalExpired int64 // 过期的(key, value)对的个数
overwrites int64 // 覆盖写的次数
touched int64 // 更新过期(key, value)对的过期时间函数(Touch)的计数器
*vacuumLen int64 // 当前segment的剩余容量
*slotLens [256]int32 // 存储所有slot实际存储的数据长度
*slotCap int32 // 每一个slot占用的容量
*slotsData []entryPtr // 被256个slot共享的底层切片
}
每一个segment
对应256
个slot
,所有的slot
共享slotData
,那这个映射是如何做的呢,我们看下面这张图
其实就是先找到当前slot
在slotData
中的偏移量offset
,然后切出[offset, offset+slotLen)
,容量为slotCap
的一段连续的切片,代码如下
func (seg *segment) getSlot(slotId uint8) []entryPtr {
slotOff := int32(slotId) * seg.slotCap
return seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
}
找到key
在slot
中位置的方法是使用二分查找,二分依赖的就是上面提到的hash16
的值,代码如下
func (seg *segment) lookup(slot []entryPtr, hash16 uint16, key []byte) (idx int, match bool) {
idx = entryPtrIdx(slot, hash16)
for idx < len(slot) {
ptr := &slot[idx]
if ptr.hash16 != hash16 {
break
}
match = int(ptr.keyLen) == len(key) && seg.rb.EqualAt(key, ptr.offset+ENTRY_HDR_SIZE)
if match {
return
}
idx++
}
return
}
上面的这段代码可以分成两块来看:
第一块: 找到hash16
值为函数参数的起始entryPtr
的位置
func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) {
high := len(slot)
for idx < high {
mid := (idx + high) >> 1
oldEntry := &slot[mid]
if oldEntry.hash16 < hash16 {
idx = mid + 1
} else {
high = mid
}
}
return
}
第二块: 在hash16
值相同的entryPtr
中找到key
与函数参数匹配的entryPtr
func (rb *RingBuf) EqualAt(p []byte, off int64) bool {
if off+int64(len(p)) > rb.end || off < rb.begin {
return false
}
readOff := rb.getDataOff(off)
readEnd := readOff + len(p)
if readEnd <= len(rb.data) {
return bytes.Equal(p, rb.data[readOff:readEnd])
} else {
firstLen := len(rb.data) - readOff
equal := bytes.Equal(p[:firstLen], rb.data[readOff:])
if equal {
secondLen := len(p) - firstLen
equal = bytes.Equal(p[firstLen:], rb.data[:secondLen])
}
return equal
}
}
第二块代码中的函数作用是判断底层ringBuf
从off
位置开始与p
是否匹配,首先是参数判断,在ringBuf
中维护了begin
和end
的概念,off
的范围必须在[begin, end)
之间,之后调用getDataOff
方法获取off
位置实际读取的开始位置(这里属于较为绕的一块,会在下面专门讲下ringBuf
的设计),之后通过p
的长度获取读取的结束位置,然后看数据是否分为两段,如果没有,则直接比较,否则分段进行比较
- 赋值数据的头部信息
如果说并没有在指定的slot
中找到当前key
,则需要一个新的头部信息
,否则需要对原有的头部信息
进行更新,先说下头部信息
是什么吧,看下面这个结构体
type entryHdr struct {
accessTime uint32 // 数据的访问时间
expireAt uint32 // 数据的过期时间
keyLen uint16 // 数据key的长度
hash16 uint16 // 数据hash16的值
valLen uint32 // 数据val的长度
valCap uint32 // 为数据val分配的容量
deleted bool // 数据是否被删除
slotId uint8 // 数据存入的slotId
reserved uint16 // 预留字段,也用来做内存对齐
}
在entryPtr
中,我们可以看到既有valLen
也有valCap
,那么valCap
的作用是什么呢?其实是给val
预分配的一段空间,看下面这张图
valCap
等于valLen
+黄色区域
,freecache
对于每一个存入的数据都是以上图的形式进行组织,其中头部的长度是固定的ENTRY_HDR_SIZE=24
对于头部信息的更新看下面的这个代码块
var hdrBuf [ENTRY_HDR_SIZE]byte
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if match {
matchedPtr := &slot[idx]
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
originAccessTime := hdr.accessTime
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
if hdr.valCap >= hdr.valLen {
//in place overwrite
atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
atomic.AddInt64(&seg.overwrites, 1)
return
}
// avoid unnecessary memory copy.
seg.delEntryPtr(slotId, slot, idx)
match = false
// increase capacity and limit entry len.
for hdr.valCap < hdr.valLen {
hdr.valCap *= 2
}
if hdr.valCap > uint32(maxKeyValLen-len(key)) {
hdr.valCap = uint32(maxKeyValLen - len(key))
}
} else {
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
hdr.valCap = uint32(len(value))
if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
hdr.valCap = 1
}
}
代码有点长,但是思路还是很清晰的,首先定义了一个长度为ENTRY_HDR_SIZE
的数组,之后的做法比较巧妙,通过使用unsafe
包将指向数组头部的指针赋值给hdr
,使下面对于hdr
的操作可以直接映射到数组上,是一个比较精巧的写法,判断是否在slot
中找到了要找的key
,如果找到了,则判断预留的valCap
是否可以装下更新的valLen
,如何可以的话,则更新下头部信息和val
的信息就可以直接返回了,否则,对valCap
进行扩充,每次乘2
,直到可以装下现在的valLen
为止,但是不能超过之前计算的maxKeyValLen
,这里需要注意,还需要将该key
在之前slot
中的信息删除seg.delEntryPtr(slotId, slot, idx)
,但是底层的ringBuf
不进行删除,什么意思呢?前面多次提到entryPtr
但没有进行详述,这里我们先看下在slot
中存储的结构体entryPtr
的具体信息吧
type entryPtr struct {
offset int64 // 数据在ringBuf中的偏移量
hash16 uint16 // 数据hash16的值
keyLen uint16 // 数据key的长度
reserved uint32 // 预留字段,用于内存对齐
}
(注: keyLen是一个优化点,在lookup
函数中找到起始位置后,可以先通过比较keyLen与要找的key的长度是否一样进行过滤,免去了不必要的字节比较)
可以看出entryPtr
主要保存的是数据在ringBuf
中的偏移量offset
,所以如果上面在slot
中删除这个entryPtr
之后,只需要插入新的entryPtr
,但是offset
不同就好了,而底层的ringBuf
中的那个 脏数据 是一直在的,因为下面的evacuate
函数会对已经删除的数据进行回收, 脏数据 会被新的数据覆盖,所以这里不需要做多余的数据拷贝
- 进行数据回收
首先得到数据写入的总长度
entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
也就是len(头部信息)
+ len(key)
+ len(valCap)
,进行相应的数据剔除操作,当然是发生在容量不足的情况下,代码如下,也是freecache
中比较绕的一个点
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
var oldHdrBuf [ENTRY_HDR_SIZE]byte
consecutiveEvacuate := 0
for seg.vacuumLen < entryLen {
oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
seg.rb.ReadAt(oldHdrBuf[:], oldOff)
oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
if oldHdr.deleted {
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
continue
}
expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
if expired || leastRecentUsed || consecutiveEvacuate > 5 {
seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff)
if oldHdr.slotId == slotId {
slotModified = true
}
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
if expired {
atomic.AddInt64(&seg.totalExpired, 1)
} else {
atomic.AddInt64(&seg.totalEvacuate, 1)
}
} else {
// evacuate an old entry that has been accessed recently for better cache hit rate.
newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
consecutiveEvacuate++
atomic.AddInt64(&seg.totalEvacuate, 1)
}
}
return
}
这块儿的逻辑比较 重 ,用流程图的方式讲解会比较清楚
ringBuf
中的Evacuate
方法我们一会在下面着重讲解
- 将
key
,value
信息写入
在执行完evacuate
之后,我们需要再看下当前key
要在slot
中的插入位置,因为evacuate
中如果涉及当前slot
中信息的删除操作,之前计算的idx
会发生变化,最后我们在slot
及ringBuf
中写入需要录入的信息就完成了一次数据的插入动作
好啦,上面介绍了这么多,主要是以宏观的角度自上而下进行了分析,但是对于ringBuf
这个数据结构大家还是比较迷,这里专门来讲下这个数据结构,老规矩,先来看下它的结构体定义
type RingBuf struct {
begin int64 // 数据的起始位置,取模len(data)为数据的真实位置
end int64 // 数据的结束位置,取模len(data)为数据的真实位置
data []byte // 数据存放的底层byte数组
index int // 维护数据下一次要写入的真实位置&所有数据的起始位置
}
接下来我们重点看ringBuf
中几个核心函数
getDataOff
func (rb *RingBuf) getDataOff(off int64) int {
var dataOff int
if rb.end-rb.begin < int64(len(rb.data)) {
dataOff = int(off - rb.begin)
} else {
dataOff = rb.index + int(off-rb.begin)
}
if dataOff >= len(rb.data) {
dataOff -= len(rb.data)
}
return dataOff
}
这个函数的作用是获取给定off
在底层data
中的真实偏移量,分为两种情况,第一种是数据还没有填满,也就是index
还没有绕data
一圈,此时的dataOff
就是off - rb.begin
,对应下图的这种情况
如果index
绕data
1 或 n 圈后,此时的off
和begin
的值就可能不在[0, len(data)-1)
的范围内,但index
维护了数据真实的起始位置,故dataOff
为rb.index + int(off-rb.begin)
,对应下图这种情况
真实的data
长度为5
,虚线部分是假想的,实际情况下end
应该在1
处,5
和6
两个位置的元素对应data
中0
和1
两个位置,是一个环
ReadAt
func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
if off > rb.end || off < rb.begin {
err = ErrOutOfRange
return
}
readOff := rb.getDataOff(off)
readEnd := readOff + int(rb.end-off)
if readEnd <= len(rb.data) {
n = copy(p, rb.data[readOff:readEnd])
} else {
n = copy(p, rb.data[readOff:])
if n < len(p) {
n += copy(p[n:], rb.data[:readEnd-len(rb.data)])
}
}
if n < len(p) {
err = io.EOF
}
return
}
知道了getDataOff
的作用,看ReadAt
就比较得心应手了,首先获取到真实的读取位置readOff
,readEnd
为真实读取位置加要读取的数据长度,如果readEnd
没有超出data
的长度,则直接读取,否则,先读取[readBegin, len(data))
部分的内容,然后读取[0, readEnd-len(data))
部分的内容
Write
func (rb *RingBuf) Write(p []byte) (n int, err error) {
if len(p) > len(rb.data) {
err = ErrOutOfRange
return
}
for n < len(p) {
written := copy(rb.data[rb.index:], p[n:])
rb.end += int64(written)
n += written
rb.index += written
if rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
}
if int(rb.end-rb.begin) > len(rb.data) {
rb.begin = rb.end - int64(len(rb.data))
}
return
}
Write
操作是附加写,这里要注意,在freecache
中,越往后的数据为越新的数据,所以index
也要移动len(p)
次,将写入的数据放到最后
Evacuate
func (rb *RingBuf) Evacuate(off int64, length int) (newOff int64) {
if off+int64(length) > rb.end || off < rb.begin {
return -1
}
readOff := rb.getDataOff(off)
if readOff == rb.index {
// no copy evacuate
rb.index += length
if rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
} else if readOff < rb.index {
var n = copy(rb.data[rb.index:], rb.data[readOff:readOff+length])
rb.index += n
if rb.index == len(rb.data) {
rb.index = copy(rb.data, rb.data[readOff+n:readOff+length])
}
} else {
var readEnd = readOff + length
var n int
if readEnd <= len(rb.data) {
n = copy(rb.data[rb.index:], rb.data[readOff:readEnd])
rb.index += n
} else {
n = copy(rb.data[rb.index:], rb.data[readOff:])
rb.index += n
var tail = length - n
n = copy(rb.data[rb.index:], rb.data[:tail])
rb.index += n
if rb.index == len(rb.data) {
rb.index = copy(rb.data, rb.data[n:tail])
}
}
}
newOff = rb.end
rb.end += int64(length)
if rb.begin < rb.end-int64(len(rb.data)) {
rb.begin = rb.end - int64(len(rb.data))
}
return
}
Evacuate
方法是整个ringBuf
中逻辑最为复杂的一个方法,它的作用是将off
位置开始长度为length
的数据移动到所有数据的尾部,同其它方法一样,首先也要获取真实的读取位置readOff
,之后分了三种情况,如果readOff
和index
的位置相同,那么其实只需要将index
循环移动length
长度即可,如果readOff
小于index
,根据getDataOff
函数可以知道,此时的index
还未绕data
一周(绕data
一周的dataOff
为rb.index + int(off-rb.begin)
,必然>=index
),所以直接将[readOff, readOff+length)
的数据进行循环追加即可,可以保证移动数据的正确性,如果readOff
大于index
,此时需要判断下readEnd
和len(data)
的关系,如果是<=,则直接从index
位置拷贝数据即可,否则需要分两段进行覆盖,最后获取[off, off+length)
数据新的偏移量newOff
,以及更新end
和begin
的值
(注: Set
方法中的evacuate
方法有一个判断是如果连续执行了5
次以上的Evacuate
方法,则删除当前的第一个entryPtr
,这里可能会有疑惑是,如果连续执行5
次Evacuate
方法,会不会有数据的覆盖写造成脏数据的情况,答案是不会的,在执行一次Evacuate
之后,新的oldOff
会是之前移动元素的下一个,所以并不会有覆盖写)
Skip
func (rb *RingBuf) Skip(length int64) {
rb.end += length
rb.index += int(length)
for rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
if int(rb.end-rb.begin) > len(rb.data) {
rb.begin = rb.end - int64(len(rb.data))
}
}
Skip
函数主要用于分配的valCap
比真实的valLen
大的情况,写入头部信息
、key
、val
之后,再向后跳valCap-valLen
即可
Get
理清上面Set
方法及ringBuf
的脉络之后,对于Get
方法的理解就是降维打击了,Get
方法的流程总体分为两大块(函数中有一个peek
字段用于表示是否需要打点信息,不太关键,小伙伴们自行理解)
- 定位数据并获取相应的
头部信息
,代码如下
hdr, ptr, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdr *entryHdr, ptr *entryPtr, err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
err = ErrNotFound
if !peek {
atomic.AddInt64(&seg.missCount, 1)
}
return
}
ptr = &slot[idx]
var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr = (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if !peek {
now := seg.timer.Now()
if hdr.expireAt != 0 && hdr.expireAt <= now {
seg.delEntryPtr(slotId, slot, idx)
atomic.AddInt64(&seg.totalExpired, 1)
err = ErrNotFound
atomic.AddInt64(&seg.missCount, 1)
return
}
atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
hdr.accessTime = now
seg.rb.WriteAt(hdrBuf[:], ptr.offset)
}
return hdr, ptr, err
}
和Set
方法一样,也是先获取到key
存入的slot
及在该slot
中的位置,如果没有找到则直接返回报错信息,否则获取到数据的头部信息
后,判断当前数据是否已经过期,过期数据同样直接返回报错信息,如果没有过期,则将数据的头部信息
及entryPtr
返回
- 从
ringBuf
中读取数据
seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
可以发现,在Get
方法中并没有将访问过的数据直接移动到所有数据的尾部,而真正的LRU是会这样做的,这也解释了为什么freecache
的LRU算法被称为近似的LRU
- 非核心方法
func Touch(key []byte, expireSeconds int) (err error)
func GetFn(key []byte, fn func([]byte) error) (err error)
...
非核心方法主要包括一些聚合函数,例如GetOrSet
或者SetAndGet
,还有一些带兜底的函数,例如GetFn
,此外最多的就是打点的函数如ExpiredCount
等,因为底层调用同核心方法基本类似,这里不做过多的阐述
总结
freecache
这块的内容就讲到这里啦,小伙伴们对文中提到的点如果有疑惑的地方,可以在评论区提出来,当然,文章中如果有阐述不对的点,也请小伙伴们及时指正~