Files
gocryptotrader/dispatch/dispatch.go
Ryan O'Hara-Reid e93ee83563 script: implementation of error insertion on return (#986)
* exchanges/account: shift credentials to account package and segregate funds to keys

* merge: fixes

* linter: fix

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits + protection for string panic

* glorious_suggestion: add method for matching keys

* linter: fix tests

* account: add protected method for credentials minimizing access, display full account details to rpc.

* linter: spelling kweeeeeeen

* accounts/portfolio: clean/check portfolio code and quickly check balances from change. Add protected method for future matching.

* accounts: theres no point in pointerising everything

* linter: ok pointerise this then...

* exchanges: fix regression add in little notes.

* glorious: nits

* Update exchanges/account/credentials.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/account/credentials_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/account/credentials_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* gloriously: fix glorious glorious test gloriously

* script: initial implementation of error insertion on return

* script: make script context aware(ish) and update error handle in examples

* script: add tests

* script: add syntax highlighting to readme

* Update gctscript/vm/vm.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/vm/vm.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/examples/exchange/account_info.gct

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/examples/exchange/cancel_order.gct

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/examples/verbose.gct

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* rm: bros

* scripts: handle errors in examples when they are going to use the data after fetching

* linter: fix rides again

* SCOTT_SPELL_CHECK_LINTER: fix

* gctscript: fix tests

* glorious: niiiiiiiiiiiiits

* scriptmodules/gct: standardize runtime errors

* Update gctscript/modules/gct/exchange.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/exchange.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/exchange.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/exchange.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update gctscript/modules/gct/gct.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits/reverts

* go mod: tidy

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
2022-08-17 14:18:53 +10:00

337 lines
7.3 KiB
Go

package dispatch
import (
"errors"
"fmt"
"sync"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/log"
)
var (
// ErrNotRunning defines an error when the dispatcher is not running
ErrNotRunning = errors.New("dispatcher not running")
errDispatcherNotInitialized = errors.New("dispatcher not initialised")
errDispatcherAlreadyRunning = errors.New("dispatcher already running")
errDispatchShutdown = errors.New("dispatcher did not shutdown properly, routines failed to close")
errDispatcherUUIDNotFoundInRouteList = errors.New("dispatcher uuid not found in route list")
errTypeAssertionFailure = errors.New("type assertion failure")
errChannelNotFoundInUUIDRef = errors.New("dispatcher channel not found in uuid reference slice")
errUUIDCollision = errors.New("dispatcher collision detected, uuid already exists")
errDispatcherJobsAtLimit = errors.New("dispatcher jobs at limit")
errChannelIsNil = errors.New("channel is nil")
errUUIDGeneratorFunctionIsNil = errors.New("UUID generator function is nil")
limitMessage = "%w [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x, or increase the jobs limit via --dispatchjobslimit=x"
)
// Name is an exported subsystem name
const Name = "dispatch"
func init() {
dispatcher = NewDispatcher()
}
// NewDispatcher creates a new Dispatcher for relaying data.
func NewDispatcher() *Dispatcher {
return &Dispatcher{
routes: make(map[uuid.UUID][]chan interface{}),
outbound: sync.Pool{
New: getChan,
},
}
}
func getChan() interface{} {
// Create unbuffered channel for data pass
return make(chan interface{})
}
// Start starts the dispatch system by spawning workers and allocating memory
func Start(workers, jobsLimit int) error {
return dispatcher.start(workers, jobsLimit)
}
// Stop attempts to stop the dispatch service, this will close all pipe channels
// flush job list and drop all workers
func Stop() error {
log.Debugln(log.DispatchMgr, "Dispatch manager shutting down...")
return dispatcher.stop()
}
// IsRunning checks to see if the dispatch service is running
func IsRunning() bool {
return dispatcher.isRunning()
}
// start compares atomic running value, sets defaults, overrides with
// configuration, then spawns workers
func (d *Dispatcher) start(workers, channelCapacity int) error {
if d == nil {
return errDispatcherNotInitialized
}
d.m.Lock()
defer d.m.Unlock()
if d.running {
return errDispatcherAlreadyRunning
}
d.running = true
if workers < 1 {
log.Warnf(log.DispatchMgr,
"workers cannot be zero, using default value %d\n",
DefaultMaxWorkers)
workers = DefaultMaxWorkers
}
if channelCapacity < 1 {
log.Warnf(log.DispatchMgr,
"jobs limit cannot be zero, using default values %d\n",
DefaultJobsLimit)
channelCapacity = DefaultJobsLimit
}
d.jobs = make(chan job, channelCapacity)
d.maxWorkers = workers
d.shutdown = make(chan struct{})
for i := 0; i < d.maxWorkers; i++ {
d.wg.Add(1)
go d.relayer()
}
return nil
}
// stop stops the service and shuts down all worker routines
func (d *Dispatcher) stop() error {
if d == nil {
return errDispatcherNotInitialized
}
d.m.Lock()
defer d.m.Unlock()
if !d.running {
return ErrNotRunning
}
d.running = false
// Stop all jobs
close(d.jobs)
// Release finished workers
close(d.shutdown)
d.rMtx.Lock()
for key, pipes := range d.routes {
for i := range pipes {
// Boot off receivers waiting on pipes.
close(pipes[i])
}
// Flush all pipes, re-subscription will need to occur.
d.routes[key] = nil
}
d.rMtx.Unlock()
ch := make(chan struct{})
timer := time.NewTimer(time.Second)
go func(ch chan<- struct{}) { d.wg.Wait(); ch <- struct{}{} }(ch)
select {
case <-ch:
log.Debugln(log.DispatchMgr, "Dispatch manager shutdown.")
return nil
case <-timer.C:
return errDispatchShutdown
}
}
// isRunning returns if the dispatch system is running
func (d *Dispatcher) isRunning() bool {
if d == nil {
return false
}
d.m.RLock()
defer d.m.RUnlock()
return d.running
}
// relayer routine relays communications across the defined routes
func (d *Dispatcher) relayer() {
for {
select {
case j := <-d.jobs:
d.rMtx.RLock()
if pipes, ok := d.routes[j.ID]; ok {
for i := range pipes {
select {
case pipes[i] <- j.Data:
default:
// no receiver; don't wait. This limits complexity.
}
}
}
d.rMtx.RUnlock()
case <-d.shutdown:
d.wg.Done()
return
}
}
}
// publish relays data to the subscribed subsystems
func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error {
if d == nil {
return errDispatcherNotInitialized
}
if id.IsNil() {
return errIDNotSet
}
if data == nil {
return errNoData
}
d.m.RLock()
defer d.m.RUnlock()
if !d.running {
return nil
}
select {
case d.jobs <- job{data, id}: // Push job into job channel.
return nil
default:
return fmt.Errorf(limitMessage,
errDispatcherJobsAtLimit,
len(d.jobs),
d.maxWorkers)
}
}
// Subscribe subscribes a system and returns a communication chan, this does not
// ensure initial push.
func (d *Dispatcher) subscribe(id uuid.UUID) (<-chan interface{}, error) {
if d == nil {
return nil, errDispatcherNotInitialized
}
if id.IsNil() {
return nil, errIDNotSet
}
d.m.RLock()
defer d.m.RUnlock()
if !d.running {
return nil, ErrNotRunning
}
d.rMtx.Lock()
defer d.rMtx.Unlock()
if _, ok := d.routes[id]; !ok {
return nil, errDispatcherUUIDNotFoundInRouteList
}
// Get an unused channel from the channel pool
ch, ok := d.outbound.Get().(chan interface{})
if !ok {
return nil, errTypeAssertionFailure
}
d.routes[id] = append(d.routes[id], ch)
return ch, nil
}
// Unsubscribe unsubs a routine from the dispatcher
func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan <-chan interface{}) error {
if d == nil {
return errDispatcherNotInitialized
}
if id.IsNil() {
return errIDNotSet
}
if usedChan == nil {
return errChannelIsNil
}
d.m.RLock()
defer d.m.RUnlock()
if !d.running {
// reference will already be released in the stop function
return nil
}
d.rMtx.Lock()
defer d.rMtx.Unlock()
pipes, ok := d.routes[id]
if !ok {
return errDispatcherUUIDNotFoundInRouteList
}
for i := range pipes {
if pipes[i] != usedChan {
continue
}
// Delete individual reference
pipes[i] = pipes[len(pipes)-1]
pipes[len(pipes)-1] = nil
d.routes[id] = pipes[:len(pipes)-1]
// Drain and put the used chan back in pool; only if it is not closed.
select {
case _, ok = <-usedChan:
default:
}
if ok {
d.outbound.Put(usedChan)
}
return nil
}
return errChannelNotFoundInUUIDRef
}
// GetNewID returns a new ID
func (d *Dispatcher) getNewID(genFn func() (uuid.UUID, error)) (uuid.UUID, error) {
if d == nil {
return uuid.Nil, errDispatcherNotInitialized
}
if genFn == nil {
return uuid.Nil, errUUIDGeneratorFunctionIsNil
}
// Continue to allow the generation, input and return of UUIDs even if
// service is not currently enabled.
d.m.RLock()
defer d.m.RUnlock()
// Generate new uuid
newID, err := genFn()
if err != nil {
return uuid.Nil, err
}
d.rMtx.Lock()
defer d.rMtx.Unlock()
// Check to see if it already exists
if _, ok := d.routes[newID]; ok {
return uuid.Nil, errUUIDCollision
}
// Write the key into system
d.routes[newID] = nil
return newID, nil
}