Websocket orderbook buffering (#333)

* Initial commit setting up a map orderbook system with a buffer. It will write to the buffer, sort apply to main orderbook and then process.

* Moves namespaces again

* Updates orderbook to use a sweet new WebsocketOrderbookUpdate type to handle all updates whether its using ID or not. So good. Adds many tests

* Starting to implement orderbook update handling per exchange. Updates namespaces again. Hopefuylly will find a way to update via ID not timestamp, too many endpoints dont provide update timestamps

* Changes orderbookbuffer to use BufferUpdate type instead of orderbook.Base to achieve more functionality and no need for type conversion functions. Updates tests

* Updates all instances of ws.orderbook.Update. Simplifies some orderbook logic

* Introduces toggleable buffer. Renames orderbooks. Completes implementation for everywhere but OKGroup due to hash calculation

* Implements orderbook update for okgroup, but forgets about the orderbook hash checking

* Fixes okgroup checksum calculation. Fixes linting issue. Removes redundant Kraken tests.

* Introduces sorting toggle and separates from buffer toggle. Uses benchmarks to highlight performance gains

* Fixes Gemini rate limit and parsing. Removes comments and fixes typos

* Fixes bitfinex orderbook processing

* Inbuilt sorting, minor fixes for websocket implementations. Improves test coverage

* Adds surprise LakeBTC websocket support

* Fixes data race

* Fixes rebasing issues due to namespace movements

* Addresses PR nits: moves folder namespace from ws to websocket. Removes line spaces in imports. Fixes lakebtc websocket returns and defer fucntions. Fixes comments

* Adds poloniex orderook sorting support

* Enables bitstamp and hitbtc orderbook sorting. Fixes poloniex's sorting

* Renames namespaces and combines monitor and connection into wshandler. Removes unused SPOT const. Changes how orderbook stuff is loaded. It is done in startup with a setup. Removes exchange name from loadsnapshot as well

* Removes the connection.go from rebasing issues. Removes error response from functions used in goroutines

* Fixes test with exchange name output change

* Fixes issues where copy and paste and replace all were used poorly
This commit is contained in:
Scott
2019-08-13 09:32:59 +10:00
committed by Adrian Gallagher
parent 2078ba907f
commit 0fbf8b172a
110 changed files with 2197 additions and 1909 deletions

View File

@@ -0,0 +1,864 @@
package wshandler
import (
"bytes"
"compress/flate"
"compress/gzip"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
log "github.com/thrasher-corp/gocryptotrader/logger"
)
// New initialises the websocket struct
func New() *Websocket {
return &Websocket{
defaultURL: "",
enabled: false,
proxyAddr: "",
runningURL: "",
init: true,
}
}
// Setup sets main variables for websocket connection
func (w *Websocket) Setup(connector func() error,
subscriber func(channelToSubscribe WebsocketChannelSubscription) error,
unsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error,
exchangeName string,
wsEnabled,
verbose bool,
defaultURL,
runningURL string,
authenticatedWebsocketAPISupport bool) error {
w.DataHandler = make(chan interface{}, 1)
w.Connected = make(chan struct{}, 1)
w.Disconnected = make(chan struct{}, 1)
w.TrafficAlert = make(chan struct{}, 1)
w.verbose = verbose
w.SetChannelSubscriber(subscriber)
w.SetChannelUnsubscriber(unsubscriber)
err := w.SetWsStatusAndConnection(wsEnabled)
if err != nil {
return err
}
w.SetDefaultURL(defaultURL)
w.SetConnector(connector)
w.SetWebsocketURL(runningURL)
w.SetExchangeName(exchangeName)
w.SetCanUseAuthenticatedEndpoints(authenticatedWebsocketAPISupport)
w.init = false
w.noConnectionCheckLimit = 5
w.reconnectionLimit = 10
return nil
}
// Connect intiates a websocket connection by using a package defined connection
// function
func (w *Websocket) Connect() error {
w.m.Lock()
defer w.m.Unlock()
if !w.IsEnabled() {
return errors.New(WebsocketNotEnabled)
}
if w.connected {
w.connecting = false
return errors.New("exchange_websocket.go error - already connected, cannot connect again")
}
w.connecting = true
w.ShutdownC = make(chan struct{}, 1)
err := w.connector()
if err != nil {
w.connecting = false
return fmt.Errorf("exchange_websocket.go connection error %s",
err)
}
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
w.connecting = false
}
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go w.trafficMonitor(&anotherWG)
anotherWG.Wait()
if !w.connectionMonitorRunning {
go w.connectionMonitor()
}
go w.manageSubscriptions()
return nil
}
// connectionMonitor ensures that the WS keeps connecting
func (w *Websocket) connectionMonitor() {
w.m.Lock()
w.connectionMonitorRunning = true
w.m.Unlock()
defer func() {
w.connectionMonitorRunning = false
}()
for {
time.Sleep(connectionMonitorDelay)
w.m.Lock()
if !w.enabled {
w.m.Unlock()
w.DataHandler <- fmt.Errorf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)
err := w.Shutdown()
if err != nil {
log.Error(err)
}
if w.verbose {
log.Debugf("%v connectionMonitor exiting", w.exchangeName)
}
return
}
w.m.Unlock()
err := w.checkConnection()
if err != nil {
log.Error(err)
}
}
}
// checkConnection ensures the connection is maintained
// Will reconnect on disconnect
func (w *Websocket) checkConnection() error {
if w.verbose {
log.Debugf("%v checking connection", w.exchangeName)
}
switch {
case !w.IsConnected() && !w.IsConnecting():
w.m.Lock()
defer w.m.Unlock()
if w.verbose {
log.Debugf("%v no connection. Attempt %v/%v", w.exchangeName, w.noConnectionChecks, w.noConnectionCheckLimit)
}
if w.noConnectionChecks >= w.noConnectionCheckLimit {
if w.verbose {
log.Debugf("%v resetting connection", w.exchangeName)
}
w.connecting = true
go w.WebsocketReset()
w.noConnectionChecks = 0
}
w.noConnectionChecks++
case w.IsConnecting():
if w.reconnectionChecks >= w.reconnectionLimit {
return fmt.Errorf("%v websocket failed to reconnect after %v seconds",
w.exchangeName,
w.reconnectionLimit*int(connectionMonitorDelay.Seconds()))
}
if w.verbose {
log.Debugf("%v Busy reconnecting", w.exchangeName)
}
w.reconnectionChecks++
default:
w.noConnectionChecks = 0
w.reconnectionChecks = 0
}
return nil
}
// IsConnected exposes websocket connection status
func (w *Websocket) IsConnected() bool {
w.m.Lock()
defer w.m.Unlock()
return w.connected
}
// IsConnecting checks whether websocket is busy connecting
func (w *Websocket) IsConnecting() bool {
w.m.Lock()
defer w.m.Unlock()
return w.connecting
}
// Shutdown attempts to shut down a websocket connection and associated routines
// by using a package defined shutdown function
func (w *Websocket) Shutdown() error {
w.m.Lock()
defer func() {
w.Orderbook.FlushCache()
w.m.Unlock()
}()
if !w.connected && w.ShutdownC == nil {
return fmt.Errorf("%v cannot shutdown a disconnected websocket", w.exchangeName)
}
if w.verbose {
log.Debugf("%v shutting down websocket channels", w.exchangeName)
}
timer := time.NewTimer(15 * time.Second)
c := make(chan struct{}, 1)
go func(c chan struct{}) {
close(w.ShutdownC)
w.Wg.Wait()
if w.verbose {
log.Debugf("%v completed websocket channel shutdown", w.exchangeName)
}
c <- struct{}{}
}(c)
select {
case <-c:
w.connected = false
return nil
case <-timer.C:
return fmt.Errorf("%s websocket routines failed to shutdown after 15 seconds",
w.GetName())
}
}
// WebsocketReset sends the shutdown command, waits for channel/func closure and then reconnects
func (w *Websocket) WebsocketReset() {
err := w.Shutdown()
if err != nil {
// does not return here to allow connection to be made if already shut down
w.DataHandler <- fmt.Errorf("%v shutdown error: %v", w.exchangeName, err)
}
log.Infof("%v reconnecting to websocket", w.exchangeName)
w.m.Lock()
w.init = true
w.m.Unlock()
err = w.Connect()
if err != nil {
w.DataHandler <- fmt.Errorf("%v connection error: %v", w.exchangeName, err)
}
}
// trafficMonitor monitors traffic and switches connection modes for websocket
func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) {
w.Wg.Add(1)
wg.Done() // Makes sure we are unlocking after we add to waitgroup
defer func() {
if w.connected {
w.Disconnected <- struct{}{}
}
w.Wg.Done()
}()
// Define an initial traffic timer which will be a delay then fall over to
// WebsocketTrafficLimitTime after first response
trafficTimer := time.NewTimer(5 * time.Second)
for {
select {
case <-w.ShutdownC: // Returns on shutdown channel close
if w.verbose {
log.Debugf("%v trafficMonitor shutdown message received", w.exchangeName)
}
return
case <-w.TrafficAlert: // Resets timer on traffic
w.m.Lock()
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
}
w.m.Unlock()
trafficTimer.Reset(WebsocketTrafficLimitTime)
case <-trafficTimer.C: // Falls through when timer runs out
newtimer := time.NewTimer(10 * time.Second) // New secondary timer set
if w.verbose {
log.Debugf("%v has not received a traffic alert in 5 seconds.", w.exchangeName)
}
w.m.Lock()
if w.connected {
// If connected divert traffic to rest
w.Disconnected <- struct{}{}
w.connected = false
}
w.m.Unlock()
select {
case <-w.ShutdownC: // Returns on shutdown channel close
w.m.Lock()
w.connected = false
w.m.Unlock()
return
case <-newtimer.C: // If secondary timer runs state timeout is sent to the data handler
if w.verbose {
log.Debugf("%v has not received a traffic alert in 15 seconds, exiting", w.exchangeName)
}
w.DataHandler <- fmt.Errorf("trafficMonitor %v", WebsocketStateTimeout)
return
case <-w.TrafficAlert: // If in this time response traffic comes through
trafficTimer.Reset(WebsocketTrafficLimitTime)
w.m.Lock()
if !w.connected {
// If not connected dive rt traffic from REST to websocket
w.Connected <- struct{}{}
if w.verbose {
log.Debugf("%v has received a traffic alert. Setting status to connected", w.exchangeName)
}
w.connected = true
}
w.m.Unlock()
}
}
}
}
// SetWebsocketURL sets websocket URL
func (w *Websocket) SetWebsocketURL(websocketURL string) {
if websocketURL == "" || websocketURL == config.WebsocketURLNonDefaultMessage {
w.runningURL = w.defaultURL
return
}
w.runningURL = websocketURL
}
// GetWebsocketURL returns the running websocket URL
func (w *Websocket) GetWebsocketURL() string {
return w.runningURL
}
// SetWsStatusAndConnection sets if websocket is enabled
// it will also connect/disconnect the websocket connection
func (w *Websocket) SetWsStatusAndConnection(enabled bool) error {
w.m.Lock()
if w.enabled == enabled {
if w.init {
w.m.Unlock()
return nil
}
w.m.Unlock()
return fmt.Errorf("exchange_websocket.go error - already set as %t",
enabled)
}
w.enabled = enabled
if !w.init {
if enabled {
if w.connected {
w.m.Unlock()
return nil
}
w.m.Unlock()
return w.Connect()
}
if !w.connected {
w.m.Unlock()
return nil
}
w.m.Unlock()
return w.Shutdown()
}
w.m.Unlock()
return nil
}
// IsEnabled returns bool
func (w *Websocket) IsEnabled() bool {
return w.enabled
}
// SetProxyAddress sets websocket proxy address
func (w *Websocket) SetProxyAddress(proxyAddr string) error {
if w.proxyAddr == proxyAddr {
return errors.New("exchange_websocket.go error - Setting proxy address - same address")
}
w.proxyAddr = proxyAddr
if !w.init && w.enabled {
if w.connected {
err := w.Shutdown()
if err != nil {
return err
}
}
return w.Connect()
}
return nil
}
// GetProxyAddress returns the current websocket proxy
func (w *Websocket) GetProxyAddress() string {
return w.proxyAddr
}
// SetDefaultURL sets default websocket URL
func (w *Websocket) SetDefaultURL(defaultURL string) {
w.defaultURL = defaultURL
}
// GetDefaultURL returns the default websocket URL
func (w *Websocket) GetDefaultURL() string {
return w.defaultURL
}
// SetConnector sets connection function
func (w *Websocket) SetConnector(connector func() error) {
w.connector = connector
}
// SetExchangeName sets exchange name
func (w *Websocket) SetExchangeName(exchName string) {
w.exchangeName = exchName
}
// GetName returns exchange name
func (w *Websocket) GetName() string {
return w.exchangeName
}
// GetFunctionality returns a functionality bitmask for the websocket
// connection
func (w *Websocket) GetFunctionality() uint32 {
return w.Functionality
}
// SupportsFunctionality returns if the functionality is supported as a boolean
func (w *Websocket) SupportsFunctionality(f uint32) bool {
return w.GetFunctionality()&f == f
}
// FormatFunctionality will return each of the websocket connection compatible
// stream methods as a string
func (w *Websocket) FormatFunctionality() string {
var functionality []string
for i := 0; i < 32; i++ {
var check uint32 = 1 << uint32(i)
if w.GetFunctionality()&check != 0 {
switch check {
case WebsocketTickerSupported:
functionality = append(functionality, WebsocketTickerSupportedText)
case WebsocketOrderbookSupported:
functionality = append(functionality, WebsocketOrderbookSupportedText)
case WebsocketKlineSupported:
functionality = append(functionality, WebsocketKlineSupportedText)
case WebsocketTradeDataSupported:
functionality = append(functionality, WebsocketTradeDataSupportedText)
case WebsocketAccountSupported:
functionality = append(functionality, WebsocketAccountSupportedText)
case WebsocketAllowsRequests:
functionality = append(functionality, WebsocketAllowsRequestsText)
case WebsocketSubscribeSupported:
functionality = append(functionality, WebsocketSubscribeSupportedText)
case WebsocketUnsubscribeSupported:
functionality = append(functionality, WebsocketUnsubscribeSupportedText)
case WebsocketAuthenticatedEndpointsSupported:
functionality = append(functionality, WebsocketAuthenticatedEndpointsSupportedText)
case WebsocketAccountDataSupported:
functionality = append(functionality, WebsocketAccountDataSupportedText)
case WebsocketSubmitOrderSupported:
functionality = append(functionality, WebsocketSubmitOrderSupportedText)
case WebsocketCancelOrderSupported:
functionality = append(functionality, WebsocketCancelOrderSupportedText)
case WebsocketWithdrawSupported:
functionality = append(functionality, WebsocketWithdrawSupportedText)
case WebsocketMessageCorrelationSupported:
functionality = append(functionality, WebsocketMessageCorrelationSupportedText)
case WebsocketSequenceNumberSupported:
functionality = append(functionality, WebsocketSequenceNumberSupportedText)
case WebsocketDeadMansSwitchSupported:
functionality = append(functionality, WebsocketDeadMansSwitchSupportedText)
default:
functionality = append(functionality,
fmt.Sprintf("%s[1<<%v]", UnknownWebsocketFunctionality, i))
}
}
}
if len(functionality) > 0 {
return strings.Join(functionality, " & ")
}
return NoWebsocketSupportText
}
// SetChannelSubscriber sets the function to use the base subscribe func
func (w *Websocket) SetChannelSubscriber(subscriber func(channelToSubscribe WebsocketChannelSubscription) error) {
w.channelSubscriber = subscriber
}
// SetChannelUnsubscriber sets the function to use the base unsubscribe func
func (w *Websocket) SetChannelUnsubscriber(unsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error) {
w.channelUnsubscriber = unsubscriber
}
// ManageSubscriptions ensures the subscriptions specified continue to be subscribed to
func (w *Websocket) manageSubscriptions() {
if !w.SupportsFunctionality(WebsocketSubscribeSupported) && !w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
w.DataHandler <- fmt.Errorf("%v does not support channel subscriptions, exiting ManageSubscriptions()", w.exchangeName)
return
}
w.Wg.Add(1)
defer func() {
if w.verbose {
log.Debugf("%v ManageSubscriptions exiting", w.exchangeName)
}
w.Wg.Done()
}()
for {
select {
case <-w.ShutdownC:
w.subscribedChannels = []WebsocketChannelSubscription{}
if w.verbose {
log.Debugf("%v shutdown manageSubscriptions", w.exchangeName)
}
return
default:
time.Sleep(manageSubscriptionsDelay)
if w.verbose {
log.Debugf("%v checking subscriptions", w.exchangeName)
}
// Subscribe to channels Pending a subscription
if w.SupportsFunctionality(WebsocketSubscribeSupported) {
err := w.subscribeToChannels()
if err != nil {
w.DataHandler <- err
}
}
if w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
err := w.unsubscribeToChannels()
if err != nil {
w.DataHandler <- err
}
}
}
}
}
// subscribeToChannels compares channelsToSubscribe to subscribedChannels
// and subscribes to any channels not present in subscribedChannels
func (w *Websocket) subscribeToChannels() error {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
for i := 0; i < len(w.channelsToSubscribe); i++ {
channelIsSubscribed := false
for j := 0; j < len(w.subscribedChannels); j++ {
if w.subscribedChannels[j].Equal(&w.channelsToSubscribe[i]) {
channelIsSubscribed = true
break
}
}
if !channelIsSubscribed {
if w.verbose {
log.Debugf("%v Subscribing to %v %v", w.exchangeName, w.channelsToSubscribe[i].Channel, w.channelsToSubscribe[i].Currency.String())
}
err := w.channelSubscriber(w.channelsToSubscribe[i])
if err != nil {
return err
}
w.subscribedChannels = append(w.subscribedChannels, w.channelsToSubscribe[i])
}
}
return nil
}
// unsubscribeToChannels compares subscribedChannels to channelsToSubscribe
// and unsubscribes to any channels not present in channelsToSubscribe
func (w *Websocket) unsubscribeToChannels() error {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
for i := 0; i < len(w.subscribedChannels); i++ {
subscriptionFound := false
for j := 0; j < len(w.channelsToSubscribe); j++ {
if w.channelsToSubscribe[j].Equal(&w.subscribedChannels[i]) {
subscriptionFound = true
break
}
}
if !subscriptionFound {
err := w.channelUnsubscriber(w.subscribedChannels[i])
if err != nil {
return err
}
}
}
// Now that the slices should match, assign rather than looping and appending the differences
w.subscribedChannels = append(w.channelsToSubscribe[:0:0], w.channelsToSubscribe...) //nolint:gocritic
return nil
}
// RemoveSubscribedChannels removes supplied channels from channelsToSubscribe
func (w *Websocket) RemoveSubscribedChannels(channels []WebsocketChannelSubscription) {
for i := range channels {
w.removeChannelToSubscribe(channels[i])
}
}
// removeChannelToSubscribe removes an entry from w.channelsToSubscribe
// so an unsubscribe event can be triggered
func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelSubscription) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
channelLength := len(w.channelsToSubscribe)
i := 0
for j := 0; j < len(w.channelsToSubscribe); j++ {
if !w.channelsToSubscribe[j].Equal(&subscribedChannel) {
w.channelsToSubscribe[i] = w.channelsToSubscribe[j]
i++
}
}
w.channelsToSubscribe = w.channelsToSubscribe[:i]
if channelLength == len(w.channelsToSubscribe) {
w.DataHandler <- fmt.Errorf("%v removeChannelToSubscribe() Channel %v Currency %v could not be removed because it was not found",
w.exchangeName,
subscribedChannel.Channel,
subscribedChannel.Currency)
}
}
// ResubscribeToChannel calls unsubscribe func and
// removes it from subscribedChannels to trigger a subscribe event
func (w *Websocket) ResubscribeToChannel(subscribedChannel WebsocketChannelSubscription) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
err := w.channelUnsubscriber(subscribedChannel)
if err != nil {
w.DataHandler <- err
}
// Remove the channel from the list of subscribed channels
// ManageSubscriptions will automatically resubscribe
i := 0
for j := 0; j < len(w.subscribedChannels); j++ {
if !w.subscribedChannels[j].Equal(&subscribedChannel) {
w.subscribedChannels[i] = w.subscribedChannels[j]
i++
}
}
w.subscribedChannels = w.subscribedChannels[:i]
}
// SubscribeToChannels appends supplied channels to channelsToSubscribe
func (w *Websocket) SubscribeToChannels(channels []WebsocketChannelSubscription) {
for i := range channels {
channelFound := false
for j := range w.channelsToSubscribe {
if w.channelsToSubscribe[j].Equal(&channels[i]) {
channelFound = true
}
}
if !channelFound {
w.channelsToSubscribe = append(w.channelsToSubscribe, channels[i])
}
}
w.noConnectionChecks = 0
}
// Equal two WebsocketChannelSubscription to determine equality
func (w *WebsocketChannelSubscription) Equal(subscribedChannel *WebsocketChannelSubscription) bool {
return strings.EqualFold(w.Channel, subscribedChannel.Channel) &&
strings.EqualFold(w.Currency.String(), subscribedChannel.Currency.String())
}
// GetSubscriptions returns a copied list of subscriptions
// subscriptions is a private member and cannot be manipulated
func (w *Websocket) GetSubscriptions() []WebsocketChannelSubscription {
return append(w.subscribedChannels[:0:0], w.subscribedChannels...)
}
// SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.canUseAuthenticatedEndpoints = val
}
// CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) CanUseAuthenticatedEndpoints() bool {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
return w.canUseAuthenticatedEndpoints
}
// AddResponseWithID adds data to IDResponses with locks and a nil check
func (w *WebsocketConnection) AddResponseWithID(id int64, data []byte) {
w.Lock()
defer w.Unlock()
if w.IDResponses == nil {
w.IDResponses = make(map[int64][]byte)
}
w.IDResponses[id] = data
}
// Dial sets proxy urls and then connects to the websocket
func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error {
if w.ProxyURL != "" {
proxy, err := url.Parse(w.ProxyURL)
if err != nil {
return err
}
dialer.Proxy = http.ProxyURL(proxy)
}
var err error
var conStatus *http.Response
w.Connection, conStatus, err = dialer.Dial(w.URL, headers)
if err != nil {
if conStatus != nil {
return fmt.Errorf("%v %v %v Error: %v", w.URL, conStatus, conStatus.StatusCode, err)
}
return fmt.Errorf("%v Error: %v", w.URL, err)
}
return nil
}
// SendMessage the one true message request. Sends message to WS
func (w *WebsocketConnection) SendMessage(data interface{}) error {
w.Lock()
defer w.Unlock()
json, err := common.JSONEncode(data)
if err != nil {
return err
}
if w.Verbose {
log.Debugf("%v sending message to websocket %v", w.ExchangeName, string(json))
}
if w.RateLimit > 0 {
time.Sleep(time.Duration(w.RateLimit) * time.Millisecond)
}
return w.Connection.WriteMessage(websocket.TextMessage, json)
}
// SendMessageReturnResponse will send a WS message to the connection
// It will then run a goroutine to await a JSON response
// If there is no response it will return an error
func (w *WebsocketConnection) SendMessageReturnResponse(id int64, request interface{}) ([]byte, error) {
err := w.SendMessage(request)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(1)
go w.WaitForResult(id, &wg)
defer func() {
delete(w.IDResponses, id)
}()
wg.Wait()
if _, ok := w.IDResponses[id]; !ok {
return nil, fmt.Errorf("timeout waiting for response with ID %v", id)
}
return w.IDResponses[id], nil
}
// WaitForResult will keep checking w.IDResponses for a response ID
// If the timer expires, it will return without
func (w *WebsocketConnection) WaitForResult(id int64, wg *sync.WaitGroup) {
defer wg.Done()
timer := time.NewTimer(w.ResponseMaxLimit)
for {
select {
case <-timer.C:
return
default:
w.Lock()
for k := range w.IDResponses {
if k == id {
w.Unlock()
return
}
}
w.Unlock()
time.Sleep(w.ResponseCheckTimeout)
}
}
}
// ReadMessage reads messages, can handle text, gzip and binary
func (w *WebsocketConnection) ReadMessage() (WebsocketResponse, error) {
mType, resp, err := w.Connection.ReadMessage()
if err != nil {
return WebsocketResponse{}, err
}
var standardMessage []byte
switch mType {
case websocket.TextMessage:
standardMessage = resp
case websocket.BinaryMessage:
standardMessage, err = w.parseBinaryResponse(resp)
if err != nil {
return WebsocketResponse{}, err
}
}
if w.Verbose {
log.Debugf("%v Websocket message received: %v",
w.ExchangeName,
string(standardMessage))
}
return WebsocketResponse{Raw: standardMessage, Type: mType}, nil
}
// parseBinaryResponse parses a websocket binary response into a usable byte array
func (w *WebsocketConnection) parseBinaryResponse(resp []byte) ([]byte, error) {
var standardMessage []byte
var err error
// Detect GZIP
if resp[0] == 31 && resp[1] == 139 {
b := bytes.NewReader(resp)
var gReader *gzip.Reader
gReader, err = gzip.NewReader(b)
if err != nil {
return standardMessage, err
}
standardMessage, err = ioutil.ReadAll(gReader)
if err != nil {
return standardMessage, err
}
err = gReader.Close()
if err != nil {
return standardMessage, err
}
} else {
reader := flate.NewReader(bytes.NewReader(resp))
standardMessage, err = ioutil.ReadAll(reader)
if err != nil {
return standardMessage, err
}
err = reader.Close()
if err != nil {
return standardMessage, err
}
}
return standardMessage, nil
}
// GenerateMessageID Creates a messageID to checkout
func (w *WebsocketConnection) GenerateMessageID(useNano bool) int64 {
if useNano {
return time.Now().UnixNano()
}
return time.Now().Unix()
}

View File

@@ -0,0 +1,385 @@
package wshandler
import (
"fmt"
"strings"
"testing"
"time"
)
var ws *Websocket
func TestWebsocketInit(t *testing.T) {
ws = New()
if ws == nil {
t.Error("test failed - Websocket New() error")
}
}
func TestWebsocket(t *testing.T) {
if err := ws.SetProxyAddress("testProxy"); err != nil {
t.Error("test failed - SetProxyAddress", err)
}
ws.Setup(func() error { return nil },
func(test WebsocketChannelSubscription) error { return nil },
func(test WebsocketChannelSubscription) error { return nil },
"testName",
true,
false,
"testDefaultURL",
"testRunningURL",
false)
// Test variable setting and retreival
if ws.GetName() != "testName" {
t.Error("test failed - WebsocketSetup")
}
if !ws.IsEnabled() {
t.Error("test failed - WebsocketSetup")
}
if ws.GetProxyAddress() != "testProxy" {
t.Error("test failed - WebsocketSetup")
}
if ws.GetDefaultURL() != "testDefaultURL" {
t.Error("test failed - WebsocketSetup")
}
if ws.GetWebsocketURL() != "testRunningURL" {
t.Error("test failed - WebsocketSetup")
}
// Test websocket connect and shutdown functions
comms := make(chan struct{}, 1)
go func() {
var count int
for {
if count == 4 {
close(comms)
return
}
select {
case <-ws.Connected:
count++
case <-ws.Disconnected:
count++
}
}
}()
// -- Not connected shutdown
err := ws.Shutdown()
if err == nil {
t.Fatal("test failed - should not be connected to able to shut down")
}
ws.Wg.Wait()
// -- Normal connect
err = ws.Connect()
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
}
// -- Already connected connect
err = ws.Connect()
if err == nil {
t.Fatal("test failed - should not connect, already connected")
}
ws.SetWebsocketURL("")
// -- Set true when already true
err = ws.SetWsStatusAndConnection(true)
if err == nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Set false normal
err = ws.SetWsStatusAndConnection(false)
if err != nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Set true normal
err = ws.SetWsStatusAndConnection(true)
if err != nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Normal shutdown
err = ws.Shutdown()
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
}
timer := time.NewTimer(5 * time.Second)
select {
case <-comms:
case <-timer.C:
t.Fatal("test failed - WebsocketSetup - timeout")
}
}
func TestFunctionality(t *testing.T) {
var w Websocket
if w.FormatFunctionality() != NoWebsocketSupportText {
t.Fatalf("Test Failed - FormatFunctionality error expected %s but received %s",
NoWebsocketSupportText, w.FormatFunctionality())
}
w.Functionality = 1 << 31
if w.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" {
t.Fatal("Test Failed - GetFunctionality error incorrect error returned")
}
w.Functionality = WebsocketOrderbookSupported
if w.GetFunctionality() != WebsocketOrderbookSupported {
t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned")
}
if !w.SupportsFunctionality(WebsocketOrderbookSupported) {
t.Fatal("Test Failed - SupportsFunctionality error should be true")
}
}
// placeholderSubscriber basic function to test subscriptions
func placeholderSubscriber(channelToSubscribe WebsocketChannelSubscription) error {
return nil
}
// TestSubscribe logic test
func TestSubscribe(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
subscribedChannels: []WebsocketChannelSubscription{},
}
w.SetChannelSubscriber(placeholderSubscriber)
w.subscribeToChannels()
if len(w.subscribedChannels) != 1 {
t.Errorf("Subscription did not occur")
}
}
// TestUnsubscribe logic test
func TestUnsubscribe(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{},
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
if len(w.subscribedChannels) != 0 {
t.Errorf("Unsubscription did not occur")
}
}
// TestSubscriptionWithExistingEntry logic test
func TestSubscriptionWithExistingEntry(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
}
w.SetChannelSubscriber(placeholderSubscriber)
w.subscribeToChannels()
if len(w.subscribedChannels) != 1 {
t.Errorf("Subscription should not have occured")
}
}
// TestUnsubscriptionWithExistingEntry logic test
func TestUnsubscriptionWithExistingEntry(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
if len(w.subscribedChannels) != 1 {
t.Errorf("Unsubscription should not have occured")
}
}
// TestManageSubscriptionsStartStop logic test
func TestManageSubscriptionsStartStop(t *testing.T) {
w := Websocket{
ShutdownC: make(chan struct{}, 1),
Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported,
}
go w.manageSubscriptions()
time.Sleep(time.Second)
close(w.ShutdownC)
}
// TestConnectionMonitorNoConnection logic test
func TestConnectionMonitorNoConnection(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.exchangeName = "hello"
go w.connectionMonitor()
err := <-w.DataHandler
if !strings.EqualFold(err.(error).Error(),
fmt.Sprintf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)) {
t.Errorf("expecting error 'connectionMonitor: websocket disabled, shutting down', received '%v'", err)
}
}
// TestWsNoConnectionTolerance logic test
func TestWsNoConnectionTolerance(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.noConnectionCheckLimit = 500
w.checkConnection()
if w.noConnectionChecks == 0 {
t.Errorf("Expected noConnectionTolerance to increment, received '%v'", w.noConnectionChecks)
}
}
// TestConnecting logic test
func TestConnecting(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.connecting = true
w.reconnectionLimit = 500
w.checkConnection()
if w.reconnectionChecks != 1 {
t.Errorf("Expected reconnectionLimit to increment, received '%v'", w.reconnectionChecks)
}
}
// TestReconnectionLimit logic test
func TestReconnectionLimit(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.connecting = true
w.reconnectionChecks = 99
w.reconnectionLimit = 1
err := w.checkConnection()
if err == nil {
t.Error("Expected error")
}
}
// TestRemoveChannelToSubscribe logic test
func TestRemoveChannelToSubscribe(t *testing.T) {
subscription := WebsocketChannelSubscription{
Channel: "hello",
}
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
subscription,
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.removeChannelToSubscribe(subscription)
if len(w.subscribedChannels) != 0 {
t.Errorf("Unsubscription did not occur")
}
}
// TestRemoveChannelToSubscribeWithNoSubscription logic test
func TestRemoveChannelToSubscribeWithNoSubscription(t *testing.T) {
subscription := WebsocketChannelSubscription{
Channel: "hello",
}
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{},
}
w.DataHandler = make(chan interface{}, 1)
w.SetChannelUnsubscriber(placeholderSubscriber)
go w.removeChannelToSubscribe(subscription)
err := <-w.DataHandler
if !strings.Contains(err.(error).Error(), "could not be removed because it was not found") {
t.Error("Expected not found error")
}
}
// TestResubscribeToChannel logic test
func TestResubscribeToChannel(t *testing.T) {
subscription := WebsocketChannelSubscription{
Channel: "hello",
}
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{},
}
w.DataHandler = make(chan interface{}, 1)
w.SetChannelUnsubscriber(placeholderSubscriber)
w.SetChannelSubscriber(placeholderSubscriber)
w.ResubscribeToChannel(subscription)
}
// TestSliceCopyDoesntImpactBoth logic test
func TestSliceCopyDoesntImpactBoth(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello1",
},
{
Channel: "hello2",
},
},
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello3",
},
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
if len(w.subscribedChannels) != 2 {
t.Errorf("Unsubscription did not occur")
}
w.subscribedChannels[0].Channel = "test"
if strings.EqualFold(w.subscribedChannels[0].Channel, w.channelsToSubscribe[0].Channel) {
t.Errorf("Slice has not been copies appropriately")
}
}
// TestSetCanUseAuthenticatedEndpoints logic test
func TestSetCanUseAuthenticatedEndpoints(t *testing.T) {
w := Websocket{}
result := w.CanUseAuthenticatedEndpoints()
if result {
t.Error("expected `canUseAuthenticatedEndpoints` to be false")
}
w.SetCanUseAuthenticatedEndpoints(true)
result = w.CanUseAuthenticatedEndpoints()
if !result {
t.Error("expected `canUseAuthenticatedEndpoints` to be true")
}
}

View File

@@ -0,0 +1,196 @@
package wshandler
import (
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook"
)
// Websocket functionality list and state consts
const (
NoWebsocketSupport uint32 = 0
WebsocketTickerSupported uint32 = 1 << (iota - 1)
WebsocketOrderbookSupported
WebsocketKlineSupported
WebsocketTradeDataSupported
WebsocketAccountSupported
WebsocketAllowsRequests
WebsocketSubscribeSupported
WebsocketUnsubscribeSupported
WebsocketAuthenticatedEndpointsSupported
WebsocketAccountDataSupported
WebsocketSubmitOrderSupported
WebsocketCancelOrderSupported
WebsocketWithdrawSupported
WebsocketMessageCorrelationSupported
WebsocketSequenceNumberSupported
WebsocketDeadMansSwitchSupported
WebsocketTickerSupportedText = "TICKER STREAMING SUPPORTED"
WebsocketOrderbookSupportedText = "ORDERBOOK STREAMING SUPPORTED"
WebsocketKlineSupportedText = "KLINE STREAMING SUPPORTED"
WebsocketTradeDataSupportedText = "TRADE STREAMING SUPPORTED"
WebsocketAccountSupportedText = "ACCOUNT STREAMING SUPPORTED"
WebsocketAllowsRequestsText = "WEBSOCKET REQUESTS SUPPORTED"
NoWebsocketSupportText = "WEBSOCKET NOT SUPPORTED"
UnknownWebsocketFunctionality = "UNKNOWN FUNCTIONALITY BITMASK"
WebsocketSubscribeSupportedText = "WEBSOCKET SUBSCRIBE SUPPORTED"
WebsocketUnsubscribeSupportedText = "WEBSOCKET UNSUBSCRIBE SUPPORTED"
WebsocketAuthenticatedEndpointsSupportedText = "WEBSOCKET AUTHENTICATED ENDPOINTS SUPPORTED"
WebsocketAccountDataSupportedText = "WEBSOCKET ACCOUNT DATA SUPPORTED"
WebsocketSubmitOrderSupportedText = "WEBSOCKET SUBMIT ORDER SUPPORTED"
WebsocketCancelOrderSupportedText = "WEBSOCKET CANCEL ORDER SUPPORTED"
WebsocketWithdrawSupportedText = "WEBSOCKET WITHDRAW SUPPORTED"
WebsocketMessageCorrelationSupportedText = "WEBSOCKET MESSAGE CORRELATION SUPPORTED"
WebsocketSequenceNumberSupportedText = "WEBSOCKET SEQUENCE NUMBER SUPPORTED"
WebsocketDeadMansSwitchSupportedText = "WEBSOCKET DEAD MANS SWITCH SUPPORTED"
// WebsocketNotEnabled alerts of a disabled websocket
WebsocketNotEnabled = "exchange_websocket_not_enabled"
// WebsocketTrafficLimitTime defines a standard time for no traffic from the
// websocket connection
WebsocketTrafficLimitTime = 5 * time.Second
websocketRestablishConnection = time.Second
manageSubscriptionsDelay = 5 * time.Second
// connection monitor time delays and limits
connectionMonitorDelay = 2 * time.Second
// WebsocketStateTimeout defines a const for when a websocket connection
// times out, will be handled by the routine management system
WebsocketStateTimeout = "TIMEOUT"
)
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing in routines.go
type Websocket struct {
proxyAddr string
defaultURL string
runningURL string
exchangeName string
enabled bool
init bool
connected bool
connecting bool
verbose bool
connector func() error
m sync.Mutex
subscriptionLock sync.Mutex
connectionMonitorRunning bool
reconnectionLimit int
noConnectionChecks int
reconnectionChecks int
noConnectionCheckLimit int
subscribedChannels []WebsocketChannelSubscription
channelsToSubscribe []WebsocketChannelSubscription
channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error
channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error
// Connected denotes a channel switch for diversion of request flow
Connected chan struct{}
// Disconnected denotes a channel switch for diversion of request flow
Disconnected chan struct{}
// DataHandler pipes websocket data to an exchange websocket data handler
DataHandler chan interface{}
// ShutdownC is the main shutdown channel which controls all websocket go funcs
ShutdownC chan struct{}
// Orderbook is a local cache of orderbooks
Orderbook wsorderbook.WebsocketOrderbookLocal
// Wg defines a wait group for websocket routines for cleanly shutting down
// routines
Wg sync.WaitGroup
// TrafficAlert monitors if there is a halt in traffic throughput
TrafficAlert chan struct{}
// Functionality defines websocket stream capabilities
Functionality uint32
canUseAuthenticatedEndpoints bool
}
// WebsocketChannelSubscription container for websocket subscriptions
// Currently only a one at a time thing to avoid complexity
type WebsocketChannelSubscription struct {
Channel string
Currency currency.Pair
Params map[string]interface{}
}
// WebsocketResponse defines generalised data from the websocket connection
type WebsocketResponse struct {
Type int
Raw []byte
}
// WebsocketOrderbookUpdate defines a websocket event in which the orderbook
// has been updated in the orderbook package
type WebsocketOrderbookUpdate struct {
Pair currency.Pair
Asset string
Exchange string
}
// TradeData defines trade data
type TradeData struct {
Timestamp time.Time
CurrencyPair currency.Pair
AssetType string
Exchange string
EventType string
EventTime int64
Price float64
Amount float64
Side string
}
// TickerData defines ticker feed
type TickerData struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
ClosePrice float64
Quantity float64
OpenPrice float64
HighPrice float64
LowPrice float64
}
// KlineData defines kline feed
type KlineData struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
StartTime time.Time
CloseTime time.Time
Interval string
OpenPrice float64
ClosePrice float64
HighPrice float64
LowPrice float64
Volume float64
}
// WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketPositionUpdated struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
}
// WebsocketConnection contains all the data needed to send a message to a WS
type WebsocketConnection struct {
sync.Mutex
Verbose bool
RateLimit float64
ExchangeName string
URL string
ProxyURL string
Wg sync.WaitGroup
Connection *websocket.Conn
Shutdown chan struct{}
// These are the request IDs and the corresponding response JSON
IDResponses map[int64][]byte
ResponseCheckTimeout time.Duration
ResponseMaxLimit time.Duration
}

View File

@@ -0,0 +1,242 @@
package wsorderbook
import (
"fmt"
"sort"
"sync"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
// Setup sets private variables
func (w *WebsocketOrderbookLocal) Setup(obBufferLimit int, bufferEnabled, sortBuffer, sortBufferByUpdateIDs, updateEntriesByID bool, exchangeName string) {
w.obBufferLimit = obBufferLimit
w.bufferEnabled = bufferEnabled
w.sortBuffer = sortBuffer
w.sortBufferByUpdateIDs = sortBufferByUpdateIDs
w.updateEntriesByID = updateEntriesByID
w.exchangeName = exchangeName
}
// Update updates a local cache using bid targets and ask targets then updates
// main orderbook
// Volume == 0; deletion at price target
// Price target not found; append of price target
// Price target found; amend volume of price target
func (w *WebsocketOrderbookLocal) Update(orderbookUpdate *WebsocketOrderbookUpdate) error {
if (orderbookUpdate.Bids == nil && orderbookUpdate.Asks == nil) ||
(len(orderbookUpdate.Bids) == 0 && len(orderbookUpdate.Asks) == 0) {
return fmt.Errorf("%v cannot have bids and ask targets both nil", w.exchangeName)
}
w.m.Lock()
defer w.m.Unlock()
if _, ok := w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType]; !ok {
return fmt.Errorf("ob.Base could not be found for Exchange %s CurrencyPair: %s AssetType: %s",
w.exchangeName,
orderbookUpdate.CurrencyPair.String(),
orderbookUpdate.AssetType)
}
if w.bufferEnabled {
overBufferLimit := w.processBufferUpdate(orderbookUpdate)
if !overBufferLimit {
return nil
}
} else {
w.processObUpdate(orderbookUpdate)
}
err := w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Process()
if err != nil {
return err
}
if w.bufferEnabled {
// Reset the buffer
w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType] = nil
}
return nil
}
func (w *WebsocketOrderbookLocal) processBufferUpdate(orderbookUpdate *WebsocketOrderbookUpdate) bool {
if w.buffer == nil {
w.buffer = make(map[currency.Pair]map[string][]WebsocketOrderbookUpdate)
}
if w.buffer[orderbookUpdate.CurrencyPair] == nil {
w.buffer[orderbookUpdate.CurrencyPair] = make(map[string][]WebsocketOrderbookUpdate)
}
if len(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType]) <= w.obBufferLimit {
w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType] = append(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType], *orderbookUpdate)
if len(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType]) < w.obBufferLimit {
return false
}
}
if w.sortBuffer {
// sort by last updated to ensure each update is in order
if w.sortBufferByUpdateIDs {
sort.Slice(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType], func(i, j int) bool {
return w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType][i].UpdateID < w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType][j].UpdateID
})
} else {
sort.Slice(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType], func(i, j int) bool {
return w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType][i].UpdateTime.Before(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType][j].UpdateTime)
})
}
}
for i := 0; i < len(w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType]); i++ {
w.processObUpdate(&w.buffer[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType][i])
}
return true
}
func (w *WebsocketOrderbookLocal) processObUpdate(orderbookUpdate *WebsocketOrderbookUpdate) {
if w.updateEntriesByID {
w.updateByIDAndAction(orderbookUpdate)
} else {
var wg sync.WaitGroup
wg.Add(2)
go w.updateAsksByPrice(orderbookUpdate, &wg)
go w.updateBidsByPrice(orderbookUpdate, &wg)
wg.Wait()
}
}
func (w *WebsocketOrderbookLocal) updateAsksByPrice(base *WebsocketOrderbookUpdate, wg *sync.WaitGroup) {
for j := 0; j < len(base.Asks); j++ {
found := false
for k := 0; k < len(w.ob[base.CurrencyPair][base.AssetType].Asks); k++ {
if w.ob[base.CurrencyPair][base.AssetType].Asks[k].Price == base.Asks[j].Price {
found = true
if base.Asks[j].Amount == 0 {
w.ob[base.CurrencyPair][base.AssetType].Asks = append(w.ob[base.CurrencyPair][base.AssetType].Asks[:k],
w.ob[base.CurrencyPair][base.AssetType].Asks[k+1:]...)
break
}
w.ob[base.CurrencyPair][base.AssetType].Asks[k].Amount = base.Asks[j].Amount
break
}
}
if !found {
w.ob[base.CurrencyPair][base.AssetType].Asks = append(w.ob[base.CurrencyPair][base.AssetType].Asks, base.Asks[j])
}
}
sort.Slice(w.ob[base.CurrencyPair][base.AssetType].Asks, func(i, j int) bool {
return w.ob[base.CurrencyPair][base.AssetType].Asks[i].Price < w.ob[base.CurrencyPair][base.AssetType].Asks[j].Price
})
wg.Done()
}
func (w *WebsocketOrderbookLocal) updateBidsByPrice(base *WebsocketOrderbookUpdate, wg *sync.WaitGroup) {
for j := 0; j < len(base.Bids); j++ {
found := false
for k := 0; k < len(w.ob[base.CurrencyPair][base.AssetType].Bids); k++ {
if w.ob[base.CurrencyPair][base.AssetType].Bids[k].Price == base.Bids[j].Price {
found = true
if base.Bids[j].Amount == 0 {
w.ob[base.CurrencyPair][base.AssetType].Bids = append(w.ob[base.CurrencyPair][base.AssetType].Bids[:k],
w.ob[base.CurrencyPair][base.AssetType].Bids[k+1:]...)
break
}
w.ob[base.CurrencyPair][base.AssetType].Bids[k].Amount = base.Bids[j].Amount
break
}
}
if !found {
w.ob[base.CurrencyPair][base.AssetType].Bids = append(w.ob[base.CurrencyPair][base.AssetType].Bids, base.Bids[j])
}
}
sort.Slice(w.ob[base.CurrencyPair][base.AssetType].Bids, func(i, j int) bool {
return w.ob[base.CurrencyPair][base.AssetType].Bids[i].Price > w.ob[base.CurrencyPair][base.AssetType].Bids[j].Price
})
wg.Done()
}
// updateByIDAndAction will receive an action to execute against the orderbook
// it will then match by IDs instead of price to perform the action
func (w *WebsocketOrderbookLocal) updateByIDAndAction(orderbookUpdate *WebsocketOrderbookUpdate) {
switch orderbookUpdate.Action {
case "update":
for _, target := range orderbookUpdate.Bids {
for i := range w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids {
if w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids[i].ID == target.ID {
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids[i].Amount = target.Amount
break
}
}
}
for _, target := range orderbookUpdate.Asks {
for i := range w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks {
if w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks[i].ID == target.ID {
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks[i].Amount = target.Amount
break
}
}
}
case "delete":
for _, target := range orderbookUpdate.Bids {
for i := 0; i < len(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids); i++ {
if w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids[i].ID == target.ID {
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids = append(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids[:i],
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids[i+1:]...)
i--
break
}
}
}
for _, target := range orderbookUpdate.Asks {
for i := 0; i < len(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks); i++ {
if w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks[i].ID == target.ID {
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks = append(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks[:i],
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks[i+1:]...)
i--
break
}
}
}
case "insert":
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids = append(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Bids, orderbookUpdate.Bids...)
w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks = append(w.ob[orderbookUpdate.CurrencyPair][orderbookUpdate.AssetType].Asks, orderbookUpdate.Asks...)
}
}
// LoadSnapshot loads initial snapshot of ob data, overwrite allows full
// ob to be completely rewritten because the exchange is a doing a full
// update not an incremental one
func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, overwrite bool) error {
if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 {
return fmt.Errorf("%v snapshot ask and bids are nil", w.exchangeName)
}
w.m.Lock()
defer w.m.Unlock()
if w.ob == nil {
w.ob = make(map[currency.Pair]map[string]*orderbook.Base)
}
if w.ob[newOrderbook.Pair] == nil {
w.ob[newOrderbook.Pair] = make(map[string]*orderbook.Base)
}
if w.ob[newOrderbook.Pair][newOrderbook.AssetType] != nil &&
(len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Asks) > 0 ||
len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Bids) > 0) {
if overwrite {
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
return newOrderbook.Process()
}
return fmt.Errorf("%v snapshot instance already found", w.exchangeName)
}
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
return newOrderbook.Process()
}
// GetOrderbook use sparingly. Modifying anything here will ruin hash calculation and cause problems
func (w *WebsocketOrderbookLocal) GetOrderbook(p currency.Pair, assetType string) *orderbook.Base {
w.m.Lock()
defer w.m.Unlock()
return w.ob[p][assetType]
}
// FlushCache flushes w.ob data to be garbage collected and refreshed when a
// connection is lost and reconnected
func (w *WebsocketOrderbookLocal) FlushCache() {
w.m.Lock()
w.ob = nil
w.buffer = nil
w.m.Unlock()
}

View File

@@ -0,0 +1,582 @@
package wsorderbook
import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
var itemArray = [][]orderbook.Item{
{{Price: 1000, Amount: 1, ID: 1}},
{{Price: 2000, Amount: 1, ID: 2}},
{{Price: 3000, Amount: 1, ID: 3}},
{{Price: 3000, Amount: 2, ID: 4}},
{{Price: 4000, Amount: 0, ID: 6}},
{{Price: 5000, Amount: 1, ID: 5}},
}
const (
exchangeName = "exchangeTest"
spot = orderbook.Spot
)
func createSnapshot() (obl *WebsocketOrderbookLocal, curr currency.Pair, asks, bids []orderbook.Item, err error) {
var snapShot1 orderbook.Base
curr = currency.NewPairFromString("BTCUSD")
asks = []orderbook.Item{
{Price: 4000, Amount: 1, ID: 6},
}
bids = []orderbook.Item{
{Price: 4000, Amount: 1, ID: 6},
}
snapShot1.Asks = asks
snapShot1.Bids = bids
snapShot1.AssetType = spot
snapShot1.Pair = curr
obl = &WebsocketOrderbookLocal{}
err = obl.LoadSnapshot(&snapShot1, false)
return
}
// BenchmarkBufferPerformance demonstrates buffer more performant than multi process calls
func BenchmarkBufferPerformance(b *testing.B) {
obl, curr, asks, bids, err := createSnapshot()
if err != nil {
b.Fatal(err)
}
obl.exchangeName = exchangeName
obl.sortBuffer = true
update := &WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
}
for i := 0; i < b.N; i++ {
randomIndex := rand.Intn(5)
update.Asks = itemArray[randomIndex]
update.Bids = itemArray[randomIndex]
err = obl.Update(update)
if err != nil {
b.Fatal(err)
}
}
}
// BenchmarkBufferSortingPerformance benchmark
func BenchmarkBufferSortingPerformance(b *testing.B) {
obl, curr, asks, bids, err := createSnapshot()
if err != nil {
b.Fatal(err)
}
obl.exchangeName = exchangeName
obl.sortBuffer = true
obl.bufferEnabled = true
obl.obBufferLimit = 5
update := &WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
}
for i := 0; i < b.N; i++ {
randomIndex := rand.Intn(5)
update.Asks = itemArray[randomIndex]
update.Bids = itemArray[randomIndex]
err = obl.Update(update)
if err != nil {
b.Fatal(err)
}
}
}
// BenchmarkNoBufferPerformance demonstrates orderbook process less performant than buffer
func BenchmarkNoBufferPerformance(b *testing.B) {
obl, curr, asks, bids, err := createSnapshot()
if err != nil {
b.Fatal(err)
}
obl.exchangeName = exchangeName
update := &WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
}
for i := 0; i < b.N; i++ {
randomIndex := rand.Intn(5)
update.Asks = itemArray[randomIndex]
update.Bids = itemArray[randomIndex]
err = obl.Update(update)
if err != nil {
b.Fatal(err)
}
}
}
// TestHittingTheBuffer logic test
func TestHittingTheBuffer(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
obl.exchangeName = exchangeName
obl.bufferEnabled = true
obl.obBufferLimit = 5
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
bids := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
})
if err != nil {
t.Fatal(err)
}
}
if len(obl.ob[curr][spot].Asks) != 3 {
t.Log(obl.ob[curr][spot])
t.Errorf("expected 3 entries, received: %v", len(obl.ob[curr][spot].Asks))
}
if len(obl.ob[curr][spot].Bids) != 3 {
t.Errorf("expected 3 entries, received: %v", len(obl.ob[curr][spot].Bids))
}
}
// TestInsertWithIDs logic test
func TestInsertWithIDs(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
obl.exchangeName = exchangeName
obl.bufferEnabled = true
obl.updateEntriesByID = true
obl.obBufferLimit = 5
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
bids := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
Action: "insert",
})
if err != nil {
t.Fatal(err)
}
}
if len(obl.ob[curr][spot].Asks) != 6 {
t.Errorf("expected 6 entries, received: %v", len(obl.ob[curr][spot].Asks))
}
if len(obl.ob[curr][spot].Bids) != 6 {
t.Errorf("expected 6 entries, received: %v", len(obl.ob[curr][spot].Bids))
}
}
// TestSortIDs logic test
func TestSortIDs(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
obl.exchangeName = exchangeName
obl.bufferEnabled = true
obl.sortBufferByUpdateIDs = true
obl.sortBuffer = true
obl.obBufferLimit = 5
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
bids := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateID: int64(i),
AssetType: spot,
})
if err != nil {
t.Fatal(err)
}
}
if len(obl.ob[curr][spot].Asks) != 3 {
t.Errorf("expected 6 entries, received: %v", len(obl.ob[curr][spot].Asks))
}
if len(obl.ob[curr][spot].Bids) != 3 {
t.Errorf("expected 6 entries, received: %v", len(obl.ob[curr][spot].Bids))
}
}
// TestDeleteWithIDs logic test
func TestDeleteWithIDs(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
obl.exchangeName = exchangeName
obl.updateEntriesByID = true
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
bids := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
Action: "delete",
})
if err != nil {
t.Fatal(err)
}
}
if len(obl.ob[curr][spot].Asks) != 0 {
t.Errorf("expected 0 entries, received: %v", len(obl.ob[curr][spot].Asks))
}
if len(obl.ob[curr][spot].Bids) != 0 {
t.Errorf("expected 0 entries, received: %v", len(obl.ob[curr][spot].Bids))
}
}
// TestUpdateWithIDs logic test
func TestUpdateWithIDs(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
obl.exchangeName = exchangeName
obl.updateEntriesByID = true
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
bids := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
Action: "update",
})
if err != nil {
t.Fatal(err)
}
}
if len(obl.ob[curr][spot].Asks) != 1 {
t.Log(obl.ob[curr][spot])
t.Errorf("expected 1 entries, received: %v", len(obl.ob[curr][spot].Asks))
}
if len(obl.ob[curr][spot].Bids) != 1 {
t.Errorf("expected 1 entries, received: %v", len(obl.ob[curr][spot].Bids))
}
}
// TestOutOfOrderIDs logic test
func TestOutOfOrderIDs(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
outOFOrderIDs := []int64{2, 1, 5, 3, 4, 6}
if itemArray[0][0].Price != 1000 {
t.Errorf("expected sorted price to be 3000, received: %v", itemArray[1][0].Price)
}
obl.exchangeName = exchangeName
obl.bufferEnabled = true
obl.sortBuffer = true
obl.obBufferLimit = 5
for i := 0; i < len(itemArray); i++ {
asks := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Asks: asks,
CurrencyPair: curr,
UpdateID: outOFOrderIDs[i],
AssetType: spot,
})
if err != nil {
t.Fatal(err)
}
}
// Index 1 since index 0 is price 7000
if obl.ob[curr][spot].Asks[1].Price != 2000 {
t.Errorf("expected sorted price to be 3000, received: %v", obl.ob[curr][spot].Asks[1].Price)
}
}
// TestRunUpdateWithoutSnapshot logic test
func TestRunUpdateWithoutSnapshot(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
curr := currency.NewPairFromString("BTCUSD")
asks := []orderbook.Item{
{Price: 4000, Amount: 1, ID: 8},
}
bids := []orderbook.Item{
{Price: 5999, Amount: 1, ID: 8},
{Price: 4000, Amount: 1, ID: 9},
}
snapShot1.Asks = asks
snapShot1.Bids = bids
snapShot1.AssetType = spot
snapShot1.Pair = curr
obl.exchangeName = exchangeName
err := obl.Update(&WebsocketOrderbookUpdate{
Bids: bids,
Asks: asks,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
})
if err == nil {
t.Fatal("expected an error running update with no snapshot loaded")
}
if err.Error() != "ob.Base could not be found for Exchange exchangeTest CurrencyPair: BTCUSD AssetType: SPOT" {
t.Fatal(err)
}
}
// TestRunUpdateWithoutAnyUpdates logic test
func TestRunUpdateWithoutAnyUpdates(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
curr := currency.NewPairFromString("BTCUSD")
snapShot1.Asks = []orderbook.Item{}
snapShot1.Bids = []orderbook.Item{}
snapShot1.AssetType = spot
snapShot1.Pair = curr
obl.exchangeName = exchangeName
err := obl.Update(&WebsocketOrderbookUpdate{
Bids: snapShot1.Asks,
Asks: snapShot1.Bids,
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: spot,
})
if err == nil {
t.Fatal("expected an error running update with no snapshot loaded")
}
if err.Error() != fmt.Sprintf("%v cannot have bids and ask targets both nil", exchangeName) {
t.Fatal("expected nil asks and bids error")
}
}
// TestRunSnapshotWithNoData logic test
func TestRunSnapshotWithNoData(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
curr := currency.NewPairFromString("BTCUSD")
snapShot1.Asks = []orderbook.Item{}
snapShot1.Bids = []orderbook.Item{}
snapShot1.AssetType = spot
snapShot1.Pair = curr
snapShot1.ExchangeName = "test"
obl.exchangeName = "test"
err := obl.LoadSnapshot(&snapShot1,
false)
if err == nil {
t.Fatal("expected an error loading a snapshot")
}
if err.Error() != "test snapshot ask and bids are nil" {
t.Fatal(err)
}
}
// TestLoadSnapshotWithOverride logic test
func TestLoadSnapshotWithOverride(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
curr := currency.NewPairFromString("BTCUSD")
asks := []orderbook.Item{
{Price: 4000, Amount: 1, ID: 8},
}
bids := []orderbook.Item{
{Price: 4000, Amount: 1, ID: 9},
}
snapShot1.Asks = asks
snapShot1.Bids = bids
snapShot1.AssetType = spot
snapShot1.Pair = curr
err := obl.LoadSnapshot(&snapShot1, false)
if err != nil {
t.Error(err)
}
err = obl.LoadSnapshot(&snapShot1, false)
if err == nil {
t.Error("expected error: 'snapshot instance already found'")
}
err = obl.LoadSnapshot(&snapShot1, true)
if err != nil {
t.Error(err)
}
}
// TestInsertWithIDs logic test
func TestFlushCache(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
if obl.ob[curr][spot] == nil {
t.Error("expected ob to have ask entries")
}
obl.FlushCache()
if obl.ob[curr][spot] != nil {
t.Error("expected ob be flushed")
}
}
// TestInsertingSnapShots logic test
func TestInsertingSnapShots(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
asks := []orderbook.Item{
{Price: 6000, Amount: 1, ID: 1},
{Price: 6001, Amount: 0.5, ID: 2},
{Price: 6002, Amount: 2, ID: 3},
{Price: 6003, Amount: 3, ID: 4},
{Price: 6004, Amount: 5, ID: 5},
{Price: 6005, Amount: 2, ID: 6},
{Price: 6006, Amount: 1.5, ID: 7},
{Price: 6007, Amount: 0.5, ID: 8},
{Price: 6008, Amount: 23, ID: 9},
{Price: 6009, Amount: 9, ID: 10},
{Price: 6010, Amount: 7, ID: 11},
}
bids := []orderbook.Item{
{Price: 5999, Amount: 1, ID: 12},
{Price: 5998, Amount: 0.5, ID: 13},
{Price: 5997, Amount: 2, ID: 14},
{Price: 5996, Amount: 3, ID: 15},
{Price: 5995, Amount: 5, ID: 16},
{Price: 5994, Amount: 2, ID: 17},
{Price: 5993, Amount: 1.5, ID: 18},
{Price: 5992, Amount: 0.5, ID: 19},
{Price: 5991, Amount: 23, ID: 20},
{Price: 5990, Amount: 9, ID: 21},
{Price: 5989, Amount: 7, ID: 22},
}
snapShot1.Asks = asks
snapShot1.Bids = bids
snapShot1.AssetType = spot
snapShot1.Pair = currency.NewPairFromString("BTCUSD")
err := obl.LoadSnapshot(&snapShot1, false)
if err != nil {
t.Fatal(err)
}
var snapShot2 orderbook.Base
asks = []orderbook.Item{
{Price: 51, Amount: 1, ID: 1},
{Price: 52, Amount: 0.5, ID: 2},
{Price: 53, Amount: 2, ID: 3},
{Price: 54, Amount: 3, ID: 4},
{Price: 55, Amount: 5, ID: 5},
{Price: 56, Amount: 2, ID: 6},
{Price: 57, Amount: 1.5, ID: 7},
{Price: 58, Amount: 0.5, ID: 8},
{Price: 59, Amount: 23, ID: 9},
{Price: 50, Amount: 9, ID: 10},
{Price: 60, Amount: 7, ID: 11},
}
bids = []orderbook.Item{
{Price: 49, Amount: 1, ID: 12},
{Price: 48, Amount: 0.5, ID: 13},
{Price: 47, Amount: 2, ID: 14},
{Price: 46, Amount: 3, ID: 15},
{Price: 45, Amount: 5, ID: 16},
{Price: 44, Amount: 2, ID: 17},
{Price: 43, Amount: 1.5, ID: 18},
{Price: 42, Amount: 0.5, ID: 19},
{Price: 41, Amount: 23, ID: 20},
{Price: 40, Amount: 9, ID: 21},
{Price: 39, Amount: 7, ID: 22},
}
snapShot2.Asks = asks
snapShot2.Bids = bids
snapShot2.AssetType = spot
snapShot2.Pair = currency.NewPairFromString("LTCUSD")
err = obl.LoadSnapshot(&snapShot2, false)
if err != nil {
t.Fatal(err)
}
var snapShot3 orderbook.Base
asks = []orderbook.Item{
{Price: 511, Amount: 1, ID: 1},
{Price: 52, Amount: 0.5, ID: 2},
{Price: 53, Amount: 2, ID: 3},
{Price: 54, Amount: 3, ID: 4},
{Price: 55, Amount: 5, ID: 5},
{Price: 56, Amount: 2, ID: 6},
{Price: 57, Amount: 1.5, ID: 7},
{Price: 58, Amount: 0.5, ID: 8},
{Price: 59, Amount: 23, ID: 9},
{Price: 50, Amount: 9, ID: 10},
{Price: 60, Amount: 7, ID: 11},
}
bids = []orderbook.Item{
{Price: 49, Amount: 1, ID: 12},
{Price: 48, Amount: 0.5, ID: 13},
{Price: 47, Amount: 2, ID: 14},
{Price: 46, Amount: 3, ID: 15},
{Price: 45, Amount: 5, ID: 16},
{Price: 44, Amount: 2, ID: 17},
{Price: 43, Amount: 1.5, ID: 18},
{Price: 42, Amount: 0.5, ID: 19},
{Price: 41, Amount: 23, ID: 20},
{Price: 40, Amount: 9, ID: 21},
{Price: 39, Amount: 7, ID: 22},
}
snapShot3.Asks = asks
snapShot3.Bids = bids
snapShot3.AssetType = "FUTURES"
snapShot3.Pair = currency.NewPairFromString("LTCUSD")
err = obl.LoadSnapshot(&snapShot3, false)
if err != nil {
t.Fatal(err)
}
if obl.ob[snapShot1.Pair][snapShot1.AssetType].Asks[0] != snapShot1.Asks[0] {
t.Errorf("loaded data mismatch. Expected %v, received %v", snapShot1.Asks[0], obl.ob[snapShot1.Pair][snapShot1.AssetType].Asks[0])
}
if obl.ob[snapShot2.Pair][snapShot2.AssetType].Asks[0] != snapShot2.Asks[0] {
t.Errorf("loaded data mismatch. Expected %v, received %v", snapShot2.Asks[0], obl.ob[snapShot2.Pair][snapShot2.AssetType].Asks[0])
}
if obl.ob[snapShot3.Pair][snapShot3.AssetType].Asks[0] != snapShot3.Asks[0] {
t.Errorf("loaded data mismatch. Expected %v, received %v", snapShot3.Asks[0], obl.ob[snapShot3.Pair][snapShot3.AssetType].Asks[0])
}
}
func TestGetOrderbook(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
ob := obl.GetOrderbook(curr, spot)
if obl.ob[curr][spot] != ob {
t.Error("Failed to get orderbook")
}
}
func TestSetup(t *testing.T) {
w := WebsocketOrderbookLocal{}
w.Setup(1, true, true, true, true, "hi")
if w.obBufferLimit != 1 || !w.bufferEnabled || !w.sortBuffer || !w.sortBufferByUpdateIDs || !w.updateEntriesByID || w.exchangeName != "hi" {
t.Errorf("Setup incorrectly loaded %v", w)
}
}

View File

@@ -0,0 +1,34 @@
package wsorderbook
import (
"sync"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
// WebsocketOrderbookLocal defines a local cache of orderbooks for amending,
// appending and deleting changes and updates the main store in wsorderbook.go
type WebsocketOrderbookLocal struct {
ob map[currency.Pair]map[string]*orderbook.Base
buffer map[currency.Pair]map[string][]WebsocketOrderbookUpdate
obBufferLimit int
bufferEnabled bool
sortBuffer bool
sortBufferByUpdateIDs bool // When timestamps aren't provided, an id can help sort
updateEntriesByID bool // Use the update IDs to match ob entries
exchangeName string
m sync.Mutex
}
// WebsocketOrderbookUpdate stores orderbook updates and dictates what features to use when processing
type WebsocketOrderbookUpdate struct {
UpdateID int64 // Used when no time is provided
UpdateTime time.Time
AssetType string
Action string // Used in conjunction with UpdateEntriesByID
Bids []orderbook.Item
Asks []orderbook.Item
CurrencyPair currency.Pair
}