GoCryptoTrader Exchange Stream Package
This package is part of the GoCryptoTrader project and is responsible for handling exchange streaming data.
Overview
The stream package is built on Gorilla Websocket. It connects to various cryptocurrency exchanges and handles their real-time data streams.
Features
- Handle real-time market data streams
- Unified interface for managing data streams
- Multi-connection management - manages multiple connections to the same exchange
- Connection monitoring - checks whether each websocket connection is still alive and attempts to reconnect when it is not
- Traffic monitoring - reconnects if no message is sent for the period of time defined in your config
- Subscription management - manages subscriptions to the various data streams
- Rate limiting - limits the number of requests sent to the exchange
- Message ID generation - generates message IDs for websocket requests
- Websocket message response matching - matches websocket responses to the requests that were sent
Usage
Default single websocket connection
Here is a basic example of how to set up the stream package for websocket:
package main

import (
	"time"

	"github.com/thrasher-corp/gocryptotrader/config"
	exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
	"github.com/thrasher-corp/gocryptotrader/exchanges/request"
	"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
	"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
)

type Exchange struct {
	exchange.Base
}

// In the exchange wrapper this will set up the initial pointer field provided by exchange.Base
func (e *Exchange) SetDefault() {
	e.Websocket = stream.NewWebsocket()
	e.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit
	e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout
	e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit
}
// In the exchange wrapper this is the original setup pattern for the websocket services.
// connectionURLString, privateConnectionURLString and exchangeWebsocketResponseMaxLimit
// are placeholders for values defined by the exchange wrapper.
func (e *Exchange) Setup(exch *config.Exchange) error {
	// This sets up the global connector, subscriber, unsubscriber and subscription generator used by each connection defined below.
	if err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:                         exch,
		DefaultURL:                             connectionURLString,
		RunningURL:                             connectionURLString,
		Connector:                              e.WsConnect,
		Subscriber:                             e.Subscribe,
		Unsubscriber:                           e.Unsubscribe,
		GenerateSubscriptions:                  e.GenerateDefaultSubscriptions,
		Features:                               &e.Features.Supports.WebsocketCapabilities,
		MaxWebsocketSubscriptionsPerConnection: 240,
		OrderbookBufferConfig:                  buffer.Config{Checksum: e.CalculateUpdateOrderbookChecksum},
	}); err != nil {
		return err
	}
	// This is a public websocket connection
	if err := e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	}); err != nil {
		return err
	}
	// This is a private websocket connection
	return e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  privateConnectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		Authenticated:        true,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	})
}
Multiple websocket connections
The example below uses the optional multi-connection management system, which establishes and maintains separate connections based on URL, connection type, asset type and so on.
func (e *Exchange) Setup(exch *config.Exchange) error {
	// This sets up the global websocket settings; the connector, subscriber, unsubscriber and subscription generator are defined per connection below.
	err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:               exch,
		Features:                     &e.Features.Supports.WebsocketCapabilities,
		FillsFeed:                    e.Features.Enabled.FillsFeed,
		TradeFeed:                    e.Features.Enabled.TradeFeed,
		UseMultiConnectionManagement: true,
	})
	if err != nil {
		return err
	}
	// Spot connection
	err = e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLStringForSpot,
		RateLimit:            request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exch.WebsocketResponseMaxLimit,
		// Custom handlers for the specific connection:
		Handler:                  e.WsHandleSpotData,
		Subscriber:               e.SpotSubscribe,
		Unsubscriber:             e.SpotUnsubscribe,
		GenerateSubscriptions:    e.GenerateDefaultSubscriptionsSpot,
		Connector:                e.WsConnectSpot,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
	if err != nil {
		return err
	}
	// Futures connection - USDT margined
	return e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLStringForFutures,
		RateLimit:            request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exch.WebsocketResponseMaxLimit,
		// Custom handlers for the specific connection:
		Handler: func(ctx context.Context, incoming []byte) error {
			return e.WsHandleFuturesData(ctx, incoming, asset.Futures)
		},
		Subscriber:   e.FuturesSubscribe,
		Unsubscriber: e.FuturesUnsubscribe,
		GenerateSubscriptions: func() (subscription.List, error) {
			return e.GenerateFuturesDefaultSubscriptions(currency.USDT)
		},
		Connector:                e.WsFuturesConnect,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
}