mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-22 07:26:50 +00:00
common/timedmutex: clean and improve (#1486)
* improv. timed mutex * Add all protection back in and jankyness because races. :'( * Add intial benchmarkeroos * Add master benchmarks * goodness me * what? * what again? * glorious: nits * just a swaperino instead --------- Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
@@ -2,77 +2,61 @@ package timedmutex
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NewTimedMutex creates a new timed mutex with a
|
||||
// specified duration
|
||||
func NewTimedMutex(length time.Duration) *TimedMutex {
|
||||
return &TimedMutex{
|
||||
duration: length,
|
||||
}
|
||||
// TimedMutex is a blocking mutex which will unlock after a specified time
|
||||
type TimedMutex struct {
|
||||
// primary mutex is the main lock that will be unlocked after the duration
|
||||
primary sync.Mutex
|
||||
// secondary mutex is used to protect the timer
|
||||
secondary sync.Mutex
|
||||
timer *time.Timer
|
||||
// primed is used to determine if the timer has been started this is
|
||||
// slightly more performant than checking the timer directly and interacting
|
||||
// with a RW mutex.
|
||||
primed atomic.Bool
|
||||
duration time.Duration
|
||||
}
|
||||
|
||||
// LockForDuration will start a timer, lock the mutex,
|
||||
// then allow the caller to continue
|
||||
// NewTimedMutex creates a new timed mutex with a specified duration
|
||||
func NewTimedMutex(length time.Duration) *TimedMutex {
|
||||
return &TimedMutex{duration: length}
|
||||
}
|
||||
|
||||
// LockForDuration will start a timer, lock the mutex, then allow the caller to continue
|
||||
// After the duration, the mutex will be unlocked
|
||||
func (t *TimedMutex) LockForDuration() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go t.lockAndSetTimer(&wg)
|
||||
wg.Wait()
|
||||
t.primary.Lock()
|
||||
if !t.primed.Swap(true) {
|
||||
t.secondary.Lock()
|
||||
t.timer = time.AfterFunc(t.duration, func() { t.primary.Unlock() })
|
||||
t.secondary.Unlock()
|
||||
} else {
|
||||
// Timer C channel is not used with AfterFunc, so no need to drain.
|
||||
t.secondary.Lock()
|
||||
t.timer.Reset(t.duration)
|
||||
t.secondary.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TimedMutex) lockAndSetTimer(wg *sync.WaitGroup) {
|
||||
t.mtx.Lock()
|
||||
t.setTimer()
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
// UnlockIfLocked will unlock the mutex if its currently locked
|
||||
// Will return true if successfully unlocked
|
||||
// UnlockIfLocked will unlock the mutex if its currently locked Will return true
|
||||
// if successfully unlocked
|
||||
func (t *TimedMutex) UnlockIfLocked() bool {
|
||||
if t.isTimerNil() {
|
||||
if !t.primed.Load() {
|
||||
return false
|
||||
}
|
||||
|
||||
if !t.stopTimer() {
|
||||
t.secondary.Lock()
|
||||
wasStoppedByCall := t.timer.Stop()
|
||||
t.secondary.Unlock()
|
||||
|
||||
if !wasStoppedByCall {
|
||||
// Timer has already fired and the mutex has been unlocked.
|
||||
// Timer C channel is not used with AfterFunc, so no need to drain.
|
||||
return false
|
||||
}
|
||||
t.mtx.Unlock()
|
||||
t.primary.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
// stopTimer will return true if timer has been stopped by this command
|
||||
// If the timer has expired, clear the channel
|
||||
func (t *TimedMutex) stopTimer() bool {
|
||||
t.timerLock.Lock()
|
||||
defer t.timerLock.Unlock()
|
||||
if !t.timer.Stop() {
|
||||
select {
|
||||
case <-t.timer.C:
|
||||
default:
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// isTimerNil safely read locks to detect nil
|
||||
func (t *TimedMutex) isTimerNil() bool {
|
||||
t.timerLock.RLock()
|
||||
isNil := t.timer == nil
|
||||
t.timerLock.RUnlock()
|
||||
return isNil
|
||||
}
|
||||
|
||||
// setTimer safely locks and sets a timer
|
||||
// which will automatically execute a mutex unlock
|
||||
// once timer expires
|
||||
func (t *TimedMutex) setTimer() {
|
||||
t.timerLock.Lock()
|
||||
t.timer = time.AfterFunc(t.duration, func() {
|
||||
t.mtx.Unlock()
|
||||
})
|
||||
t.timerLock.Unlock()
|
||||
}
|
||||
|
||||
@@ -5,13 +5,44 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// 1000000 1074 ns/op 136 B/op 4 allocs/op (prev)
|
||||
// 2423571 503.9 ns/op 0 B/op 0 allocs/op (current)
|
||||
func BenchmarkTimedMutexTime(b *testing.B) {
|
||||
tm := NewTimedMutex(20 * time.Millisecond)
|
||||
tm := NewTimedMutex(0)
|
||||
for i := 0; i < b.N; i++ {
|
||||
tm.LockForDuration()
|
||||
}
|
||||
}
|
||||
|
||||
// 352309195 3.194 ns/op 0 B/op 0 allocs/op (prev)
|
||||
// 927051118 1.298 ns/op 0 B/op 0 allocs/op
|
||||
func BenchmarkTimedMutexTimeUnlockNotPrimed(b *testing.B) {
|
||||
tm := NewTimedMutex(0)
|
||||
for i := 0; i < b.N; i++ {
|
||||
tm.UnlockIfLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// 95322825 15.51 ns/op 0 B/op 0 allocs/op (prev)
|
||||
// 239158972 4.621 ns/op 0 B/op 0 allocs/op
|
||||
func BenchmarkTimedMutexTimeUnlockPrimed(b *testing.B) {
|
||||
tm := NewTimedMutex(0)
|
||||
tm.LockForDuration()
|
||||
for i := 0; i < b.N; i++ {
|
||||
tm.UnlockIfLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// 1000000 1193 ns/op 136 B/op 4 allocs/op (prev)
|
||||
// 38592405 36.12 ns/op 0 B/op 0 allocs/op
|
||||
func BenchmarkTimedMutexTimeLinearInteraction(b *testing.B) {
|
||||
tm := NewTimedMutex(0)
|
||||
for i := 0; i < b.N; i++ {
|
||||
tm.LockForDuration()
|
||||
tm.UnlockIfLocked()
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsistencyOfPanicFreeUnlock(t *testing.T) {
|
||||
t.Parallel()
|
||||
duration := 20 * time.Microsecond
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package timedmutex
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TimedMutex is a blocking mutex which will unlock
|
||||
// after a specified time
|
||||
type TimedMutex struct {
|
||||
mtx sync.Mutex
|
||||
timerLock sync.RWMutex
|
||||
timer *time.Timer
|
||||
duration time.Duration
|
||||
}
|
||||
Reference in New Issue
Block a user