diff --git a/currency/manager.go b/currency/manager.go index a3c682ce..c3d22769 100644 --- a/currency/manager.go +++ b/currency/manager.go @@ -36,8 +36,8 @@ var ( // GetAssetTypes returns a list of stored asset types func (p *PairsManager) GetAssetTypes(enabled bool) asset.Items { - p.m.RLock() - defer p.m.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() assetTypes := make(asset.Items, 0, len(p.Pairs)) for k, ps := range p.Pairs { if enabled && (ps.AssetEnabled == nil || !*ps.AssetEnabled) { @@ -54,8 +54,8 @@ func (p *PairsManager) Get(a asset.Item) (*PairStore, error) { return nil, fmt.Errorf("%s %w", a, asset.ErrNotSupported) } - p.m.RLock() - defer p.m.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() c, ok := p.Pairs[a] if !ok { return nil, @@ -73,20 +73,20 @@ func (p *PairsManager) Store(a asset.Item, ps *PairStore) error { if err != nil { return err } - p.m.Lock() + p.mutex.Lock() if p.Pairs == nil { p.Pairs = make(map[asset.Item]*PairStore) } p.Pairs[a] = cpy - p.m.Unlock() + p.mutex.Unlock() return nil } // Delete deletes a map entry based on the supplied asset type func (p *PairsManager) Delete(a asset.Item) { - p.m.Lock() + p.mutex.Lock() delete(p.Pairs, a) - p.m.Unlock() + p.mutex.Unlock() } // GetPairs gets a list of stored pairs based on the asset type and whether @@ -96,8 +96,8 @@ func (p *PairsManager) GetPairs(a asset.Item, enabled bool) (Pairs, error) { return nil, fmt.Errorf("%s %w", a, asset.ErrNotSupported) } - p.m.RLock() - defer p.m.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() pairStore, ok := p.Pairs[a] if !ok { return nil, nil @@ -137,8 +137,8 @@ func (p *PairsManager) StoreFormat(a asset.Item, pFmt *PairFormat, config bool) cpy := *pFmt - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() if p.Pairs == nil { p.Pairs = make(map[asset.Item]*PairStore) @@ -170,8 +170,8 @@ func (p *PairsManager) StorePairs(a asset.Item, pairs Pairs, enabled bool) error cpy := make(Pairs, len(pairs)) copy(cpy, pairs) - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() if p.Pairs == nil { p.Pairs = make(map[asset.Item]*PairStore) @@ -200,8 +200,8 @@ func (p *PairsManager) EnsureOnePairEnabled() (Pair, asset.Item, error) { if p == nil { return EMPTYPAIR, asset.Empty, common.ErrNilPointer } - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() for _, v := range p.Pairs { if v.AssetEnabled == nil || !*v.AssetEnabled || @@ -238,8 +238,8 @@ func (p *PairsManager) DisablePair(a asset.Item, pair Pair) error { return ErrCurrencyPairEmpty } - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() pairStore, err := p.getPairStoreRequiresLock(a) if err != nil { @@ -265,8 +265,8 @@ func (p *PairsManager) EnablePair(a asset.Item, pair Pair) error { return ErrCurrencyPairEmpty } - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() pairStore, err := p.getPairStoreRequiresLock(a) if err != nil { @@ -295,8 +295,8 @@ func (p *PairsManager) IsAssetPairEnabled(a asset.Item, pair Pair) error { return ErrCurrencyPairEmpty } - p.m.RLock() - defer p.m.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() pairStore, err := p.getPairStoreRequiresLock(a) if err != nil { @@ -322,8 +322,8 @@ func (p *PairsManager) IsAssetEnabled(a asset.Item) error { return fmt.Errorf("%s %w", a, asset.ErrNotSupported) } - p.m.RLock() - defer p.m.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() pairStore, err := p.getPairStoreRequiresLock(a) if err != nil { @@ -346,8 +346,8 @@ func (p *PairsManager) SetAssetEnabled(a asset.Item, enabled bool) error { return fmt.Errorf("%s %w", a, asset.ErrNotSupported) } - p.m.Lock() - defer p.m.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() pairStore, err := p.getPairStoreRequiresLock(a) if err != nil { @@ -369,6 +369,37 @@ func (p *PairsManager) SetAssetEnabled(a asset.Item, enabled bool) error { return nil } +// Load sets the pair manager from a seed without copying mutexes +func (p *PairsManager) Load(seed *PairsManager) error { + if seed == nil { + return fmt.Errorf("%w PairsManager", common.ErrNilPointer) + } + p.mutex.Lock() + defer p.mutex.Unlock() + seed.mutex.RLock() + defer seed.mutex.RUnlock() + + var pN PairsManager + j, err := json.Marshal(seed) + if err != nil { + return err + } + err = json.Unmarshal(j, &pN) + if err != nil { + return err + } + p.BypassConfigFormatUpgrades = pN.BypassConfigFormatUpgrades + if pN.UseGlobalFormat { + p.UseGlobalFormat = pN.UseGlobalFormat + p.RequestFormat = pN.RequestFormat + p.ConfigFormat = pN.ConfigFormat + } + p.LastUpdated = pN.LastUpdated + p.Pairs = pN.Pairs + + return nil +} + func (p *PairsManager) getPairStoreRequiresLock(a asset.Item) (*PairStore, error) { if p.Pairs == nil { return nil, fmt.Errorf("%w when requesting %v pairs", ErrPairManagerNotInitialised, a) diff --git a/currency/manager_test.go b/currency/manager_test.go index 5585c14e..66a2dd29 100644 --- a/currency/manager_test.go +++ b/currency/manager_test.go @@ -5,6 +5,8 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) @@ -672,3 +674,38 @@ func TestEnsureOnePairEnabled(t *testing.T) { t.Errorf("received: '%v' but expected: '%v'", err, ErrCurrencyPairsEmpty) } } + +func TestLoad(t *testing.T) { + t.Parallel() + base := PairsManager{} + fmt1 := &PairFormat{Uppercase: true} + fmt2 := &PairFormat{Uppercase: true, Delimiter: DashDelimiter} + p := NewPair(BTC, USDT) + tt := int64(1337) + seed := PairsManager{ + LastUpdated: tt, + UseGlobalFormat: true, + ConfigFormat: fmt1, + RequestFormat: fmt2, + Pairs: map[asset.Item]*PairStore{ + asset.Futures: { + AssetEnabled: convert.BoolPtr(true), + Available: []Pair{p}, + }, + asset.Options: { + AssetEnabled: convert.BoolPtr(false), + Available: []Pair{}, + }, + }, + } + + assert.ErrorIs(t, base.Load(nil), common.ErrNilPointer, "Load nil should error") + if assert.NoError(t, base.Load(&seed), "Loading from seed should not error") { + assert.True(t, *base.Pairs[asset.Futures].AssetEnabled, "Futures AssetEnabled should be true") + assert.True(t, base.Pairs[asset.Futures].Available.Contains(p, true), "Futures Available Pairs should contain BTCUSDT") + assert.False(t, *base.Pairs[asset.Options].AssetEnabled, "Options AssetEnabled should be false") + assert.Equal(t, tt, base.LastUpdated, "Last Updated should be correct") + assert.Equal(t, fmt1.Uppercase, base.ConfigFormat.Uppercase, "ConfigFormat Uppercase should be correct") + assert.Equal(t, fmt2.Delimiter, base.RequestFormat.Delimiter, "RequestFormat Delimiter should be correct") + } +} diff --git a/currency/manager_types.go b/currency/manager_types.go index 3575b824..2b94d7ca 100644 --- a/currency/manager_types.go +++ b/currency/manager_types.go @@ -8,13 +8,13 @@ import ( // PairsManager manages asset pairs type PairsManager struct { - BypassConfigFormatUpgrades bool `json:"bypassConfigFormatUpgrades"` - RequestFormat *PairFormat `json:"requestFormat,omitempty"` - ConfigFormat *PairFormat `json:"configFormat,omitempty"` - UseGlobalFormat bool `json:"useGlobalFormat,omitempty"` - LastUpdated int64 `json:"lastUpdated,omitempty"` - Pairs FullStore `json:"pairs"` - m sync.RWMutex + BypassConfigFormatUpgrades bool `json:"bypassConfigFormatUpgrades"` + RequestFormat *PairFormat `json:"requestFormat,omitempty"` + ConfigFormat *PairFormat `json:"configFormat,omitempty"` + UseGlobalFormat bool `json:"useGlobalFormat,omitempty"` + LastUpdated int64 `json:"lastUpdated,omitempty"` + Pairs FullStore `json:"pairs"` + mutex sync.RWMutex `json:"-"` } // FullStore holds all supported asset types with the enabled and available diff --git a/exchanges/bitstamp/bitstamp.go b/exchanges/bitstamp/bitstamp.go index 965f25e3..e30f98e3 100644 --- a/exchanges/bitstamp/bitstamp.go +++ b/exchanges/bitstamp/bitstamp.go @@ -31,6 +31,7 @@ const ( bitstampAPIEURUSD = "eur_usd" bitstampAPIBalance = "balance" bitstampAPIUserTransactions = "user_transactions" + bitstampAPIOHLC = "ohlc" bitstampAPIOpenOrders = "open_orders" bitstampAPIOrderStatus = "order_status" bitstampAPICancelOrder = "cancel_order" @@ -43,7 +44,12 @@ const ( bitstampAPITransferFromMain = "transfer-from-main" bitstampAPIReturnType = "string" bitstampAPITradingPairsInfo = "trading-pairs-info" - bitstampOHLC = "ohlc" + bitstampAPIWSAuthToken = "websockets_token" + bitstampAPIWSTrades = "live_trades" + bitstampAPIWSOrders = "live_orders" + bitstampAPIWSOrderbook = "order_book" + bitstampAPIWSMyOrders = "my_orders" + bitstampAPIWSMyTrades = "my_trades" bitstampRateInterval = time.Minute * 10 bitstampRequestRate = 8000 @@ -517,7 +523,7 @@ func (b *Bitstamp) OHLC(ctx context.Context, currency string, start, end time.Ti if !end.IsZero() { v.Add("end", strconv.FormatInt(end.Unix(), 10)) } - return resp, b.SendHTTPRequest(ctx, exchange.RestSpot, common.EncodeURLValues("/v"+bitstampAPIVersion+"/"+bitstampOHLC+"/"+currency, v), &resp) + return resp, b.SendHTTPRequest(ctx, exchange.RestSpot, common.EncodeURLValues("/v"+bitstampAPIVersion+"/"+bitstampAPIOHLC+"/"+currency, v), &resp) } // TransferAccountBalance transfers funds from either a main or sub account diff --git a/exchanges/bitstamp/bitstamp_test.go b/exchanges/bitstamp/bitstamp_test.go index 6afc6fdf..70e717bb 100644 --- a/exchanges/bitstamp/bitstamp_test.go +++ b/exchanges/bitstamp/bitstamp_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/core" "github.com/thrasher-corp/gocryptotrader/currency" @@ -750,11 +751,85 @@ func TestWsOrderbook2(t *testing.T) { } func TestWsOrderUpdate(t *testing.T) { - pressXToJSON := []byte(`{"data": {"microtimestamp": "1580336940972599", "amount": 0.6347086, "order_type": 0, "amount_str": "0.63470860", "price_str": "9350.49", "price": 9350.49, "id": 4621332237, "datetime": "1580336940"}, "event": "order_created", "channel": "live_orders_btcusd"}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) + t.Parallel() + n := new(Bitstamp) + sharedtestvalues.TestFixtureToDataHandler(t, b, n, "testdata/wsMyOrders.json", n.wsHandleData) + seen := 0 + for reading := true; reading; { + select { + default: + reading = false + case resp := <-n.GetBase().Websocket.DataHandler: + seen++ + switch v := resp.(type) { + case *order.Detail: + switch seen { + case 1: + assert.Equal(t, "1658864794234880", v.OrderID, "OrderID") + assert.Equal(t, time.UnixMicro(1693831262313000), v.Date, "Date") + assert.Equal(t, "test_market_buy", v.ClientOrderID, "ClientOrderID") + assert.Equal(t, order.New, v.Status, "Status") + assert.Equal(t, order.Buy, v.Side, "Side") + assert.Equal(t, asset.Spot, v.AssetType, "AssetType") + assert.Equal(t, currency.NewPairWithDelimiter("BTC", "USD", "/"), v.Pair, "Pair") + assert.Equal(t, 0.0, v.ExecutedAmount, "ExecutedAmount") + assert.Equal(t, 999999999.0, v.Price, "Price") // Market Buy Price + // Note: Amount is 0 for market order create messages, oddly + case 2: + assert.Equal(t, "1658864794234880", v.OrderID, "OrderID") + assert.Equal(t, order.PartiallyFilled, v.Status, "Status") + assert.Equal(t, 0.00038667, v.Amount, "Amount") + assert.Equal(t, 0.00000001, v.RemainingAmount, "RemainingAmount") // During live tests we consistently got back this Sat remaining + assert.Equal(t, 0.00038666, v.ExecutedAmount, "ExecutedAmount") + assert.Equal(t, 25862.0, v.Price, "Price") + case 3: + assert.Equal(t, "1658864794234880", v.OrderID, "OrderID") + assert.Equal(t, order.Cancelled, v.Status, "Status") // Even though they probably consider it filled, Deleted + PartialFill = Cancelled + assert.Equal(t, 0.00038667, v.Amount, "Amount") + assert.Equal(t, 0.00000001, v.RemainingAmount, "RemainingAmount") + assert.Equal(t, 0.00038666, v.ExecutedAmount, "ExecutedAmount") + assert.Equal(t, 25862.0, v.Price, "Price") + case 4: + assert.Equal(t, "1658870500933632", v.OrderID, "OrderID") + assert.Equal(t, order.New, v.Status, "Status") + assert.Equal(t, order.Sell, v.Side, "Side") + assert.Equal(t, 0.0, v.Price, "Price") // Market Sell Price + case 5: + assert.Equal(t, "1658870500933632", v.OrderID, "OrderID") + assert.Equal(t, order.PartiallyFilled, v.Status, "Status") + assert.Equal(t, 0.00038679, v.Amount, "Amount") + assert.Equal(t, 0.00000001, v.RemainingAmount, "RemainingAmount") + assert.Equal(t, 0.00038678, v.ExecutedAmount, "ExecutedAmount") + assert.Equal(t, 25854.0, v.Price, "Price") + case 6: + assert.Equal(t, "1658870500933632", v.OrderID, "OrderID") + assert.Equal(t, order.Cancelled, v.Status, "Status") + assert.Equal(t, 0.00038679, v.Amount, "Amount") + assert.Equal(t, 0.00000001, v.RemainingAmount, "RemainingAmount") + assert.Equal(t, 0.00038678, v.ExecutedAmount, "ExecutedAmount") + assert.Equal(t, 25854.0, v.Price, "Price") + case 7: + assert.Equal(t, "1658869033291777", v.OrderID, "OrderID") + assert.Equal(t, order.New, v.Status, "Status") + assert.Equal(t, order.Sell, v.Side, "Side") + assert.Equal(t, 25845.0, v.Price, "Price") + assert.Equal(t, 0.00038692, v.Amount, "Amount") + case 8: + assert.Equal(t, "1658869033291777", v.OrderID, "OrderID") + assert.Equal(t, order.Filled, v.Status, "Status") + assert.Equal(t, 25845.0, v.Price, "Price") + assert.Equal(t, 0.00038692, v.Amount, "Amount") + assert.Equal(t, 0.0, v.RemainingAmount, "RemainingAmount") + assert.Equal(t, 0.00038692, v.ExecutedAmount, "ExecutedAmount") + } + case error: + t.Error(v) + default: + t.Errorf("Got unexpected data: %T %v", v, v) + } + } } + assert.Equal(t, 8, seen, "Number of messages") } func TestWsRequestReconnect(t *testing.T) { @@ -881,3 +956,14 @@ func TestGetOrderInfo(t *testing.T) { t.Error(err) } } + +func TestFetchWSAuth(t *testing.T) { + t.Parallel() + resp, err := b.FetchWSAuth(context.TODO()) + if assert.NoError(t, err, "FetchWSAuth should not error") { + assert.NotNil(t, resp, "resp should not be nil") + assert.Positive(t, resp.UserID, "UserID should be positive") + assert.Len(t, resp.Token, 32, "Token should be 32 chars") + assert.Positive(t, resp.ValidSecs, "ValidSecs should be positive") + } +} diff --git a/exchanges/bitstamp/bitstamp_type_convert.go b/exchanges/bitstamp/bitstamp_type_convert.go index 5c172f4d..e0af07f1 100644 --- a/exchanges/bitstamp/bitstamp_type_convert.go +++ b/exchanges/bitstamp/bitstamp_type_convert.go @@ -2,10 +2,63 @@ package bitstamp import ( "encoding/json" + "fmt" "strconv" "strings" + "time" + + "github.com/thrasher-corp/gocryptotrader/common/convert" + "github.com/thrasher-corp/gocryptotrader/exchanges/order" ) +// datetime provides an internal conversion helper +type datetime time.Time + +func (d *datetime) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + t, err := convert.UnixTimestampStrToTime(s) + if err != nil { + return err + } + + *d = datetime(t) + + return nil +} + +// Time returns datetime cast directly as time.Time +func (d datetime) Time() time.Time { + return time.Time(d) +} + +// microTimestamp provides an internal conversion helper +type microTimestamp time.Time + +func (t *microTimestamp) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + i, err := strconv.Atoi(s) + if err != nil { + return err + } + + *t = microTimestamp(time.UnixMicro(int64(i))) + + return nil +} + +// Time returns datetime cast directly as time.Time +func (t microTimestamp) Time() time.Time { + return time.Time(t) +} + // UnmarshalJSON deserializes JSON, and timestamp information. func (p *TradingPair) UnmarshalJSON(data []byte) error { type Alias TradingPair @@ -27,3 +80,26 @@ func (p *TradingPair) UnmarshalJSON(data []byte) error { p.MinimumOrder, err = strconv.ParseFloat(minOrderStr, 64) return err } + +type orderSide order.Side + +func (s *orderSide) UnmarshalJSON(data []byte) error { + var i int64 + if err := json.Unmarshal(data, &i); err != nil { + return err + } + switch i { + case 0: + *s = orderSide(order.Buy) + case 1: + *s = orderSide(order.Sell) + default: + return fmt.Errorf("invalid value for order side: %v", i) + } + + return nil +} + +func (s *orderSide) Side() order.Side { + return order.Side(*s) +} diff --git a/exchanges/bitstamp/bitstamp_types.go b/exchanges/bitstamp/bitstamp_types.go index 91ce6e5e..a544e9e6 100644 --- a/exchanges/bitstamp/bitstamp_types.go +++ b/exchanges/bitstamp/bitstamp_types.go @@ -195,11 +195,14 @@ type websocketEventRequest struct { type websocketData struct { Channel string `json:"channel"` + Auth string `json:"auth,omitempty"` } type websocketResponse struct { - Event string `json:"event"` - Channel string `json:"channel"` + Event string `json:"event"` + Channel string `json:"channel"` + channelType string + pair currency.Pair } type websocketTradeResponse struct { @@ -220,6 +223,13 @@ type websocketTradeData struct { ID int64 `json:"id"` } +// WebsocketAuthResponse holds the auth token for subscribing to auth channels +type WebsocketAuthResponse struct { + Token string `json:"token"` + UserID int64 `json:"user_id"` + ValidSecs int64 `json:"valid_sec"` +} + type websocketOrderBookResponse struct { websocketResponse Data websocketOrderBook `json:"data"` @@ -246,3 +256,21 @@ type OHLCResponse struct { } `json:"ohlc"` } `json:"data"` } + +type websocketOrderResponse struct { + websocketResponse + Order websocketOrderData `json:"data"` +} + +type websocketOrderData struct { + ID int64 `json:"id"` + IDStr string `json:"id_str"` + ClientOrderID string `json:"client_order_id"` + RemainingAmount float64 `json:"amount"` + ExecutedAmount float64 `json:"amount_traded,string"` // Not Cumulative; Partial fill amount + Amount float64 `json:"amount_at_create,string"` + Price float64 `json:"price"` + Side orderSide `json:"order_type"` + Datetime datetime `json:"datetime"` + Microtimestamp microTimestamp `json:"microtimestamp"` +} diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 08b3ef60..e555f526 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "strconv" "strings" @@ -12,6 +13,7 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" + exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" @@ -25,7 +27,19 @@ const ( hbInterval = 8 * time.Second // Connection monitor defaults to 10s inactivity ) -var hbMsg = []byte(`{"event":"bts:heartbeat"}`) +var ( + hbMsg = []byte(`{"event":"bts:heartbeat"}`) + + defaultSubChannels = []string{ + bitstampAPIWSTrades, + bitstampAPIWSOrderbook, + } + + defaultAuthSubChannels = []string{ + bitstampAPIWSMyOrders, + bitstampAPIWSMyTrades, + } +) // WsConnect connects to a websocket feed func (b *Bitstamp) WsConnect() error { @@ -65,17 +79,19 @@ func (b *Bitstamp) wsReadData() { if resp.Raw == nil { return } - err := b.wsHandleData(resp.Raw) - if err != nil { + if err := b.wsHandleData(resp.Raw); err != nil { b.Websocket.DataHandler <- err } } } func (b *Bitstamp) wsHandleData(respRaw []byte) error { - var wsResponse websocketResponse - err := json.Unmarshal(respRaw, &wsResponse) - if err != nil { + wsResponse := &websocketResponse{} + if err := json.Unmarshal(respRaw, wsResponse); err != nil { + return err + } + + if err := b.parseChannelName(wsResponse); err != nil { return err } @@ -101,91 +117,19 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error { } }() // Connection monitor will reconnect case "data": - wsOrderBookTemp := websocketOrderBookResponse{} - err := json.Unmarshal(respRaw, &wsOrderBookTemp) - if err != nil { - return err - } - var currencyPair string - splitter := strings.Split(wsResponse.Channel, currency.UnderscoreDelimiter) - if len(splitter) == 3 { - currencyPair = splitter[2] - } else { - return errWSPairParsingError - } - pFmt, err := b.GetPairFormat(asset.Spot, true) - if err != nil { - return err - } - - enabledPairs, err := b.GetEnabledPairs(asset.Spot) - if err != nil { - return err - } - - p, err := currency.NewPairFromFormattedPairs(currencyPair, enabledPairs, pFmt) - if err != nil { - return err - } - - err = b.wsUpdateOrderbook(&wsOrderBookTemp.Data, p, asset.Spot) - if err != nil { + if err := b.handleWSOrderbook(wsResponse, respRaw); err != nil { return err } case "trade": - if !b.IsSaveTradeDataEnabled() { - return nil - } - wsTradeTemp := websocketTradeResponse{} - err := json.Unmarshal(respRaw, &wsTradeTemp) - if err != nil { + if err := b.handleWSTrade(wsResponse, respRaw); err != nil { return err } - - var currencyPair string - splitter := strings.Split(wsResponse.Channel, currency.UnderscoreDelimiter) - if len(splitter) == 3 { - currencyPair = splitter[2] - } else { - return errWSPairParsingError - } - pFmt, err := b.GetPairFormat(asset.Spot, true) - if err != nil { - return err - } - - enabledPairs, err := b.GetEnabledPairs(asset.Spot) - if err != nil { - return err - } - - p, err := currency.NewPairFromFormattedPairs(currencyPair, enabledPairs, pFmt) - if err != nil { - return err - } - - side := order.Buy - if wsTradeTemp.Data.Type == 1 { - side = order.Sell - } - var a asset.Item - a, err = b.GetPairAssetType(p) - if err != nil { - return err - } - return trade.AddTradesToBuffer(b.Name, trade.Data{ - Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0), - CurrencyPair: p, - AssetType: a, - Exchange: b.Name, - Price: wsTradeTemp.Data.Price, - Amount: wsTradeTemp.Data.Amount, - Side: side, - TID: strconv.FormatInt(wsTradeTemp.Data.ID, 10), - }) case "order_created", "order_deleted", "order_changed": - if b.Verbose { - log.Debugf(log.ExchangeSys, "%v - Websocket order acknowledgement", b.Name) + // Only process MyOrders, not orders from the LiveOrder channel + if wsResponse.channelType == bitstampAPIWSMyOrders { + if err := b.handleWSOrder(wsResponse, respRaw); err != nil { + return err + } } default: b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)} @@ -193,24 +137,129 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error { return nil } +func (b *Bitstamp) handleWSOrderbook(wsResp *websocketResponse, msg []byte) error { + if wsResp.pair.IsEmpty() { + return errWSPairParsingError + } + + wsOrderBookTemp := websocketOrderBookResponse{} + err := json.Unmarshal(msg, &wsOrderBookTemp) + if err != nil { + return err + } + + return b.wsUpdateOrderbook(&wsOrderBookTemp.Data, wsResp.pair, asset.Spot) +} + +func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error { + if !b.IsSaveTradeDataEnabled() { + return nil + } + + if wsResp.pair.IsEmpty() { + return errWSPairParsingError + } + + wsTradeTemp := websocketTradeResponse{} + if err := json.Unmarshal(msg, &wsTradeTemp); err != nil { + return err + } + + side := order.Buy + if wsTradeTemp.Data.Type == 1 { + side = order.Sell + } + return trade.AddTradesToBuffer(b.Name, trade.Data{ + Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0), + CurrencyPair: wsResp.pair, + AssetType: asset.Spot, + Exchange: b.Name, + Price: wsTradeTemp.Data.Price, + Amount: wsTradeTemp.Data.Amount, + Side: side, + TID: strconv.FormatInt(wsTradeTemp.Data.ID, 10), + }) +} + +func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error { + r := &websocketOrderResponse{} + if err := json.Unmarshal(msg, &r); err != nil { + return err + } + + if r.Order.ID == 0 && r.Order.ClientOrderID == "" { + return fmt.Errorf("unable to parse an order id from order msg: %s", msg) + } + + var status order.Status + switch wsResp.Event { + case "order_created": + status = order.New + case "order_changed": + if r.Order.ExecutedAmount > 0 { + status = order.PartiallyFilled + } + case "order_deleted": + if r.Order.RemainingAmount == 0 && r.Order.Amount > 0 { + status = order.Filled + } else { + status = order.Cancelled + } + } + + // r.Order.ExecutedAmount is an atomic partial fill amount; We want total + executedAmount := r.Order.Amount - r.Order.RemainingAmount + + d := &order.Detail{ + Price: r.Order.Price, + Amount: r.Order.Amount, + RemainingAmount: r.Order.RemainingAmount, + ExecutedAmount: executedAmount, + Exchange: b.Name, + OrderID: r.Order.IDStr, + ClientOrderID: r.Order.ClientOrderID, + Side: r.Order.Side.Side(), + Status: status, + AssetType: asset.Spot, + Date: r.Order.Microtimestamp.Time(), + Pair: wsResp.pair, + } + + b.Websocket.DataHandler <- d + + return nil +} + func (b *Bitstamp) generateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { - var channels = []string{"live_trades_", "order_book_"} enabledCurrencies, err := b.GetEnabledPairs(asset.Spot) if err != nil { return nil, err } var subscriptions []stream.ChannelSubscription - for i := range channels { - for j := range enabledCurrencies { - p, err := b.FormatExchangeCurrency(enabledCurrencies[j], asset.Spot) - if err != nil { - return nil, err - } + for i := range enabledCurrencies { + p, err := b.FormatExchangeCurrency(enabledCurrencies[i], asset.Spot) + if err != nil { + return nil, err + } + for j := range defaultSubChannels { subscriptions = append(subscriptions, stream.ChannelSubscription{ - Channel: channels[i] + p.String(), - Asset: asset.Spot, + Channel: defaultSubChannels[j] + "_" + p.String(), + Asset: asset.Spot, + Currency: p, }) } + if b.Websocket.CanUseAuthenticatedEndpoints() { + for j := range defaultAuthSubChannels { + subscriptions = append(subscriptions, stream.ChannelSubscription{ + Channel: defaultAuthSubChannels[j] + "_" + p.String(), + Asset: asset.Spot, + Currency: p, + Params: map[string]interface{}{ + "auth": struct{}{}, + }, + }) + } + } } return subscriptions, nil } @@ -218,6 +267,19 @@ func (b *Bitstamp) generateDefaultSubscriptions() ([]stream.ChannelSubscription, // Subscribe sends a websocket message to receive data from the channel func (b *Bitstamp) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { var errs error + var auth *WebsocketAuthResponse + + for i := range channelsToSubscribe { + if _, ok := channelsToSubscribe[i].Params["auth"]; ok { + var err error + auth, err = b.FetchWSAuth(context.TODO()) + if err != nil { + errs = common.AppendError(errs, err) + } + break + } + } + for i := range channelsToSubscribe { req := websocketEventRequest{ Event: "bts:subscribe", @@ -225,6 +287,10 @@ func (b *Bitstamp) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e Channel: channelsToSubscribe[i].Channel, }, } + if _, ok := channelsToSubscribe[i].Params["auth"]; ok && auth != nil { + req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(auth.UserID)) + req.Data.Auth = auth.Token + } err := b.Websocket.Conn.SendJSONMessage(req) if err != nil { errs = common.AppendError(errs, err) @@ -232,6 +298,7 @@ func (b *Bitstamp) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e } b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i]) } + return errs } @@ -344,3 +411,47 @@ func (b *Bitstamp) seedOrderBook(ctx context.Context) error { } return nil } + +// FetchWSAuth Retrieves a userID and auth-token from REST for subscribing to a websocket channel +// The token life-expectancy is only about 60s; use it immediately and do not store it +func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, error) { + resp := &WebsocketAuthResponse{} + err := b.SendAuthenticatedHTTPRequest(ctx, exchange.RestSpot, bitstampAPIWSAuthToken, true, nil, resp) + if err != nil { + return nil, fmt.Errorf("error fetching auth token: %w", err) + } + return resp, nil +} + +// parseChannel splits the ws response channel and sets the channel type and pair +func (b *Bitstamp) parseChannelName(r *websocketResponse) error { + if r.Channel == "" { + return nil + } + + chanName := r.Channel + authParts := strings.Split(r.Channel, "-") + switch len(authParts) { + case 1: + // Not an auth channel + case 3: + chanName = authParts[1] + default: + return fmt.Errorf("channel name does not contain exactly 0 or 2 hyphens: %v", r.Channel) + } + + parts := strings.Split(chanName, "_") + if len(parts) != 3 { + return fmt.Errorf("%w: channel name does not contain exactly 2 underscores: %v", errWSPairParsingError, r.Channel) + } + + r.channelType = parts[0] + "_" + parts[1] + symbol := parts[2] + + enabledPairs, err := b.GetEnabledPairs(asset.Spot) + if err == nil { + r.pair, err = enabledPairs.DeriveFrom(symbol) + } + + return err +} diff --git a/exchanges/bitstamp/testdata/wsMyOrders.json b/exchanges/bitstamp/testdata/wsMyOrders.json new file mode 100644 index 00000000..4d281c1d --- /dev/null +++ b/exchanges/bitstamp/testdata/wsMyOrders.json @@ -0,0 +1,8 @@ +{"data":{"id":1658864794234880,"id_str":"1658864794234880","order_type":0,"datetime":"1693831262","microtimestamp":"1693831262313000","amount":0,"amount_str":"0","amount_traded":"0","amount_at_create":"0","price":999999999,"price_str":"999999999","trade_account_id":3141592,"client_order_id":"test_market_buy"},"channel":"private-my_orders_btcusd-314159","event":"order_created"} +{"data":{"id":1658864794234880,"id_str":"1658864794234880","order_type":0,"datetime":"1693831262","microtimestamp":"1693831262313000","amount":1e-8,"amount_str":"0.00000001","amount_traded":"0.00038666","amount_at_create":"0.00038667","price":25862,"price_str":"25862","trade_account_id":3141592,"client_order_id":"test_market_buy"},"channel":"private-my_orders_btcusd-314159","event":"order_changed"} +{"data":{"id":1658864794234880,"id_str":"1658864794234880","order_type":0,"datetime":"1693831262","microtimestamp":"1693831262313000","amount":1e-8,"amount_str":"0.00000001","amount_traded":"0","amount_at_create":"0.00038667","price":25862,"price_str":"25862","trade_account_id":3141592,"client_order_id":"test_market_buy"},"channel":"private-my_orders_btcusd-314159","event":"order_deleted"} +{"data":{"id":1658870500933632,"id_str":"1658870500933632","order_type":1,"datetime":"1693832656","microtimestamp":"1693832655550000","amount":0,"amount_str":"0","amount_traded":"0","amount_at_create":"0","price":0,"price_str":"0","trade_account_id":3141592,"client_order_id":"test_market_sell"},"channel":"private-my_orders_btcusd-314159","event":"order_created"} +{"data":{"id":1658870500933632,"id_str":"1658870500933632","order_type":1,"datetime":"1693832656","microtimestamp":"1693832655550000","amount":1e-8,"amount_str":"0.00000001","amount_traded":"0.00038678","amount_at_create":"0.00038679","price":25854,"price_str":"25854","trade_account_id":3141592,"client_order_id":"test_market_sell"},"channel":"private-my_orders_btcusd-314159","event":"order_changed"} +{"data":{"id":1658870500933632,"id_str":"1658870500933632","order_type":1,"datetime":"1693832656","microtimestamp":"1693832655550000","amount":1e-8,"amount_str":"0.00000001","amount_traded":"0","amount_at_create":"0.00038679","price":25854,"price_str":"25854","trade_account_id":3141592,"client_order_id":"test_market_sell"},"channel":"private-my_orders_btcusd-314159","event":"order_deleted"} +{"data":{"id":1658869033291777,"id_str":"1658869033291777","order_type":1,"datetime":"1693832297","microtimestamp":"1693832297239000","amount":0.00038692,"amount_str":"0.00038692","amount_traded":"0","amount_at_create":"0.00038692","price":25845,"price_str":"25845","trade_account_id":3141592,"client_order_id":"test_limit_sell"},"channel":"private-my_orders_btcusd-314159","event":"order_created"} +{"data":{"id":1658869033291777,"id_str":"1658869033291777","order_type":1,"datetime":"1693832303","microtimestamp":"1693832302664000","amount":0,"amount_str":"0","amount_traded":"0.00038692","amount_at_create":"0.00038692","price":25845,"price_str":"25845","trade_account_id":3141592,"client_order_id":"test_limit_sell"},"channel":"private-my_orders_btcusd-314159","event":"order_deleted"} diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index b71b0872..a2e3ef76 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1,7 +1,6 @@ package kraken import ( - "bufio" "context" "errors" "fmt" @@ -1819,58 +1818,14 @@ func TestWsOwnTrades(t *testing.T) { func TestWsOpenOrders(t *testing.T) { t.Parallel() - pairs := currency.Pairs{ - currency.Pair{Base: currency.XBT, Quote: currency.USD}, - currency.Pair{Base: currency.XBT, Quote: currency.USDT}, - } - k := Kraken{ - Base: exchange.Base{ - Name: "dummy", - CurrencyPairs: currency.PairsManager{ - Pairs: map[asset.Item]*currency.PairStore{ - asset.Spot: { - Available: pairs, - Enabled: pairs, - ConfigFormat: ¤cy.PairFormat{ - Uppercase: true, - Delimiter: currency.DashDelimiter, - }, - }, - }, - }, - Websocket: &stream.Websocket{ - Wg: new(sync.WaitGroup), - DataHandler: make(chan interface{}, 128), - }, - }, - } - - k.API.Endpoints = k.NewEndpoints() - - fixture, err := os.Open("testdata/wsOpenTrades.json") - defer func() { assert.Nil(t, fixture.Close()) }() - if err != nil { - t.Errorf("Error opening test fixture 'testdata/wsOpenTrades.json': %v", err) - return - } - - s := bufio.NewScanner(fixture) - for s.Scan() { - if err = k.wsHandleData(s.Bytes()); err != nil { - t.Errorf("Error in wsHandleData; err: '%v', msg: '%v'", err, s.Bytes()) - } - } - if err := s.Err(); err != nil { - t.Error(err) - } - + n := new(Kraken) + sharedtestvalues.TestFixtureToDataHandler(t, k, n, "testdata/wsOpenTrades.json", n.wsHandleData) seen := 0 - for reading := true; reading; { select { default: reading = false - case resp := <-k.Websocket.DataHandler: + case resp := <-n.Websocket.DataHandler: seen++ switch v := resp.(type) { case *order.Detail: diff --git a/exchanges/sharedtestvalues/sharedtestvalues.go b/exchanges/sharedtestvalues/sharedtestvalues.go index 792d0335..8d0cce61 100644 --- a/exchanges/sharedtestvalues/sharedtestvalues.go +++ b/exchanges/sharedtestvalues/sharedtestvalues.go @@ -1,15 +1,18 @@ package sharedtestvalues import ( + "bufio" "bytes" "fmt" "os" "path/filepath" "regexp" "strings" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" ) @@ -147,3 +150,35 @@ func ForceFileStandard(t *testing.T, pattern string) error { } return nil } + +// TestFixtureToDataHandler takes a new empty exchange and configures a new websocket handler for it, and squirts the json path contents to it +// It accepts a reader function, which is probably e.wsHandleData but could be anything +func TestFixtureToDataHandler(t *testing.T, seed, e exchange.IBotExchange, fixturePath string, reader func([]byte) error) { + b := e.GetBase() + seedBase := seed.GetBase() + + err := b.CurrencyPairs.Load(&seedBase.CurrencyPairs) + assert.NoError(t, err, "Loading currency pairs should not error") + + b.Name = "fixture" + b.Websocket = &stream.Websocket{ + Wg: new(sync.WaitGroup), + DataHandler: make(chan interface{}, 128), + } + b.API.Endpoints = b.NewEndpoints() + + fixture, err := os.Open(fixturePath) + assert.NoError(t, err, "Opening fixture '%s' should not error", fixturePath) + defer func() { + assert.NoError(t, fixture.Close(), "Closing the fixture file should not error") + }() + + s := bufio.NewScanner(fixture) + for s.Scan() { + msg := s.Bytes() + if err := reader(msg); err != nil { + t.Errorf("%v from message: %s", err, msg) + } + } + assert.NoError(t, s.Err(), "Fixture Scanner should not error") +} diff --git a/testdata/http_mock/bitstamp/bitstamp.json b/testdata/http_mock/bitstamp/bitstamp.json index 2155c855..373a21bd 100644 --- a/testdata/http_mock/bitstamp/bitstamp.json +++ b/testdata/http_mock/bitstamp/bitstamp.json @@ -68230,6 +68230,23 @@ } } ] + }, + "/api/v2/websockets_token/": { + "POST": [ + { + "data": { + "token": "HorseWalksIntoBarBtenderLongFace", + "valid_sec": 60, + "user_id": 1337210 + }, + "bodyParams": "key=1\u0026nonce=2\u0026signature=SIGGYMSIGGY", + "headers": { + "Content-Type": [ + "application/x-www-form-urlencoded" + ] + } + } + ] } } } \ No newline at end of file