mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
* orderbook/buffer: data integrity and resubscription pass * btcmarkets: REMOVE THAT LIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIINE!!!!!!!!!!!!!!!!! * buffer: reinstate publish, refaactor, invalidate more and comments * buffer/orderbook: improve update and snapshot performance. Move Update type to orderbook package to util. pointer through entire function calls. (cleanup). Change action string to uint8 for easier comparison. Add parsing helper. Update current test benchmark comments. * dispatch: change publish func to variadic id param * dispatch: remove sender receiver wait time as this adds overhead and complexity. update tests. * dispatch: don't create pointers for every job container * rpcserver: fix assertion issues with data publishing change * linter: fixes * glorious: nits addr * depth: change validation handling to incorporate and store err * linter: fix more issues * dispatch: fix race * travis: update before fetching * depth: wrap and return wrapped error in invalidate call and fix tests * btcmarkets: fix commenting * workflow: check * workflow: check * orderbook: check error * buffer/depth: return invalidation error and fix tests * gctcli: display errors on orderbook streams * buffer: remove unused types * orderbook/bitmex: shift function to bitmex * orderbook: Add specific comments to unexported functions that don't have locking require locking. * orderbook: restrict published data functionality to orderbook.Outbound interface * common: add assertion failure helper for error * dispatch: remove atomics, add mutex protection, remove add/remove worker, redo main tests * dispatch: export function * engine: revert and change sub logger to manager * engine: remove old test * dispatch: add common variable ;) * btcmarket: don't overflow int in tests on 32bit systems * ci: force 1.17.7 usage for go * Revert "ci: force 1.17.7 usage for go" This reverts commit af2f95563bf218cf2b9f36a9fcf3258e2c6a2d91. * golangci: bump version add and remove linter items * Revert "golangci: bump version add and remove linter items" This reverts commit 3c98bffc9d030e39faca0387ea40c151df2ab06b. * dispatch: remove unsused mutex from mux * order: slight optimizations * nits: glorious * dispatch: fix regression on uuid generation and input inline with master * linter: fix * linter: fix * glorious: nit - rm slice segration * account: fix test after merge * coinbasepro: revert change * account: close channel instead of needing a receiver, push alert in routine to prepare for waiter. Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
528 lines
12 KiB
Go
528 lines
12 KiB
Go
package dispatch
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/gofrs/uuid"
|
|
)
|
|
|
|
var (
|
|
errTest = errors.New("test error")
|
|
nonEmptyUUID = [uuid.Size]byte{108, 105, 99, 107, 77, 121, 72, 97, 105, 114, 121, 66, 97, 108, 108, 115}
|
|
)
|
|
|
|
func TestGlobalDispatcher(t *testing.T) {
|
|
err := Start(0, 0)
|
|
if err != nil {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
running := IsRunning()
|
|
if !running {
|
|
t.Fatalf("received: '%v' but expected: '%v'", IsRunning(), true)
|
|
}
|
|
|
|
err = Stop()
|
|
if err != nil {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
running = IsRunning()
|
|
if running {
|
|
t.Fatalf("received: '%v' but expected: '%v'", IsRunning(), false)
|
|
}
|
|
}
|
|
|
|
func TestStartStop(t *testing.T) {
|
|
t.Parallel()
|
|
var d *Dispatcher
|
|
|
|
if d.isRunning() {
|
|
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
|
}
|
|
|
|
err := d.stop()
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
err = d.start(10, 0)
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
d = NewDispatcher()
|
|
|
|
err = d.stop()
|
|
if !errors.Is(err, ErrNotRunning) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, ErrNotRunning)
|
|
}
|
|
|
|
if d.isRunning() {
|
|
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
|
}
|
|
|
|
err = d.start(1, 100)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
if !d.isRunning() {
|
|
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), true)
|
|
}
|
|
|
|
err = d.start(0, 0)
|
|
if !errors.Is(err, errDispatcherAlreadyRunning) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherAlreadyRunning)
|
|
}
|
|
|
|
// Add route option
|
|
id, err := d.getNewID(uuid.NewV4)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
// Add pipe
|
|
_, err = d.subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
// Max out jobs channel
|
|
for x := 0; x < 99; x++ {
|
|
err = d.publish(id, "woah-nelly")
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
}
|
|
|
|
err = d.stop()
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
if d.isRunning() {
|
|
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
|
}
|
|
}
|
|
|
|
func TestSubscribe(t *testing.T) {
|
|
t.Parallel()
|
|
var d *Dispatcher
|
|
_, err := d.subscribe(uuid.Nil)
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
d = NewDispatcher()
|
|
|
|
_, err = d.subscribe(uuid.Nil)
|
|
if !errors.Is(err, errIDNotSet) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
|
}
|
|
|
|
_, err = d.subscribe(nonEmptyUUID)
|
|
if !errors.Is(err, ErrNotRunning) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, ErrNotRunning)
|
|
}
|
|
|
|
err = d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
id, err := d.getNewID(uuid.NewV4)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
_, err = d.subscribe(nonEmptyUUID)
|
|
if !errors.Is(err, errDispatcherUUIDNotFoundInRouteList) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherUUIDNotFoundInRouteList)
|
|
}
|
|
|
|
d.outbound.New = func() interface{} { return "omg" }
|
|
_, err = d.subscribe(id)
|
|
if !errors.Is(err, errTypeAssertionFailure) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errTypeAssertionFailure)
|
|
}
|
|
|
|
d.outbound.New = getChan
|
|
ch, err := d.subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
if ch == nil {
|
|
t.Fatal("expected channel value")
|
|
}
|
|
}
|
|
|
|
func TestUnsubscribe(t *testing.T) {
|
|
t.Parallel()
|
|
var d *Dispatcher
|
|
|
|
err := d.unsubscribe(uuid.Nil, nil)
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
d = NewDispatcher()
|
|
|
|
err = d.unsubscribe(uuid.Nil, nil)
|
|
if !errors.Is(err, errIDNotSet) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
|
}
|
|
|
|
err = d.unsubscribe(nonEmptyUUID, nil)
|
|
if !errors.Is(err, errChannelIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errChannelIsNil)
|
|
}
|
|
|
|
// will return nil if not running
|
|
err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{}))
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{}))
|
|
if !errors.Is(err, errDispatcherUUIDNotFoundInRouteList) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherUUIDNotFoundInRouteList)
|
|
}
|
|
|
|
id, err := d.getNewID(uuid.NewV4)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.unsubscribe(id, make(<-chan interface{}))
|
|
if !errors.Is(err, errChannelNotFoundInUUIDRef) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errChannelNotFoundInUUIDRef)
|
|
}
|
|
|
|
// Skip over this when matching pipes
|
|
_, err = d.subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
ch, err := d.subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.unsubscribe(id, ch)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
}
|
|
|
|
func TestPublish(t *testing.T) {
|
|
t.Parallel()
|
|
var d *Dispatcher
|
|
|
|
err := d.publish(uuid.Nil, nil)
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
d = NewDispatcher()
|
|
|
|
err = d.publish(nonEmptyUUID, "lol")
|
|
if !errors.Is(err, nil) { // If not running, don't send back an error.
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.start(2, 10)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
err = d.publish(uuid.Nil, nil)
|
|
if !errors.Is(err, errIDNotSet) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
|
}
|
|
|
|
err = d.publish(nonEmptyUUID, nil)
|
|
if !errors.Is(err, errNoData) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errNoData)
|
|
}
|
|
|
|
// max out worker processing
|
|
for x := 0; x < 100; x++ {
|
|
err2 := d.publish(nonEmptyUUID, "lol")
|
|
if !errors.Is(err2, nil) {
|
|
err = err2
|
|
break
|
|
}
|
|
}
|
|
|
|
if !errors.Is(err, errDispatcherJobsAtLimit) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit)
|
|
}
|
|
}
|
|
|
|
func TestPublishReceive(t *testing.T) {
|
|
t.Parallel()
|
|
d := NewDispatcher()
|
|
if err := d.start(0, 0); !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
id, err := d.getNewID(uuid.NewV4)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
incoming, err := d.subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
go func(d *Dispatcher, id uuid.UUID) {
|
|
for x := 0; x < 10; x++ {
|
|
err2 := d.publish(id, "WOW")
|
|
if !errors.Is(err2, nil) {
|
|
panic(err2)
|
|
}
|
|
}
|
|
}(d, id)
|
|
|
|
data, ok := (<-incoming).(string)
|
|
if !ok {
|
|
t.Fatal("type assertion failure expected string")
|
|
}
|
|
|
|
if data != "WOW" {
|
|
t.Fatal("unexpected value")
|
|
}
|
|
}
|
|
|
|
func TestGetNewID(t *testing.T) {
|
|
t.Parallel()
|
|
var d *Dispatcher
|
|
|
|
_, err := d.getNewID(uuid.NewV4)
|
|
if !errors.Is(err, errDispatcherNotInitialized) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
|
}
|
|
|
|
d = NewDispatcher()
|
|
|
|
err = d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
_, err = d.getNewID(nil)
|
|
if !errors.Is(err, errUUIDGeneratorFunctionIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errUUIDGeneratorFunctionIsNil)
|
|
}
|
|
|
|
_, err = d.getNewID(func() (uuid.UUID, error) { return uuid.Nil, errTest })
|
|
if !errors.Is(err, errTest) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errTest)
|
|
}
|
|
|
|
_, err = d.getNewID(func() (uuid.UUID, error) { return [uuid.Size]byte{254}, nil })
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
_, err = d.getNewID(func() (uuid.UUID, error) { return [uuid.Size]byte{254}, nil })
|
|
if !errors.Is(err, errUUIDCollision) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errUUIDCollision)
|
|
}
|
|
}
|
|
|
|
func TestMux(t *testing.T) {
|
|
t.Parallel()
|
|
var mux *Mux
|
|
_, err := mux.Subscribe(uuid.Nil)
|
|
if !errors.Is(err, errMuxIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
|
}
|
|
|
|
err = mux.Unsubscribe(uuid.Nil, nil)
|
|
if !errors.Is(err, errMuxIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
|
}
|
|
|
|
err = mux.Publish(nil)
|
|
if !errors.Is(err, errMuxIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
|
}
|
|
|
|
_, err = mux.GetID()
|
|
if !errors.Is(err, errMuxIsNil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
|
}
|
|
|
|
d := NewDispatcher()
|
|
err = d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
mux = GetNewMux(d)
|
|
|
|
err = mux.Publish(nil)
|
|
if !errors.Is(err, errNoData) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errNoData)
|
|
}
|
|
|
|
err = mux.Publish("lol")
|
|
if !errors.Is(err, errNoIDs) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errNoIDs)
|
|
}
|
|
|
|
id, err := mux.GetID()
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
_, err = mux.Subscribe(uuid.Nil)
|
|
if !errors.Is(err, errIDNotSet) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
|
}
|
|
|
|
pipe, err := mux.Subscribe(id)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
|
|
var errChan = make(chan error)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
// Makes sure receiver is waiting for update
|
|
go func(ch <-chan interface{}, errChan chan error, wg *sync.WaitGroup) {
|
|
wg.Done()
|
|
response, ok := (<-ch).(string)
|
|
if !ok {
|
|
errChan <- errors.New("type assertion failure")
|
|
return
|
|
}
|
|
|
|
if response != "string" {
|
|
errChan <- errors.New("unexpected return")
|
|
return
|
|
}
|
|
errChan <- nil
|
|
}(pipe.C, errChan, &wg)
|
|
|
|
wg.Wait()
|
|
|
|
payload := "string"
|
|
go func(payload string) {
|
|
err2 := mux.Publish(payload, id)
|
|
if err2 != nil {
|
|
fmt.Println(err2)
|
|
}
|
|
}(payload)
|
|
|
|
err = <-errChan
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = pipe.Release()
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
}
|
|
|
|
func TestMuxSubscribe(t *testing.T) {
|
|
t.Parallel()
|
|
d := NewDispatcher()
|
|
err := d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
mux := GetNewMux(d)
|
|
itemID, err := mux.GetID()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var pipes []Pipe
|
|
for i := 0; i < 1000; i++ {
|
|
newPipe, err := mux.Subscribe(itemID)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
pipes = append(pipes, newPipe)
|
|
}
|
|
|
|
for i := range pipes {
|
|
err := pipes[i].Release()
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMuxPublish(t *testing.T) {
|
|
t.Parallel()
|
|
d := NewDispatcher()
|
|
err := d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
mux := GetNewMux(d)
|
|
itemID, err := mux.GetID()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
pipe, err := mux.Subscribe(itemID)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
go func(mux *Mux) {
|
|
for i := 0; i < 100; i++ {
|
|
errMux := mux.Publish(i, itemID)
|
|
if errMux != nil {
|
|
t.Error(errMux)
|
|
}
|
|
}
|
|
}(mux)
|
|
|
|
<-pipe.C
|
|
|
|
// Shut down dispatch system
|
|
err = d.stop()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// 2363419 468.7 ns/op 142 B/op 1 allocs/op
|
|
func BenchmarkSubscribe(b *testing.B) {
|
|
d := NewDispatcher()
|
|
err := d.start(0, 0)
|
|
if !errors.Is(err, nil) {
|
|
b.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
|
}
|
|
mux := GetNewMux(d)
|
|
newID, err := mux.GetID()
|
|
if err != nil {
|
|
b.Error(err)
|
|
}
|
|
|
|
for n := 0; n < b.N; n++ {
|
|
_, err := mux.Subscribe(newID)
|
|
if err != nil {
|
|
b.Error(err)
|
|
}
|
|
}
|
|
}
|