323 lines
7.2 KiB
Go
323 lines
7.2 KiB
Go
package mutexes
|
|
|
|
import (
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// locktype defines maskable mutexmap lock types.
|
|
type locktype uint8
|
|
|
|
const (
|
|
// possible lock types.
|
|
lockTypeRead = locktype(1) << 0
|
|
lockTypeWrite = locktype(1) << 1
|
|
lockTypeMap = locktype(1) << 2
|
|
|
|
// possible mutexmap states.
|
|
stateUnlockd = uint8(0)
|
|
stateRLocked = uint8(1)
|
|
stateLocked = uint8(2)
|
|
stateInUse = uint8(3)
|
|
)
|
|
|
|
// permitLockType returns if provided locktype is permitted to go ahead in current state.
|
|
func permitLockType(state uint8, lt locktype) bool {
|
|
switch state {
|
|
// Unlocked state
|
|
// (all allowed)
|
|
case stateUnlockd:
|
|
return true
|
|
|
|
// Keys locked, no state lock.
|
|
// (don't allow map locks)
|
|
case stateInUse:
|
|
return lt&lockTypeMap == 0
|
|
|
|
// Read locked
|
|
// (only allow read locks)
|
|
case stateRLocked:
|
|
return lt&lockTypeRead != 0
|
|
|
|
// Write locked
|
|
// (none allowed)
|
|
case stateLocked:
|
|
return false
|
|
|
|
// shouldn't reach here
|
|
default:
|
|
panic("unexpected state")
|
|
}
|
|
}
|
|
|
|
// MutexMap is a structure that allows having a map of self-evicting mutexes
|
|
// by key. You do not need to worry about managing the contents of the map,
|
|
// only requesting RLock/Lock for keys, and ensuring to call the returned
|
|
// unlock functions.
|
|
type MutexMap struct {
|
|
mus map[string]RWMutex
|
|
mapMu sync.Mutex
|
|
pool sync.Pool
|
|
queue []func()
|
|
evict []func()
|
|
count int32
|
|
maxmu int32
|
|
state uint8
|
|
}
|
|
|
|
// NewMap returns a new MutexMap instance with provided max no. open mutexes.
|
|
func NewMap(max int32) MutexMap {
|
|
if max < 1 {
|
|
// Default = 128 * GOMAXPROCS
|
|
procs := runtime.GOMAXPROCS(0)
|
|
max = int32(procs * 128)
|
|
}
|
|
return MutexMap{
|
|
mus: make(map[string]RWMutex),
|
|
pool: sync.Pool{
|
|
New: func() interface{} {
|
|
return NewRW()
|
|
},
|
|
},
|
|
maxmu: max,
|
|
}
|
|
}
|
|
|
|
// acquire will either acquire a mutex from pool or alloc.
|
|
func (mm *MutexMap) acquire() RWMutex {
|
|
return mm.pool.Get().(RWMutex)
|
|
}
|
|
|
|
// release will release provided mutex to pool.
|
|
func (mm *MutexMap) release(mu RWMutex) {
|
|
mm.pool.Put(mu)
|
|
}
|
|
|
|
// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true,
|
|
// returning with map lock. Note that 'cond' is performed within a map lock.
|
|
func (mm *MutexMap) spinLock(cond func() bool) {
|
|
mu := mm.acquire()
|
|
defer mm.release(mu)
|
|
|
|
for {
|
|
// Get map lock
|
|
mm.mapMu.Lock()
|
|
|
|
// Check if return
|
|
if cond() {
|
|
return
|
|
}
|
|
|
|
// Queue ourselves
|
|
unlock := mu.Lock()
|
|
mm.queue = append(mm.queue, unlock)
|
|
mm.mapMu.Unlock()
|
|
|
|
// Wait on notify
|
|
mu.Lock()()
|
|
}
|
|
}
|
|
|
|
// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return.
|
|
func (mm *MutexMap) lockMutex(key string, lt locktype) func() {
|
|
var unlock func()
|
|
|
|
// Incr counter
|
|
mm.count++
|
|
|
|
// Check for existing mutex at key
|
|
mu, ok := mm.mus[key]
|
|
if !ok {
|
|
// Alloc from pool
|
|
mu = mm.acquire()
|
|
mm.mus[key] = mu
|
|
|
|
// Queue mutex for eviction
|
|
mm.evict = append(mm.evict, func() {
|
|
delete(mm.mus, key)
|
|
mm.pool.Put(mu)
|
|
})
|
|
}
|
|
|
|
// If no state, set in use.
|
|
// State will already have been
|
|
// set if this is from LockState{}
|
|
if mm.state == stateUnlockd {
|
|
mm.state = stateInUse
|
|
}
|
|
|
|
switch {
|
|
// Read lock
|
|
case lt&lockTypeRead != 0:
|
|
unlock = mu.RLock()
|
|
|
|
// Write lock
|
|
case lt&lockTypeWrite != 0:
|
|
unlock = mu.Lock()
|
|
|
|
// shouldn't reach here
|
|
default:
|
|
panic("unexpected lock type")
|
|
}
|
|
|
|
// Unlock map + return
|
|
mm.mapMu.Unlock()
|
|
return func() {
|
|
mm.mapMu.Lock()
|
|
unlock()
|
|
go mm.onUnlock()
|
|
}
|
|
}
|
|
|
|
// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex.
|
|
func (mm *MutexMap) onUnlock() {
|
|
// Decr counter
|
|
mm.count--
|
|
|
|
if mm.count < 1 {
|
|
// Perform all queued evictions
|
|
for i := 0; i < len(mm.evict); i++ {
|
|
mm.evict[i]()
|
|
}
|
|
|
|
// Notify all waiting goroutines
|
|
for i := 0; i < len(mm.queue); i++ {
|
|
mm.queue[i]()
|
|
}
|
|
|
|
// Reset the map state
|
|
mm.evict = nil
|
|
mm.queue = nil
|
|
mm.state = stateUnlockd
|
|
}
|
|
|
|
// Finally, unlock
|
|
mm.mapMu.Unlock()
|
|
}
|
|
|
|
// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
|
|
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
|
|
func (mm *MutexMap) RLockMap() *LockState {
|
|
return mm.getMapLock(lockTypeRead)
|
|
}
|
|
|
|
// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
|
|
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
|
|
func (mm *MutexMap) LockMap() *LockState {
|
|
return mm.getMapLock(lockTypeWrite)
|
|
}
|
|
|
|
// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
|
|
func (mm *MutexMap) RLock(key string) (runlock func()) {
|
|
return mm.getLock(key, lockTypeRead)
|
|
}
|
|
|
|
// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
|
|
func (mm *MutexMap) Lock(key string) (unlock func()) {
|
|
return mm.getLock(key, lockTypeWrite)
|
|
}
|
|
|
|
// getLock will fetch lock of provided type, for given key, returning unlock function.
|
|
func (mm *MutexMap) getLock(key string, lt locktype) func() {
|
|
// Spin until achieve lock
|
|
mm.spinLock(func() bool {
|
|
return permitLockType(mm.state, lt) &&
|
|
mm.count < mm.maxmu // not overloaded
|
|
})
|
|
|
|
// Perform actual mutex lock
|
|
return mm.lockMutex(key, lt)
|
|
}
|
|
|
|
// getMapLock will acquire a map lock of provided type, returning a LockState session.
|
|
func (mm *MutexMap) getMapLock(lt locktype) *LockState {
|
|
// Spin until achieve lock
|
|
mm.spinLock(func() bool {
|
|
return permitLockType(mm.state, lt|lockTypeMap) &&
|
|
mm.count < mm.maxmu // not overloaded
|
|
})
|
|
|
|
// Incr counter
|
|
mm.count++
|
|
|
|
switch {
|
|
// Set read lock state
|
|
case lt&lockTypeRead != 0:
|
|
mm.state = stateRLocked
|
|
|
|
// Set write lock state
|
|
case lt&lockTypeWrite != 0:
|
|
mm.state = stateLocked
|
|
|
|
default:
|
|
panic("unexpected lock type")
|
|
}
|
|
|
|
// Unlock + return
|
|
mm.mapMu.Unlock()
|
|
return &LockState{
|
|
mmap: mm,
|
|
ltyp: lt,
|
|
}
|
|
}
|
|
|
|
// LockState represents a window to a locked MutexMap.
|
|
type LockState struct {
|
|
wait sync.WaitGroup
|
|
mmap *MutexMap
|
|
done uint32
|
|
ltyp locktype
|
|
}
|
|
|
|
// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
|
|
func (st *LockState) Lock(key string) (unlock func()) {
|
|
return st.getLock(key, lockTypeWrite)
|
|
}
|
|
|
|
// RLock: see MutexMap.RLock() definition.
|
|
func (st *LockState) RLock(key string) (runlock func()) {
|
|
return st.getLock(key, lockTypeRead)
|
|
}
|
|
|
|
// UnlockMap will close this state and release the currently locked map.
|
|
func (st *LockState) UnlockMap() {
|
|
// Set state to finished (or panic if already done)
|
|
if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
|
|
panic("called UnlockMap() on expired state")
|
|
}
|
|
|
|
// Wait until done
|
|
st.wait.Wait()
|
|
|
|
// Async reset map
|
|
st.mmap.mapMu.Lock()
|
|
go st.mmap.onUnlock()
|
|
}
|
|
|
|
// getLock: see MutexMap.getLock() definition.
|
|
func (st *LockState) getLock(key string, lt locktype) func() {
|
|
st.wait.Add(1) // track lock
|
|
|
|
// Check if closed, or if write lock is allowed
|
|
if atomic.LoadUint32(&st.done) == 1 {
|
|
panic("map lock closed")
|
|
} else if lt&lockTypeWrite != 0 &&
|
|
st.ltyp&lockTypeWrite == 0 {
|
|
panic("called .Lock() on rlocked map")
|
|
}
|
|
|
|
// Spin until achieve map lock
|
|
st.mmap.spinLock(func() bool {
|
|
return st.mmap.count < st.mmap.maxmu
|
|
}) // i.e. not overloaded
|
|
|
|
// Perform actual mutex lock
|
|
unlock := st.mmap.lockMutex(key, lt)
|
|
|
|
return func() {
|
|
unlock()
|
|
st.wait.Done()
|
|
}
|
|
}
|