engine/exchanges: Add exchange currency state subsystem (#774)

* state: Add management system (init)

* linter: fix

* engine: gofmt

* gct: after merge fixup

* documentation: add

* rpc: implement services for testing

* gctcli: gofmt state_management.go

* documentation: reinstate lost information

* state: Add pair check to determine trading operation

* exchanges: add interface for specific state scoped subsystem functionality

* engine/order_man: reduce code footprint using new method

* RPC: implement pair trading request and change exported name to something specific to state

* engine: add tests

* engine: Add to withdraw manager

* documentation: reinstate soxipy in contrib. list

* engine: const fake name

* Glorious: NITERINOS

* merge: fix issues

* engine: csm incorporate service name into log output

* engine: fix linter issues

* gct: fix tests

* currencystate: remove management type

* rpc: fix tests

* backtester: fix tests

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/currencystate/currency_state.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/alert/alert.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/alert/alert.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* config: integrate with config and remove flag delay adjustment

* gctcli: fix issues after name changes

* engine: gofmt manager file

* Update engine/rpcserver.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* engine: Add enable/disable manager functions, add default popoulation for potential assets

* linter: fix

* engine/test: bump subsystem count

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/bithumb/bithumb.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits addressed

* alert: fix commenting for its generalized purpose

* glorious: nits

* engine: use standard string in log output

* bitfinex: apply patch, thanks @thrasher-

* bitfinex: fix spelling

* engine/currencystate: Add logs/fix logs

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
Ryan O'Hara-Reid
2021-09-27 13:33:49 +10:00
committed by GitHub
parent 1d7c656665
commit 5dfbbf84de
48 changed files with 4685 additions and 1110 deletions

View File

@@ -2,11 +2,11 @@ package orderbook
import (
"sync"
"sync/atomic"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/dispatch"
"github.com/thrasher-corp/gocryptotrader/exchanges/alert"
"github.com/thrasher-corp/gocryptotrader/log"
)
@@ -18,7 +18,7 @@ type Depth struct {
// unexported stack of nodes
stack *stack
Alert
alert.Notice
mux *dispatch.Mux
id uuid.UUID
@@ -101,7 +101,7 @@ func (d *Depth) LoadSnapshot(bids, asks []Item, lastUpdateID int64, lastUpdated
d.restSnapshot = updateByREST
d.bids.load(bids, d.stack)
d.asks.load(asks, d.stack)
d.alert()
d.Alert()
d.m.Unlock()
}
@@ -112,7 +112,7 @@ func (d *Depth) Flush() {
d.lastUpdated = time.Time{}
d.bids.load(nil, d.stack)
d.asks.load(nil, d.stack)
d.alert()
d.Alert()
d.m.Unlock()
}
@@ -132,7 +132,7 @@ func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int, last
if len(askUpdts) != 0 {
d.asks.updateInsertByPrice(askUpdts, d.stack, maxDepth, tn)
}
d.alert()
d.Alert()
d.m.Unlock()
}
@@ -157,7 +157,7 @@ func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, l
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
d.Alert()
return nil
}
@@ -182,7 +182,7 @@ func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool, lastU
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
d.Alert()
return nil
}
@@ -207,7 +207,7 @@ func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, l
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
d.Alert()
return nil
}
@@ -230,7 +230,7 @@ func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items, lastUpdateID int64, l
return err
}
}
d.alert()
d.Alert()
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
return nil
@@ -281,79 +281,3 @@ func (d *Depth) IsFundingRate() bool {
defer d.m.Unlock()
return d.isFundingRate
}
// Alert defines fields required to alert sub-systems of a change of state to
// re-check depth list
type Alert struct {
// Channel to wait for an alert on.
forAlert chan struct{}
// Lets the updater functions know if there are any routines waiting for an
// alert.
sema uint32
// After closing the forAlert channel this will notify when all the routines
// that have waited, have either checked the orderbook depth or finished.
wg sync.WaitGroup
// Segregated lock only for waiting routines, so as this does not interfere
// with the main depth lock, acts as a rolling gate.
m sync.Mutex
}
// alert establishes a state change on the orderbook depth.
func (a *Alert) alert() {
// CompareAndSwap is used to swap from 1 -> 2 so we don't keep actuating
// the opposing compare and swap in method wait. This function can return
// freely when an alert operation is in process.
if !atomic.CompareAndSwapUint32(&a.sema, 1, 2) {
// Return if no waiting routines or currently alerting.
return
}
go func() {
// Actuate lock in a different routine, as alerting is a second order
// priority compared to updating and releasing calling routine.
a.m.Lock()
// Closing; alerts many waiting routines.
close(a.forAlert)
// Wait for waiting routines to receive alert and return.
a.wg.Wait()
atomic.SwapUint32(&a.sema, 0) // Swap back to neutral state.
a.m.Unlock()
}()
}
// Wait pauses calling routine until depth change has been established via depth
// method alert. Kick allows for cancellation of waiting or when the caller has
// has been shut down, if this is not needed it can be set to nil. This
// returns a channel so strategies can cleanly wait on a select statement case.
func (a *Alert) Wait(kick <-chan struct{}) <-chan bool {
reply := make(chan bool)
a.m.Lock()
a.wg.Add(1)
if atomic.CompareAndSwapUint32(&a.sema, 0, 1) {
a.forAlert = make(chan struct{})
}
go a.hold(reply, kick)
a.m.Unlock()
return reply
}
// hold waits on either channel in the event that the routine has finished or an
// alert from a depth update has occurred.
func (a *Alert) hold(ch chan<- bool, kick <-chan struct{}) {
select {
// In a select statement, if by chance there is no receiver or its late,
// we can still close and return, limiting dead-lock potential.
case <-a.forAlert: // Main waiting channel from alert
select {
case ch <- false:
default:
}
case <-kick: // This can be nil.
select {
case ch <- true:
default:
}
}
a.wg.Done()
close(ch)
}

View File

@@ -2,9 +2,7 @@ package orderbook
import (
"errors"
"log"
"reflect"
"sync"
"testing"
"time"
@@ -307,87 +305,3 @@ func TestPublish(t *testing.T) {
d := Depth{}
d.Publish()
}
func TestWait(t *testing.T) {
wait := Alert{}
var wg sync.WaitGroup
// standard alert
wg.Add(100)
for x := 0; x < 100; x++ {
go func() {
w := wait.Wait(nil)
wg.Done()
if <-w {
log.Fatal("incorrect routine wait response for alert expecting false")
}
wg.Done()
}()
}
wg.Wait()
wg.Add(100)
isLeaky(&wait, nil, t)
wait.alert()
wg.Wait()
isLeaky(&wait, nil, t)
// use kick
ch := make(chan struct{})
wg.Add(100)
for x := 0; x < 100; x++ {
go func() {
w := wait.Wait(ch)
wg.Done()
if !<-w {
log.Fatal("incorrect routine wait response for kick expecting true")
}
wg.Done()
}()
}
wg.Wait()
wg.Add(100)
isLeaky(&wait, ch, t)
close(ch)
wg.Wait()
ch = make(chan struct{})
isLeaky(&wait, ch, t)
// late receivers
wg.Add(100)
for x := 0; x < 100; x++ {
go func(x int) {
bb := wait.Wait(ch)
wg.Done()
if x%2 == 0 {
time.Sleep(time.Millisecond * 5)
}
b := <-bb
if b {
log.Fatal("incorrect routine wait response since we call alert below; expecting false")
}
wg.Done()
}(x)
}
wg.Wait()
wg.Add(100)
isLeaky(&wait, ch, t)
wait.alert()
wg.Wait()
isLeaky(&wait, ch, t)
}
// isLeaky tests to see if the wait functionality is returning an abnormal
// channel that is operational when it shouldn't be.
func isLeaky(a *Alert, ch chan struct{}, t *testing.T) {
t.Helper()
check := a.Wait(ch)
time.Sleep(time.Millisecond * 5) // When we call wait a routine for hold is
// spawned, so for a test we need to add in a time for goschedular to allow
// routine to actually wait on the forAlert and kick channels
select {
case <-check:
t.Fatal("leaky waiter")
default:
}
}

View File

@@ -3,6 +3,8 @@ package orderbook
import (
"sync"
"time"
"github.com/thrasher-corp/gocryptotrader/exchanges/alert"
)
// Unsafe is an exported linked list reference to the current bid/ask heads and
@@ -20,7 +22,7 @@ type Unsafe struct {
// protocol then this book is not considered live and cannot be trusted.
UpdatedViaREST *bool
LastUpdated *time.Time
*Alert
*alert.Notice
}
// Lock locks down the underlying linked list which inhibits all pending updates
@@ -55,7 +57,7 @@ func (d *Depth) GetUnsafe() Unsafe {
BidHead: &d.bids.linkedList.head,
AskHead: &d.asks.linkedList.head,
m: &d.m,
Alert: &d.Alert,
Notice: &d.Notice,
UpdatedViaREST: &d.options.restSnapshot,
LastUpdated: &d.options.lastUpdated,
}