mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 15:09:42 +00:00
* gateio: Add multi asset websocket support WIP. * meow * Add tests and shenanigans * integrate flushing and for enabling/disabling pairs from rpc shenanigans * some changes * linter: fixes strikes again. * Change name ConnectionAssociation -> ConnectionCandidate for better clarity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails. * Add subscription tests (state functional) * glorious:nits + proxy handling * Spelling * linter: fixerino * instead of nil, dont do nil. * clean up nils * cya nils * don't need to set URL or check if its running * stream match update * update tests * linter: fix * glorious: nits + handle context cancellations * stop ping handler routine leak * * Fix bug where reader routine on error that is not a disconnection error but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled. * Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle. * Dial now uses code from DialContext but just calls context.Background() * Don't allow reader to return on parse binary response error. Just output error and return a non nil response * Allow rollback on connect on any error across all connections * fix shadow jutsu * glorious/gk: nitters - adds in ws mock server * linter: fix * fix deadlock on connection as the previous channel had no reader and would hang connection reader for eternity. 
* glorious: whooops * gk: nits * Leak issue and edge case * Websocket: Add SendMessageReturnResponses * whooooooopsie * gk: nitssssss * Update exchanges/stream/stream_match.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/stream/stream_match_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * linter: appease the linter gods * gk: nits * gk: drain brain * started * more changes before merge match pr * gateio: still building out * gateio: finish spot * fix up tests in gateio * Add tests for stream package * rm unused field * glorious: nits * rn files, specifically set function names to asset and offload routing to websocket type. * linter: fix * Add futures websocket request support * gateio: integrate with IBOTExchange (cherry pick my nose) * linter: fix * glorious: nits * add counter and update gateio * fix collision issue * Update exchanges/stream/websocket.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * glorious: nits * add tests * linter: fix * After merge * Add error connection info * upgrade to upstream merge * Fix edge case where it does not reconnect made by an already closed connection * stream coverage * glorious: nits * glorious: nits removed asset error handling in stream package * linter: fix * rm block * Add basic readme * fix asset enabled flush cycle for multi connection * spella: fix * linter: fix * Add glorious suggestions, fix some race thing * reinstate name before any routine gets spawned * stop on error in mock tests * glorious: nits * Set correct price * glorious: nits found in CI build * Add test for drain, bumped wait times as there seems to be something happening on macos CI builds, used context.WithTimeout because its instant. 
* mutex across shutdown and connect for protection * lint: fix * test time withoffset, reinstate stop * fix whoops * const trafficCheckInterval; rm testmain * y * fix lint * bump time check window * stream: fix intermittant test failures while testing routines and remove code that is not needed. * spells * cant do what I did * protect race due to routine. * update testURL * use mock websocket connection instead of test URL's * linter: fix * remove url because its throwing errors on CI builds * connections drop all the time, don't need to worry about not being able to echo back ws data as it can be easily reviewed _test file side. * remove another superfluous url thats not really set up for this * spawn overwatch routine when there is no errors, inline checker instead of waiting for a time period, add sleep inline with echo handler as this is really quick and wanted to ensure that latency is handing correctly * linter: fixerino uperino * fix ID bug, why I do this, I don't know. * glorious: panix * linter: things * whoops * dont need to make consecutive Unix() calls * websocket: fix potential panic on error and no responses and adding waitForResponses * rm json parser and handle in json package instead * in favour of json package unmarshalling * linter: fix * linter: fix again * * change field name OutboundRequestSignature to WrapperDefinedConnectionSignature for agnostic inbound and outbound connections. * change method name GetOutboundConnection to GetConnection for agnostic inbound and outbound connections. 
* drop outbound field map for improved performance just using a range and field check (less complex as well) * change field name connections to connectionToWrapper for better clarity * spells and magic and wands * glorious: nits * comparable check for signature * mv err var * glorious: nits and stuff * attempt to fix race * glorious: nits * gk: nits; engine log cleanup * gk: nits; OCD * gk: nits; move function change file names * gk: nits; 🚀 * gk: nits; convert variadic function and message inspection to interface and include a specific function for that handling so as to not need nil on every call * gk: nits; continued * gk: engine nits; rm loaded exchange * gk: nits; drop WebsocketLoginResponse * stream: Add match method EnsureMatchWithData * gk: nits; rn Inspect to IsFinal * gk: nits; rn to MessageFilter * linter: fix * gateio: update rate limit definitions (cherry-pick) * Add test and missing * Shared REST rate limit definitions with Websocket service, set lookup item to nil for systems that do not require rate limiting; add glorious nit * integrate rate limits for websocket trading spot * conform to match upstream changes * standardise names to upstream style * fix wrapper standards test when sending a auth request through a websocket connection * whoops * Update exchanges/gateio/gateio_types.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * glorious: nits * linter: fix * linter: overload * whoops * spelling fixes on recent merge * glorious: nits * linter: fix? 
* glorious: nits * gk: assert errors touched * gk: unexport derive functions * gk: nitssssssss * fix test * gk: nitters v1 * gk: http status * gk/nits: Add getAssetFromFuturesPair * gk: nits single response when submitting * gk: new pair with delimiter in tests * gk: param update slice to slice of pointers * gk: add asset type in params, includes t.Context() for tests * linter: fix * linter: fix * fix merge whoopsie * glorious: nits * gk: nit * shift over to websocket package error * internal/exchange/websocket -> exchange/websocket * PEAK OCD! * appease the OCD gods * thrasher: nits --------- Co-authored-by: shazbert <ryan.oharareid@thrasher.io> Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
360 lines
9.8 KiB
Go
package engine

import (
	"errors"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/thrasher-corp/gocryptotrader/config"
	"github.com/thrasher-corp/gocryptotrader/currency"
	"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
	"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
	"github.com/thrasher-corp/gocryptotrader/exchanges/order"
	"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
	"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
)

func TestWebsocketRoutineManagerSetup(t *testing.T) {
	_, err := setupWebsocketRoutineManager(nil, nil, nil, nil, false)
	if !errors.Is(err, errNilExchangeManager) {
		t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager)
	}

	_, err = setupWebsocketRoutineManager(NewExchangeManager(), (*OrderManager)(nil), nil, nil, false)
	if !errors.Is(err, errNilCurrencyPairSyncer) {
		t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairSyncer)
	}
	_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, nil, false)
	if !errors.Is(err, errNilCurrencyConfig) {
		t.Errorf("error '%v', expected '%v'", err, errNilCurrencyConfig)
	}

	_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{}, true)
	if !errors.Is(err, errNilCurrencyPairFormat) {
		t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairFormat)
	}

	m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	if m == nil {
		t.Error("expecting manager")
	}
}

func TestWebsocketRoutineManagerStart(t *testing.T) {
	var m *WebsocketRoutineManager
	err := m.Start()
	if !errors.Is(err, ErrNilSubsystem) {
		t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem)
	}
	cfg := &currency.Config{CurrencyPairFormat: &currency.PairFormat{
		Uppercase: false,
		Delimiter: "-",
	}}
	m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, cfg, true)
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.Start()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.Start()
	if !errors.Is(err, ErrSubSystemAlreadyStarted) {
		t.Errorf("error '%v', expected '%v'", err, ErrSubSystemAlreadyStarted)
	}
}

func TestWebsocketRoutineManagerIsRunning(t *testing.T) {
	var m *WebsocketRoutineManager
	if m.IsRunning() {
		t.Error("expected false")
	}

	m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	if m.IsRunning() {
		t.Error("expected false")
	}

	err = m.Start()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	for atomic.LoadInt32(&m.state) == startingState {
		<-time.After(time.Second / 100)
	}
	if !m.IsRunning() {
		t.Error("expected true")
	}
}

func TestWebsocketRoutineManagerStop(t *testing.T) {
	var m *WebsocketRoutineManager
	err := m.Stop()
	if !errors.Is(err, ErrNilSubsystem) {
		t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem)
	}

	m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.Stop()
	if !errors.Is(err, ErrSubSystemNotStarted) {
		t.Errorf("error '%v', expected '%v'", err, ErrSubSystemNotStarted)
	}

	err = m.Start()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.Stop()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
}

func TestWebsocketRoutineManagerHandleData(t *testing.T) {
	exchName := "Bitstamp"
	var wg sync.WaitGroup
	em := NewExchangeManager()
	exch, err := em.NewExchangeByName(exchName)
	if !errors.Is(err, nil) {
		t.Fatalf("error '%v', expected '%v'", err, nil)
	}
	exch.SetDefaults()
	err = em.Add(exch)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}
	om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, &config.OrderManager{})
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = om.Start()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	cfg := &currency.Config{CurrencyPairFormat: &currency.PairFormat{
		Uppercase: false,
		Delimiter: "-",
	}}
	m, err := setupWebsocketRoutineManager(em, om, &SyncManager{}, cfg, true)
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.Start()
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	orderID := "1337"
	err = m.websocketDataHandler(exchName, errors.New("error"))
	if err == nil {
		t.Error("Error not handled correctly")
	}
	err = m.websocketDataHandler(exchName, websocket.FundingData{})
	if err != nil {
		t.Error(err)
	}
	err = m.websocketDataHandler(exchName, &ticker.Price{
		ExchangeName: exchName,
		Pair:         currency.NewPair(currency.BTC, currency.USDC),
		AssetType:    asset.Spot,
	})
	if !errors.Is(err, nil) {
		t.Errorf("error '%v', expected '%v'", err, nil)
	}
	err = m.websocketDataHandler(exchName, websocket.KlineData{})
	if err != nil {
		t.Error(err)
	}
	origOrder := &order.Detail{
		Exchange: exchName,
		OrderID:  orderID,
		Amount:   1337,
		Price:    1337,
	}
	err = m.websocketDataHandler(exchName, origOrder)
	if err != nil {
		t.Error(err)
	}
	// Send it again since it exists now
	err = m.websocketDataHandler(exchName, &order.Detail{
		Exchange: exchName,
		OrderID:  orderID,
		Amount:   1338,
	})
	if err != nil {
		t.Error(err)
	}
	updated, err := m.orderManager.GetByExchangeAndID(origOrder.Exchange, origOrder.OrderID)
	if err != nil {
		t.Error(err)
	}
	if updated.Amount != 1338 {
		t.Error("Bad pipeline")
	}

	err = m.websocketDataHandler(exchName, &order.Detail{
		Exchange: "Bitstamp",
		OrderID:  orderID,
		Status:   order.Active,
	})
	if err != nil {
		t.Error(err)
	}
	updated, err = m.orderManager.GetByExchangeAndID(origOrder.Exchange, origOrder.OrderID)
	if err != nil {
		t.Error(err)
	}
	if updated.Status != order.Active {
		t.Error("Expected order to be modified to Active")
	}

	// Send some gibberish
	err = m.websocketDataHandler(exchName, order.Stop)
	if err != nil {
		t.Error(err)
	}

	err = m.websocketDataHandler(exchName, websocket.UnhandledMessageWarning{
		Message: "there's an issue here's a tissue",
	})
	if err != nil {
		t.Error(err)
	}

	classificationError := order.ClassificationError{
		Exchange: "test",
		OrderID:  "one",
		Err:      errors.New("lol"),
	}
	err = m.websocketDataHandler(exchName, classificationError)
	if err == nil {
		t.Error("Expected error")
	}
	if !errors.Is(err, classificationError.Err) {
		t.Errorf("error '%v', expected '%v'", err, classificationError.Err)
	}

	err = m.websocketDataHandler(exchName, &orderbook.Base{
		Exchange: "Bitstamp",
		Pair:     currency.NewPair(currency.BTC, currency.USD),
	})
	if err != nil {
		t.Error(err)
	}
	err = m.websocketDataHandler(exchName, "this is a test string")
	if err != nil {
		t.Error(err)
	}
}

func TestRegisterWebsocketDataHandlerWithFunctionality(t *testing.T) {
	t.Parallel()
	var m *WebsocketRoutineManager
	err := m.registerWebsocketDataHandler(nil, false)
	if !errors.Is(err, ErrNilSubsystem) {
		t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem)
	}

	m = new(WebsocketRoutineManager)
	m.shutdown = make(chan struct{})

	err = m.registerWebsocketDataHandler(nil, false)
	if !errors.Is(err, errNilWebsocketDataHandlerFunction) {
		t.Fatalf("received: '%v' but expected: '%v'", err, errNilWebsocketDataHandlerFunction)
	}

	// externally defined capture device
	dataChan := make(chan any)
	fn := func(_ string, data any) error {
		switch data.(type) {
		case string:
			dataChan <- data
		default:
		}
		return nil
	}

	err = m.registerWebsocketDataHandler(fn, true)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}

	if len(m.dataHandlers) != 1 {
		t.Fatal("unexpected data handlers registered")
	}

	mock := websocket.NewManager()
	mock.ToRoutine = make(chan any)
	m.state = readyState
	err = m.websocketDataReceiver(mock)
	if err != nil {
		t.Fatal(err)
	}

	mock.ToRoutine <- nil
	mock.ToRoutine <- 1336
	mock.ToRoutine <- "intercepted"

	if r := <-dataChan; r != "intercepted" {
		t.Fatal("unexpected value received")
	}

	close(m.shutdown)
	m.wg.Wait()
}

func TestSetWebsocketDataHandler(t *testing.T) {
	t.Parallel()
	var m *WebsocketRoutineManager
	err := m.setWebsocketDataHandler(nil)
	if !errors.Is(err, ErrNilSubsystem) {
		t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem)
	}

	m = new(WebsocketRoutineManager)
	m.shutdown = make(chan struct{})

	err = m.setWebsocketDataHandler(nil)
	if !errors.Is(err, errNilWebsocketDataHandlerFunction) {
		t.Fatalf("received: '%v' but expected: '%v'", err, errNilWebsocketDataHandlerFunction)
	}

	err = m.registerWebsocketDataHandler(m.websocketDataHandler, false)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}

	err = m.registerWebsocketDataHandler(m.websocketDataHandler, false)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}

	err = m.registerWebsocketDataHandler(m.websocketDataHandler, false)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}

	if len(m.dataHandlers) != 3 {
		t.Fatal("unexpected data handler count")
	}

	err = m.setWebsocketDataHandler(m.websocketDataHandler)
	if !errors.Is(err, nil) {
		t.Fatalf("received: '%v' but expected: '%v'", err, nil)
	}

	if len(m.dataHandlers) != 1 {
		t.Fatal("unexpected data handler count")
	}
}