mirror of
1
Fork 0
gotosocial/vendor/codeberg.org/gruf/go-mutexes/map.go

287 lines
6.9 KiB
Go
Raw Normal View History

package mutexes
import (
"sync"
"sync/atomic"
"unsafe"
)
2022-01-24 17:35:13 +01:00
const (
// possible lock types.
lockTypeRead = uint8(1) << 0
lockTypeWrite = uint8(1) << 1
2022-01-24 17:35:13 +01:00
// frequency of GC cycles
// per no. unlocks. i.e.
// every 'gcfreq' unlocks.
gcfreq = 1024
2022-01-24 17:35:13 +01:00
)
// MutexMap is a structure that allows read / write locking
// per key, performing as you'd expect a map[string]*RWMutex
// to perform, without you needing to worry about deadlocks
// between competing read / write locks and the map's own mutex.
// It uses memory pooling for the internal "mutex" (ish) types
// and performs self-eviction of keys.
//
// Under the hood this is achieved using a single mutex for the
// map, state tracking for individual keys, and some sync.Cond{}
// like structures for sleeping / awaking awaiting goroutines.
type MutexMap struct {
mapmu sync.Mutex
mumap map[string]*rwmutex
mupool rwmutexPool
count uint32
}
// checkInit ensures MutexMap is initialized (UNSAFE).
func (mm *MutexMap) checkInit() {
if mm.mumap == nil {
mm.mumap = make(map[string]*rwmutex)
}
}
// Lock acquires a write lock on key in map, returning unlock function.
func (mm *MutexMap) Lock(key string) func() {
return mm.lock(key, lockTypeWrite)
}
// RLock acquires a read lock on key in map, returning runlock function.
func (mm *MutexMap) RLock(key string) func() {
return mm.lock(key, lockTypeRead)
2022-01-24 17:35:13 +01:00
}
func (mm *MutexMap) lock(key string, lt uint8) func() {
// Perform first map lock
// and check initialization
// OUTSIDE the main loop.
mm.mapmu.Lock()
mm.checkInit()
2022-01-24 17:35:13 +01:00
for {
// Check map for mu.
mu := mm.mumap[key]
2022-01-24 17:35:13 +01:00
if mu == nil {
// Allocate new mutex.
mu = mm.mupool.Acquire()
mm.mumap[key] = mu
2022-01-24 17:35:13 +01:00
}
if !mu.Lock(lt) {
// Wait on mutex unlock, after
// immediately relocking map mu.
mu.WaitRelock(&mm.mapmu)
continue
}
// Done with map.
mm.mapmu.Unlock()
2022-01-24 17:35:13 +01:00
// Return mutex unlock function.
return func() { mm.unlock(key, mu) }
2022-01-24 17:35:13 +01:00
}
}
func (mm *MutexMap) unlock(key string, mu *rwmutex) {
// Get map lock.
mm.mapmu.Lock()
2022-01-24 17:35:13 +01:00
// Unlock mutex.
if mu.Unlock() {
2022-01-24 17:35:13 +01:00
// Mutex fully unlocked
// with zero waiters. Self
// evict and release it.
delete(mm.mumap, key)
mm.mupool.Release(mu)
}
2022-01-24 17:35:13 +01:00
if mm.count++; mm.count%gcfreq == 0 {
// Every 'gcfreq' unlocks perform
// a garbage collection to keep
// us squeaky clean :]
mm.mupool.GC()
}
// Done with map.
mm.mapmu.Unlock()
}
// rwmutexPool is a very simply memory rwmutexPool.
type rwmutexPool struct {
current []*rwmutex
victim []*rwmutex
2022-01-24 17:35:13 +01:00
}
// Acquire will returns a rwmutexState from rwmutexPool (or alloc new).
func (p *rwmutexPool) Acquire() *rwmutex {
// First try the current queue
if l := len(p.current) - 1; l >= 0 {
mu := p.current[l]
p.current = p.current[:l]
return mu
}
// Next try the victim queue.
if l := len(p.victim) - 1; l >= 0 {
mu := p.victim[l]
p.victim = p.victim[:l]
return mu
}
2022-01-24 17:35:13 +01:00
// Lastly, alloc new.
mu := new(rwmutex)
return mu
}
// Release places a sync.rwmutexState back in the rwmutexPool.
func (p *rwmutexPool) Release(mu *rwmutex) {
p.current = append(p.current, mu)
2022-01-24 17:35:13 +01:00
}
// GC will clear out unused entries from the rwmutexPool.
func (p *rwmutexPool) GC() {
current := p.current
p.current = nil
p.victim = current
2022-01-24 17:35:13 +01:00
}
// rwmutex represents a RW mutex when used correctly within
// a MapMutex. It should ONLY be access when protected by
// the outer map lock, except for the 'notifyList' which is
// a runtime internal structure borrowed from the sync.Cond{}.
//
// this functions very similarly to a sync.Cond{}, but with
// lock state tracking, and returning on 'Broadcast()' whether
// any goroutines were actually awoken. it also has a less
// confusing API than sync.Cond{} with the outer locking
// mechanism we use, otherwise all Cond{}.L would reference
// the same outer map mutex.
type rwmutex struct {
n notifyList // 'trigger' mechanism
l int32 // no. locks
t uint8 // lock type
2022-01-24 17:35:13 +01:00
}
// Lock will lock the mutex for given lock type, in the
// sense that it will update the internal state tracker
// accordingly. Return value is true on successful lock.
func (mu *rwmutex) Lock(lt uint8) bool {
switch mu.t {
case lockTypeRead:
// already read locked,
// only permit more reads.
if lt != lockTypeRead {
return false
}
case lockTypeWrite:
// already write locked,
// no other locks allowed.
return false
default:
// Fully unlocked,
// set incoming type.
mu.t = lt
}
// Update
// count.
mu.l++
return true
}
// Unlock will unlock the mutex, in the sense that it
// will update the internal state tracker accordingly.
// On totally unlocked state, it will awaken all
// sleeping goroutines waiting on this mutex.
func (mu *rwmutex) Unlock() bool {
switch mu.l--; {
case mu.l > 0 && mu.t == lockTypeWrite:
panic("BUG: multiple writer locks")
case mu.l < 0:
panic("BUG: negative lock count")
case mu.l == 0:
// Fully unlocked.
mu.t = 0
// Awake all blocked goroutines and check
// for change in the last notified ticket.
before := atomic.LoadUint32(&mu.n.notify)
runtime_notifyListNotifyAll(&mu.n)
after := atomic.LoadUint32(&mu.n.notify)
// If ticket changed, this indicates
// AT LEAST one goroutine was awoken.
//
// (before != after) => (waiters > 0)
// (before == after) => (waiters = 0)
return (before == after)
default:
// i.e. mutex still
// locked by others.
return false
}
}
// WaitRelock expects a mutex to be passed in, already in the
// locked state. It incr the notifyList waiter count before
// unlocking the outer mutex and blocking on notifyList wait.
// On awake it will decr wait count and relock outer mutex.
func (mu *rwmutex) WaitRelock(outer *sync.Mutex) {
// add ourselves to list while still
// under protection of outer map lock.
t := runtime_notifyListAdd(&mu.n)
// Finished with
// outer map lock.
outer.Unlock()
// Block until awoken by another
// goroutine within mu.Unlock().
runtime_notifyListWait(&mu.n, t)
// Relock!
outer.Lock()
}
// unused fields left
// un-named for safety.
type notifyList struct {
_ uint32 // wait uint32
notify uint32 // notify uint32
_ uintptr // lock mutex
_ unsafe.Pointer // head *sudog
_ unsafe.Pointer // tail *sudog
}
// See runtime/sema.go for documentation.
//
//go:linkname runtime_notifyListAdd sync.runtime_notifyListAdd
func runtime_notifyListAdd(l *notifyList) uint32
[performance] media processing improvements (#1288) * media processor consolidation and reformatting, reduce amount of required syscalls Signed-off-by: kim <grufwub@gmail.com> * update go-store library, stream jpeg/png encoding + use buffer pools, improved media processing AlreadyExists error handling Signed-off-by: kim <grufwub@gmail.com> * fix duration not being set, fix mp4 test expecting error Signed-off-by: kim <grufwub@gmail.com> * fix test expecting media files with different extension Signed-off-by: kim <grufwub@gmail.com> * remove unused code Signed-off-by: kim <grufwub@gmail.com> * fix expected storage paths in tests, update expected test thumbnails Signed-off-by: kim <grufwub@gmail.com> * remove dead code Signed-off-by: kim <grufwub@gmail.com> * fix cached presigned s3 url fetching Signed-off-by: kim <grufwub@gmail.com> * fix tests Signed-off-by: kim <grufwub@gmail.com> * fix test models Signed-off-by: kim <grufwub@gmail.com> * update media processing to use sync.Once{} for concurrency protection Signed-off-by: kim <grufwub@gmail.com> * shutup linter Signed-off-by: kim <grufwub@gmail.com> * fix passing in KVStore GetStream() as stream to PutStream() Signed-off-by: kim <grufwub@gmail.com> * fix unlocks of storage keys Signed-off-by: kim <grufwub@gmail.com> * whoops, return the error... Signed-off-by: kim <grufwub@gmail.com> * pour one out for tobi's code <3 Signed-off-by: kim <grufwub@gmail.com> * add back the byte slurping code Signed-off-by: kim <grufwub@gmail.com> * check for both ErrUnexpectedEOF and EOF Signed-off-by: kim <grufwub@gmail.com> * add back links to file format header information Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
2023-01-11 12:13:13 +01:00
// See runtime/sema.go for documentation.
//
//go:linkname runtime_notifyListWait sync.runtime_notifyListWait
func runtime_notifyListWait(l *notifyList, t uint32)
// See runtime/sema.go for documentation.
//
//go:linkname runtime_notifyListNotifyAll sync.runtime_notifyListNotifyAll
func runtime_notifyListNotifyAll(l *notifyList)
// Ensure that sync and runtime agree on size of notifyList.
//
//go:linkname runtime_notifyListCheck sync.runtime_notifyListCheck
func runtime_notifyListCheck(size uintptr)
func init() {
var n notifyList
runtime_notifyListCheck(unsafe.Sizeof(n))
}