Mirror of https://github.com/d0zingcat/gocryptotrader.git, synced 2026-05-14 07:26:47 +00:00
* orderbook/buffer: data integrity and resubscription pass
* btcmarkets: REMOVE THAT LIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIINE!!!!!!!!!!!!!!!!!
* buffer: reinstate publish, refactor, invalidate more and comments
* buffer/orderbook: improve update and snapshot performance. Move Update type to orderbook package. Utilise pointer through entire function calls (cleanup). Change action string to uint8 for easier comparison. Add parsing helper. Update current test benchmark comments.
* dispatch: change publish func to variadic id param
* dispatch: remove sender/receiver wait time as this adds overhead and complexity. Update tests.
* dispatch: don't create pointers for every job container
* rpcserver: fix assertion issues with data publishing change
* linter: fixes
* glorious: nits addressed
* depth: change validation handling to incorporate and store err
* linter: fix more issues
* dispatch: fix race
* travis: update before fetching
* depth: wrap and return wrapped error in invalidate call and fix tests
* btcmarkets: fix commenting
* workflow: check
* workflow: check
* orderbook: check error
* buffer/depth: return invalidation error and fix tests
* gctcli: display errors on orderbook streams
* buffer: remove unused types
* orderbook/bitmex: shift function to bitmex
* orderbook: add specific comments to unexported functions that don't have locking but require it
* orderbook: restrict published data functionality to orderbook.Outbound interface
* common: add assertion failure helper for error
* dispatch: remove atomics, add mutex protection, remove add/remove worker, redo main tests
* dispatch: export function
* engine: revert and change sub logger to manager
* engine: remove old test
* dispatch: add common variable ;)
* btcmarket: don't overflow int in tests on 32-bit systems
* ci: force 1.17.7 usage for go
* Revert "ci: force 1.17.7 usage for go" (reverts commit af2f95563bf218cf2b9f36a9fcf3258e2c6a2d91)
* golangci: bump version; add and remove linter items
* Revert "golangci: bump version add and remove linter items" (reverts commit 3c98bffc9d030e39faca0387ea40c151df2ab06b)
* dispatch: remove unused mutex from mux
* order: slight optimizations
* nits: glorious
* dispatch: fix regression on uuid generation and input in line with master
* linter: fix
* linter: fix
* glorious: nit - remove slice segregation
* account: fix test after merge
* coinbasepro: revert change
* account: close channel instead of needing a receiver, push alert in routine to prepare for waiter.

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
337 lines · 7.3 KiB · Go
package dispatch

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/gofrs/uuid"
	"github.com/thrasher-corp/gocryptotrader/log"
)

var (
	// ErrNotRunning defines an error when the dispatcher is not running
	ErrNotRunning = errors.New("dispatcher not running")

	errDispatcherNotInitialized          = errors.New("dispatcher not initialised")
	errDispatcherAlreadyRunning          = errors.New("dispatcher already running")
	errDispatchShutdown                  = errors.New("dispatcher did not shutdown properly, routines failed to close")
	errDispatcherUUIDNotFoundInRouteList = errors.New("dispatcher uuid not found in route list")
	errTypeAssertionFailure              = errors.New("type assertion failure")
	errChannelNotFoundInUUIDRef          = errors.New("dispatcher channel not found in uuid reference slice")
	errUUIDCollision                     = errors.New("dispatcher collision detected, uuid already exists")
	errDispatcherJobsAtLimit             = errors.New("dispatcher jobs at limit")
	errChannelIsNil                      = errors.New("channel is nil")
	errUUIDGeneratorFunctionIsNil        = errors.New("UUID generator function is nil")

	limitMessage = "%w [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x, or increase the jobs limit via --dispatchjobslimit=x"
)

// Name is an exported subsystem name
const Name = "dispatch"

func init() {
	dispatcher = NewDispatcher()
}
// NewDispatcher creates a new Dispatcher for relaying data.
func NewDispatcher() *Dispatcher {
	return &Dispatcher{
		routes: make(map[uuid.UUID][]chan interface{}),
		outbound: sync.Pool{
			New: getChan,
		},
	}
}

func getChan() interface{} {
	// Create an unbuffered channel for data passing
	return make(chan interface{})
}
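
// The outbound sync.Pool above recycles these unbuffered channels, so
// subscribe/unsubscribe churn reuses pipes rather than allocating a fresh
// channel per subscriber; see unsubscribe, which drains a still-open pipe
// before returning it to the pool.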

// Start starts the dispatch system by spawning workers and allocating memory
func Start(workers, jobsLimit int) error {
	return dispatcher.start(workers, jobsLimit)
}

// Stop attempts to stop the dispatch service; this will close all pipe
// channels, flush the job list and drop all workers
func Stop() error {
	log.Debugln(log.DispatchMgr, "Dispatch manager shutting down...")
	return dispatcher.stop()
}

// IsRunning checks to see if the dispatch service is running
func IsRunning() bool {
	return dispatcher.isRunning()
}

// start checks the running state under lock, sets defaults, overrides them
// with configuration, then spawns workers
func (d *Dispatcher) start(workers, channelCapacity int) error {
	if d == nil {
		return errDispatcherNotInitialized
	}

	d.m.Lock()
	defer d.m.Unlock()

	if d.running {
		return errDispatcherAlreadyRunning
	}

	d.running = true

	if workers < 1 {
		log.Warnf(log.DispatchMgr,
			"workers cannot be zero, using default value %d\n",
			DefaultMaxWorkers)
		workers = DefaultMaxWorkers
	}
	if channelCapacity < 1 {
		log.Warnf(log.DispatchMgr,
			"jobs limit cannot be zero, using default value %d\n",
			DefaultJobsLimit)
		channelCapacity = DefaultJobsLimit
	}
	d.jobs = make(chan job, channelCapacity)
	d.maxWorkers = workers
	d.shutdown = make(chan struct{})

	for i := 0; i < d.maxWorkers; i++ {
		d.wg.Add(1)
		go d.relayer()
	}
	return nil
}

// stop stops the service and shuts down all worker routines
func (d *Dispatcher) stop() error {
	if d == nil {
		return errDispatcherNotInitialized
	}

	d.m.Lock()
	defer d.m.Unlock()

	if !d.running {
		return ErrNotRunning
	}

	d.running = false

	// Stop all jobs
	close(d.jobs)

	// Release finished workers
	close(d.shutdown)

	d.rMtx.Lock()
	for key, pipes := range d.routes {
		for i := range pipes {
			// Boot off receivers waiting on pipes.
			close(pipes[i])
		}
		// Flush all pipes; re-subscription will need to occur.
		d.routes[key] = nil
	}
	d.rMtx.Unlock()

	// Buffered so the waiter goroutine below cannot leak by blocking on
	// send if the timer fires first.
	ch := make(chan struct{}, 1)
	timer := time.NewTimer(time.Second)
	go func(ch chan<- struct{}) { d.wg.Wait(); ch <- struct{}{} }(ch)
	select {
	case <-ch:
		log.Debugln(log.DispatchMgr, "Dispatch manager shutdown.")
		return nil
	case <-timer.C:
		return errDispatchShutdown
	}
}
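
// Note for subscribers: stop() closes every route pipe, so a receiver
// blocked on a pipe observes a closed channel and must re-subscribe once
// the dispatcher is restarted. A sketch of a defensive receive loop
// (handlers here are illustrative, not part of this package):
//
//	for {
//		data, ok := <-pipe
//		if !ok {
//			return // dispatcher stopped; pipe is dead
//		}
//		process(data)
//	}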

// isRunning reports whether the dispatch system is running
func (d *Dispatcher) isRunning() bool {
	if d == nil {
		return false
	}

	d.m.RLock()
	defer d.m.RUnlock()
	return d.running
}

// relayer routine relays communications across the defined routes
func (d *Dispatcher) relayer() {
	for {
		select {
		case j := <-d.jobs:
			d.rMtx.RLock()
			if pipes, ok := d.routes[j.ID]; ok {
				for i := range pipes {
					select {
					case pipes[i] <- j.Data:
					default:
						// no receiver; don't wait. This limits complexity.
					}
				}
			}
			d.rMtx.RUnlock()
		case <-d.shutdown:
			d.wg.Done()
			return
		}
	}
}

// publish relays data to the subscribed subsystems
func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error {
	if d == nil {
		return errDispatcherNotInitialized
	}

	if id.IsNil() {
		return errIDNotSet
	}

	if data == nil {
		return errNoData
	}

	d.m.RLock()
	defer d.m.RUnlock()

	if !d.running {
		return nil
	}

	select {
	case d.jobs <- job{data, id}: // Push job into job channel.
		return nil
	default:
		return fmt.Errorf(limitMessage,
			errDispatcherJobsAtLimit,
			len(d.jobs),
			d.maxWorkers)
	}
}

// subscribe subscribes a system and returns a communication channel; this
// does not ensure an initial push.
func (d *Dispatcher) subscribe(id uuid.UUID) (<-chan interface{}, error) {
	if d == nil {
		return nil, errDispatcherNotInitialized
	}

	if id.IsNil() {
		return nil, errIDNotSet
	}

	d.m.RLock()
	defer d.m.RUnlock()

	if !d.running {
		return nil, ErrNotRunning
	}

	d.rMtx.Lock()
	defer d.rMtx.Unlock()
	if _, ok := d.routes[id]; !ok {
		return nil, errDispatcherUUIDNotFoundInRouteList
	}

	// Get an unused channel from the channel pool
	ch, ok := d.outbound.Get().(chan interface{})
	if !ok {
		return nil, errTypeAssertionFailure
	}

	d.routes[id] = append(d.routes[id], ch)
	return ch, nil
}

// unsubscribe removes a routine's pipe from the dispatcher
func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan <-chan interface{}) error {
	if d == nil {
		return errDispatcherNotInitialized
	}

	if id.IsNil() {
		return errIDNotSet
	}

	if usedChan == nil {
		return errChannelIsNil
	}

	d.m.RLock()
	defer d.m.RUnlock()

	if !d.running {
		// reference will already be released in the stop function
		return nil
	}

	d.rMtx.Lock()
	defer d.rMtx.Unlock()
	pipes, ok := d.routes[id]
	if !ok {
		return errDispatcherUUIDNotFoundInRouteList
	}

	for i := range pipes {
		if pipes[i] != usedChan {
			continue
		}
		// Capture the bidirectional channel before deleting its
		// reference, so the pool receives the same concrete type that
		// getChan produces; putting the receive-only usedChan back would
		// fail the type assertion in subscribe on reuse.
		c := pipes[i]

		// Delete individual reference
		pipes[i] = pipes[len(pipes)-1]
		pipes[len(pipes)-1] = nil
		d.routes[id] = pipes[:len(pipes)-1]

		// Drain and put the used chan back in the pool; only if it is
		// not closed.
		select {
		case _, ok = <-c:
		default:
		}

		if ok {
			d.outbound.Put(c)
		}
		return nil
	}
	return errChannelNotFoundInUUIDRef
}
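
// Sketch of how a consumer pairs these unexported methods against a route
// id previously registered via getNewID (hypothetical usage; actual
// callers reach these through this package's mux):
//
//	pipe, err := d.subscribe(id)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = d.unsubscribe(id, pipe) }()
//	for data := range pipe {
//		// handle relayed data; range ends when stop() closes the pipe
//	}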

// getNewID generates a new ID and registers it in the route list
func (d *Dispatcher) getNewID(genFn func() (uuid.UUID, error)) (uuid.UUID, error) {
	if d == nil {
		return uuid.Nil, errDispatcherNotInitialized
	}

	if genFn == nil {
		return uuid.Nil, errUUIDGeneratorFunctionIsNil
	}

	// Continue to allow the generation, registration and return of UUIDs
	// even if the service is not currently running.

	d.m.RLock()
	defer d.m.RUnlock()

	// Generate new uuid
	newID, err := genFn()
	if err != nil {
		return uuid.Nil, err
	}

	d.rMtx.Lock()
	defer d.rMtx.Unlock()
	// Check to see if it already exists
	if _, ok := d.routes[newID]; ok {
		return uuid.Nil, errUUIDCollision
	}
	// Write the key into the system
	d.routes[newID] = nil

	return newID, nil
}
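
For orientation, a minimal sketch of driving the exported lifecycle above (Start, IsRunning, Stop). This program is hypothetical and only exercises the package-level functions defined in this file; subscription itself goes through the unexported subscribe/unsubscribe methods.

package main

import (
	"log"

	"github.com/thrasher-corp/gocryptotrader/dispatch"
)

func main() {
	// Worker and job-limit values below one fall back to
	// DefaultMaxWorkers and DefaultJobsLimit inside start().
	if err := dispatch.Start(0, 0); err != nil {
		log.Fatalln(err)
	}

	if !dispatch.IsRunning() {
		log.Fatalln(dispatch.ErrNotRunning)
	}

	// Stop closes every route pipe and waits up to one second for
	// workers to drain before reporting a shutdown failure.
	if err := dispatch.Stop(); err != nil {
		log.Fatalln(err)
	}
}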