mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
* orderbook/buffer: data integrity and resubscription pass * btcmarkets: REMOVE THAT LIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIINE!!!!!!!!!!!!!!!!! * buffer: reinstate publish, refactor, invalidate more and comments * buffer/orderbook: improve update and snapshot performance. Move Update type to orderbook package to util. pointer through entire function calls. (cleanup). Change action string to uint8 for easier comparison. Add parsing helper. Update current test benchmark comments. * dispatch: change publish func to variadic id param * dispatch: remove sender receiver wait time as this adds overhead and complexity. update tests. * dispatch: don't create pointers for every job container * rpcserver: fix assertion issues with data publishing change * linter: fixes * glorious: nits addr * depth: change validation handling to incorporate and store err * linter: fix more issues * dispatch: fix race * travis: update before fetching * depth: wrap and return wrapped error in invalidate call and fix tests * btcmarkets: fix commenting * workflow: check * workflow: check * orderbook: check error * buffer/depth: return invalidation error and fix tests * gctcli: display errors on orderbook streams * buffer: remove unused types * orderbook/bitmex: shift function to bitmex * orderbook: Add specific comments to unexported functions that don't have locking but require locking. * orderbook: restrict published data functionality to orderbook.Outbound interface * common: add assertion failure helper for error * dispatch: remove atomics, add mutex protection, remove add/remove worker, redo main tests * dispatch: export function * engine: revert and change sub logger to manager * engine: remove old test * dispatch: add common variable ;) * btcmarket: don't overflow int in tests on 32bit systems * ci: force 1.17.7 usage for go * Revert "ci: force 1.17.7 usage for go" This reverts commit af2f95563bf218cf2b9f36a9fcf3258e2c6a2d91.
* golangci: bump version add and remove linter items * Revert "golangci: bump version add and remove linter items" This reverts commit 3c98bffc9d030e39faca0387ea40c151df2ab06b. * dispatch: remove unused mutex from mux * order: slight optimizations * nits: glorious * dispatch: fix regression on uuid generation and input inline with master * linter: fix * linter: fix * glorious: nit - rm slice segregation * account: fix test after merge * coinbasepro: revert change * account: close channel instead of needing a receiver, push alert in routine to prepare for waiter. Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
81 lines
2.1 KiB
Go
81 lines
2.1 KiB
Go
package dispatch
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gofrs/uuid"
|
|
)
|
|
|
|
// Package-level defaults for the dispatch system.
const (
	// DefaultJobsLimit defines the maximum number of jobs allowed in the
	// jobs channel.
	DefaultJobsLimit = 100

	// DefaultMaxWorkers is the package default worker ceiling amount.
	DefaultMaxWorkers = 10

	// DefaultHandshakeTimeout defines a worker's maximum length of time to
	// wait on an unbuffered channel for a receiver before moving on to the
	// next route.
	DefaultHandshakeTimeout = 200 * time.Nanosecond
)
|
|
|
|
// dispatcher is our main in memory instance with a stop/start mtx below
|
|
var dispatcher *Dispatcher
|
|
|
|
// Dispatcher defines an internal subsystem communication/change state publisher
|
|
type Dispatcher struct {
|
|
// routes refers to a subystem uuid ticket map with associated publish
|
|
// channels, a relayer will be given a unique id through its job channel,
|
|
// then publish the data across the full registered channels for that uuid.
|
|
// See relayer() method below.
|
|
routes map[uuid.UUID][]chan interface{}
|
|
// rMtx protects the routes variable ensuring acceptable read/write access
|
|
rMtx sync.RWMutex
|
|
|
|
// Persistent buffered job queue for relayers
|
|
jobs chan job
|
|
|
|
// Dynamic channel pool; returns an unbuffered channel for routes map
|
|
outbound sync.Pool
|
|
|
|
// MaxWorkers defines max worker ceiling
|
|
maxWorkers int
|
|
|
|
// Dispatch status
|
|
running bool
|
|
|
|
// Unbufferd shutdown chan, sync wg for ensuring concurrency when only
|
|
// dropping a single relayer routine
|
|
shutdown chan struct{}
|
|
|
|
// Relayer shutdown tracking
|
|
wg sync.WaitGroup
|
|
|
|
// dispatcher write protection
|
|
m sync.RWMutex
|
|
}
|
|
|
|
// job defines a relaying job associated with a ticket which allows routing to
|
|
// routines that require specific data
|
|
type job struct {
|
|
Data interface{}
|
|
ID uuid.UUID
|
|
}
|
|
|
|
// Mux defines a new multiplexer for the dispatch system, these a generated
|
|
// per subsystem
|
|
type Mux struct {
|
|
// Reference to the main running dispatch service
|
|
d *Dispatcher
|
|
}
|
|
|
|
// Pipe defines an outbound object to the desired routine
|
|
type Pipe struct {
|
|
// Channel to get all our lovely informations
|
|
C <-chan interface{}
|
|
// ID to tracked system
|
|
id uuid.UUID
|
|
// Reference to multiplexer
|
|
m *Mux
|
|
}
|