From dda5f8c67ac12aa185328bdac3252e17b507e212 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 19 Sep 2025 11:02:28 +0700 Subject: [PATCH] Binance/BinanceUS: Fix various live test failures (#2019) * Binance: Refactor GetAggregatedTradesBatched and fix test Fixes #2010 * Binance: Fix TestGetHistoricTrades * BinanceUS: Refactor and fix GetAggregatedTradesBatched * Binance: Add QuoteAsset to UCompositeIndexInfo and fix intermittent test fail * fixup! Binance: Refactor GetAggregatedTradesBatched and fix test --- exchanges/binance/binance.go | 80 +++++---- exchanges/binance/binance_test.go | 193 +++++++++++----------- exchanges/binance/testdata/http.json | 202 ++++++++++++++++------- exchanges/binance/ufutures_types.go | 7 +- exchanges/binanceus/binanceus.go | 133 +++++++-------- exchanges/binanceus/binanceus_test.go | 59 ++++++- exchanges/binanceus/binanceus_types.go | 4 +- exchanges/binanceus/binanceus_wrapper.go | 4 +- 8 files changed, 401 insertions(+), 281 deletions(-) diff --git a/exchanges/binance/binance.go b/exchanges/binance/binance.go index 4d261b74..a6b137db 100644 --- a/exchanges/binance/binance.go +++ b/exchanges/binance/binance.go @@ -269,40 +269,42 @@ func (e *Exchange) GetAggregatedTrades(ctx context.Context, arg *AggregatedTrade } var resp []AggregatedTrade path := aggregatedTrades + "?" + params.Encode() - return resp, e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) + return resp, e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) } // batchAggregateTrades fetches trades in multiple requests -// first phase, hourly requests until the first trade (or end time) is reached -// second phase, limit requests from previous trade until end time (or limit) is reached -func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) { +// If no args.FromID is passed, requests are made intervals doubling from 10s until we get a viable starting position. +// Once the starting set is found, we continue scanning until we have all the trades in the time window +func (e *Exchange) batchAggregateTrades(ctx context.Context, args *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) { var resp []AggregatedTrade // prepare first request with only first hour and max limit - if arg.Limit == 0 || arg.Limit > 1000 { + if args.Limit == 0 || args.Limit > 1000 { // Extend from the default of 500 params.Set("limit", "1000") } var fromID int64 - if arg.FromID > 0 { - fromID = arg.FromID + if args.FromID > 0 { + fromID = args.FromID } else { - // Only 10 seconds is used to prevent limit of 1000 being reached in the first request, - // cutting off trades for high activity pairs + // Only 10 seconds is used to prevent limit of 1000 being reached in the first request, cutting off trades for high activity pairs + // If we don't find anything we keep increasing the window by doubling the interval and scanning again increment := time.Second * 10 - for start := arg.StartTime; len(resp) == 0; start = start.Add(increment) { - if !arg.EndTime.IsZero() && start.After(arg.EndTime) { - // All requests returned empty + for start := args.StartTime; len(resp) == 0; start, increment = start.Add(increment), increment*2 { + if !args.EndTime.IsZero() && start.After(args.EndTime) || increment <= 0 { + // All requests returned empty or we searched until increment overflowed return nil, nil } params.Set("startTime", strconv.FormatInt(start.UnixMilli(), 10)) - params.Set("endTime", strconv.FormatInt(start.Add(increment).UnixMilli(), 10)) + end := start.Add(increment) + if !args.EndTime.IsZero() && end.After(args.EndTime) { + end = args.EndTime + } + params.Set("endTime", strconv.FormatInt(end.UnixMilli(), 10)) path := aggregatedTrades + "?" + params.Encode() - err := e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) + err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) if err != nil { - return resp, fmt.Errorf("%w %v", err, arg.Symbol) + return resp, fmt.Errorf("%w %v", err, args.Symbol) } } fromID = resp[len(resp)-1].ATradeID @@ -311,38 +313,34 @@ func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTrad // other requests follow from the last aggregate trade id and have no time window params.Del("startTime") params.Del("endTime") - // while we haven't reached the limit - for ; arg.Limit == 0 || len(resp) < arg.Limit; fromID = resp[len(resp)-1].ATradeID { +outer: + for ; args.Limit == 0 || len(resp) < args.Limit; fromID = resp[len(resp)-1].ATradeID { // Keep requesting new data after last retrieved trade params.Set("fromId", strconv.FormatInt(fromID, 10)) path := aggregatedTrades + "?" + params.Encode() var additionalTrades []AggregatedTrade - err := e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, - path, - spotDefaultRate, - &additionalTrades) - if err != nil { - return resp, fmt.Errorf("%w %v", err, arg.Symbol) + if err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &additionalTrades); err != nil { + return resp, fmt.Errorf("%w %v", err, args.Symbol) } - lastIndex := len(additionalTrades) - if !arg.EndTime.IsZero() { - // get index for truncating to end time - lastIndex = sort.Search(len(additionalTrades), func(i int) bool { - return arg.EndTime.Before(additionalTrades[i].TimeStamp.Time()) - }) + switch len(additionalTrades) { + case 0, 1: + break outer // We only got the one we already have + default: + additionalTrades = additionalTrades[1:] // Remove the record we already have } - // don't include the first as the request was inclusive from last ATradeID - resp = append(resp, additionalTrades[1:lastIndex]...) - // If only the starting trade is returned or if we received trades after end time - if len(additionalTrades) == 1 || lastIndex < len(additionalTrades) { - // We found the end - break + if !args.EndTime.IsZero() { + // Check if only some of the results are before EndTime + if afterEnd := sort.Search(len(additionalTrades), func(i int) bool { + return args.EndTime.Before(additionalTrades[i].TimeStamp.Time()) + }); afterEnd < len(additionalTrades) { + resp = append(resp, additionalTrades[:afterEnd]...) + break + } } + resp = append(resp, additionalTrades...) } - // Truncate if necessary - if arg.Limit > 0 && len(resp) > arg.Limit { - resp = resp[:arg.Limit] + if args.Limit > 0 && len(resp) > args.Limit { + resp = resp[:args.Limit] } return resp, nil } diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 0f3b5305..75053a81 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -368,18 +368,20 @@ func TestUTakerBuySellVol(t *testing.T) { func TestUCompositeIndexInfo(t *testing.T) { t.Parallel() - cp, err := currency.NewPairFromString("DEFI-USDT") - if err != nil { - t.Error(err) - } - _, err = e.UCompositeIndexInfo(t.Context(), cp) - if err != nil { - t.Error(err) - } - _, err = e.UCompositeIndexInfo(t.Context(), currency.EMPTYPAIR) - if err != nil { - t.Error(err) - } + r, err := e.UCompositeIndexInfo(t.Context(), currency.EMPTYPAIR) + require.NoError(t, err, "UCompositeIndexInfo with no pair must not error") + require.NotEmpty(t, r, "UCompositeIndexInfo must return composite index info") + cp, err := currency.NewPairFromString(r[0].Symbol) + require.NoErrorf(t, err, "NewPairFromString must not error for symbol %s", r[0].Symbol) + r, err = e.UCompositeIndexInfo(t.Context(), cp) + require.NoErrorf(t, err, "UCompositeIndexInfo for pair %s must not error", cp) + require.NotEmptyf(t, r, "UCompositeIndexInfo for pair %s must return composite index info", cp) + require.NotEmptyf(t, r[0].BaseAssetList, "UCompositeIndexInfo for pair %s must return a non empty base asset list", cp) + b := r[0].BaseAssetList[0] + assert.NotEmpty(t, b.BaseAsset, "BaseAsset should be set") + assert.NotEmpty(t, b.QuoteAsset, "QuoteAsset asset should be set") + assert.NotZero(t, b.WeightInQuantity, "WeightInQuantity should be set") + assert.NotZero(t, b.WeightInPercentage, "WeightInPercentage should be set") } func TestUFuturesNewOrder(t *testing.T) { @@ -1477,109 +1479,108 @@ func TestNewOrderTest(t *testing.T) { func TestGetHistoricTrades(t *testing.T) { t.Parallel() p := currency.NewBTCUSDT() - start := time.Unix(1577977445, 0) // 2020-01-02 15:04:05 - end := start.Add(15 * time.Minute) // 2020-01-02 15:19:05 - result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end) - assert.NoError(t, err, "GetHistoricTrades should not error") - expected := 2134 + start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago if mockTests { - expected = 1002 + start = time.Unix(1577977445, 0) // 2020-01-02 15:04:05 } - assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries") + end := start.Add(15 * time.Minute) + result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end) + require.NoError(t, err, "GetHistoricTrades must not error") + assert.Greater(t, len(result), 1001, "GetHistoricTrades should have enough trades") for _, r := range result { - if !assert.WithinRange(t, r.Timestamp, start, end, "All trades should be within time range") { - break - } + require.WithinRange(t, r.Timestamp, start, end, "All trades must be within time range") } } +// TestGetAggregatedTradesBatched exercises TestGetAggregatedTradesBatched to ensure our date and limit scanning works correctly +// This test is susceptible to failure if volumes change a lot, during wash trading or zero-fee periods +// In live tests, 45 minutes is expected to return more than 1000 records func TestGetAggregatedTradesBatched(t *testing.T) { t.Parallel() - currencyPair, err := currency.NewPairFromString("BTCUSDT") - if err != nil { - t.Fatal(err) + type testCase struct { + name string + args *AggregatedTradeRequestParams + expFunc func(*testing.T, []AggregatedTrade) } - start := time.Date(2020, 1, 2, 15, 4, 5, 0, time.UTC) - expectTime := time.Date(2020, 1, 2, 16, 19, 4, 831_000_000, time.UTC) - tests := []struct { - name string - // mock test or live test - mock bool - args *AggregatedTradeRequestParams - numExpected int - lastExpected time.Time - }{ - { - name: "mock batch with timerange", - mock: true, - args: &AggregatedTradeRequestParams{ - Symbol: currencyPair, - StartTime: start, - EndTime: start.Add(75 * time.Minute), + var tests []testCase + if mockTests { + start := time.Date(2020, 1, 2, 15, 4, 5, 0, time.UTC) + tests = []testCase{ + { + name: "mock batch with timerange", + args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(75 * time.Minute)}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + require.Equal(t, 1012, len(results), "must return correct number of records") + assert.Equal(t, + time.Date(2020, 1, 2, 16, 18, 31, int(919*time.Millisecond), time.UTC), + results[len(results)-1].TimeStamp.Time().UTC(), + "should return the correct time for the last record", + ) + }, }, - numExpected: 1012, - lastExpected: time.Date(2020, 1, 2, 16, 18, 31, int(919*time.Millisecond), time.UTC), - }, - { - name: "batch with timerange", - args: &AggregatedTradeRequestParams{ - Symbol: currencyPair, - StartTime: start, - EndTime: start.Add(75 * time.Minute), + { + name: "mock custom limit with start time set, no end time", + args: &AggregatedTradeRequestParams{StartTime: start, Limit: 1001}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + require.Equal(t, 1001, len(results), "must return correct number of records") + assert.Equal(t, + time.Date(2020, 1, 2, 15, 18, 39, int(226*time.Millisecond), time.UTC), + results[len(results)-1].TimeStamp.Time().UTC(), + "should return the correct time for the last record", + ) + }, }, - numExpected: 12130, - lastExpected: expectTime, - }, - { - name: "mock custom limit with start time set, no end time", - mock: true, - args: &AggregatedTradeRequestParams{ - Symbol: currency.NewBTCUSDT(), - StartTime: start, - Limit: 1001, + { + name: "mock limit less than returned", + args: &AggregatedTradeRequestParams{Limit: 3}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + require.Equal(t, 3, len(results), "must return correct number of records") + assert.Equal(t, + time.Date(2020, 1, 2, 16, 19, 5, int(200*time.Millisecond), time.UTC), + results[len(results)-1].TimeStamp.Time().UTC(), + "should return the correct time for the last record", + ) + }, }, - numExpected: 1001, - lastExpected: time.Date(2020, 1, 2, 15, 18, 39, int(226*time.Millisecond), time.UTC), - }, - { - name: "custom limit with start time set, no end time", - args: &AggregatedTradeRequestParams{ - Symbol: currency.NewBTCUSDT(), - StartTime: time.Date(2020, 11, 18, 23, 0, 28, 921, time.UTC), - Limit: 1001, + } + } else { + start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago + tests = []testCase{ + { + name: "batch with timerange", + args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(20 * time.Minute)}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + // 2000-50000 records range was valid in 2025; Adjust if Binance enters a phase of zero-fees or low-volume + require.Greater(t, len(results), 2000, "must return a quantity above a sane threshold of records") + assert.Less(t, len(results), 50000, "should return a quantity below a sane threshold of records") + assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 20*time.Minute, "last record should be within range of start time") + }, }, - numExpected: 1001, - lastExpected: time.Date(2020, 11, 18, 23, 1, 33, int(62*time.Millisecond*10), time.UTC), - }, - { - name: "mock recent trades", - mock: true, - args: &AggregatedTradeRequestParams{ - Symbol: currency.NewBTCUSDT(), - Limit: 3, + { + name: "custom limit with start time set, no end time", + args: &AggregatedTradeRequestParams{StartTime: start, Limit: 2042}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + // 2000 records in was about 6 minutes in 2025; Adjust if Binance enters a phase of zero-fees or low-volume + require.Equal(t, 2042, len(results), "must return exactly the limit number of records") + assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 20*time.Minute, "last record should be within 20 minutes of start time") + }, }, - numExpected: 3, - lastExpected: time.Date(2020, 1, 2, 16, 19, 5, int(200*time.Millisecond), time.UTC), - }, + } } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - if tt.mock != mockTests { - t.Skip("mock mismatch, skipping") - } - result, err := e.GetAggregatedTrades(t.Context(), tt.args) - if err != nil { - t.Error(err) - } - if len(result) != tt.numExpected { - t.Errorf("GetAggregatedTradesBatched() expected %v entries, got %v", tt.numExpected, len(result)) - } - lastTradeTime := result[len(result)-1].TimeStamp - if !lastTradeTime.Time().Equal(tt.lastExpected) { - t.Errorf("last trade expected %v, got %v", tt.lastExpected.UTC(), lastTradeTime.Time().UTC()) - } + tt.args.Symbol = currency.NewBTCUSDT() + results, err := e.GetAggregatedTrades(t.Context(), tt.args) + require.NoError(t, err) + tt.expFunc(t, results) }) } } diff --git a/exchanges/binance/testdata/http.json b/exchanges/binance/testdata/http.json index e04deb54..765b837e 100644 --- a/exchanges/binance/testdata/http.json +++ b/exchanges/binance/testdata/http.json @@ -90751,89 +90751,165 @@ }, "/fapi/v1/indexInfo": { "GET": [ - { - "data": { - "baseAssetList": [ - { - "baseAsset": "AAVE", - "weightInPercentage": "0.07255900", - "weightInQuantity": "0.57321317" - }, - { - "baseAsset": "ALPHA", - "weightInPercentage": "0.02039700", - "weightInQuantity": "60.33241822" - }, - { - "baseAsset": "BAL", - "weightInPercentage": "0.02362800", - "weightInQuantity": "1.24967422" - }, - { - "baseAsset": "BAND", - "weightInPercentage": "0.03501400", - "weightInQuantity": "3.29870857" - }, - { - "baseAsset": "BEL", - "weightInPercentage": "0.01843900", - "weightInQuantity": "8.75756110" - }, - { - "baseAsset": "BZRX", - "weightInPercentage": "0.02294900", - "weightInQuantity": "67.18425714" - } - ], - "symbol": "DEFIUSDT", - "time": 1608086821000 - }, - "queryString": "symbol=DEFIUSDT", - "bodyParams": "", - "headers": {} - }, { "data": [ { "baseAssetList": [ { - "baseAsset": "AAVE", - "weightInPercentage": "0.07255900", - "weightInQuantity": "0.57321317" + "baseAsset": "BTC", + "quoteAsset": "AAVE", + "weightInPercentage": "0.00376500", + "weightInQuantity": "0.03738265" }, { - "baseAsset": "ALPHA", - "weightInPercentage": "0.02039700", - "weightInQuantity": "60.33241822" + "baseAsset": "BTC", + "quoteAsset": "ADA", + "weightInPercentage": "0.02638100", + "weightInQuantity": "0.00078048" }, { - "baseAsset": "BAL", - "weightInPercentage": "0.02362800", - "weightInQuantity": "1.24967422" + "baseAsset": "BTC", + "quoteAsset": "AVAX", + "weightInPercentage": "0.01082800", + "weightInQuantity": "0.01147365" }, { - "baseAsset": "BAND", - "weightInPercentage": "0.03501400", - "weightInQuantity": "3.29870857" + "baseAsset": "BTC", + "quoteAsset": "BCH", + "weightInPercentage": "0.00992300", + "weightInQuantity": "0.20359666" }, { - "baseAsset": "BEL", - "weightInPercentage": "0.01843900", - "weightInQuantity": "8.75756110" - }, - { - "baseAsset": "BZRX", - "weightInPercentage": "0.02294900", - "weightInQuantity": "67.18425714" + "baseAsset": "BTC", + "quoteAsset": "BNB", + "weightInPercentage": "0.11126600", + "weightInQuantity": "3.58068053" } ], - "symbol": "DEFIUSDT", - "time": 1608086822007 + "component": "quoteAsset", + "symbol": "BTCDOMUSDT", + "time": 1758250144006 + }, + { + "baseAssetList": [ + { + "baseAsset": "1000000BOB", + "quoteAsset": "USDT", + "weightInPercentage": "0.00204918", + "weightInQuantity": "0.04619791" + }, + { + "baseAsset": "1000000MOG", + "quoteAsset": "USDT", + "weightInPercentage": "0.00204918", + "weightInQuantity": "0.00254617" + }, + { + "baseAsset": "1000BONK", + "quoteAsset": "USDT", + "weightInPercentage": "0.00204918", + "weightInQuantity": "0.10008223" + }, + { + "baseAsset": "1000CAT", + "quoteAsset": "USDT", + "weightInPercentage": "0.00204918", + "weightInQuantity": "0.29881191" + }, + { + "baseAsset": "1000CHEEMS", + "quoteAsset": "USDT", + "weightInPercentage": "0.00204918", + "weightInQuantity": "1.98279472" + } + ], + "component": "baseAsset", + "symbol": "ALLUSDT", + "time": 1758250144006 + }, + { + "baseAssetList": [ + { + "baseAsset": "1000000BOB", + "quoteAsset": "USDT", + "weightInPercentage": "0.01000000", + "weightInQuantity": "0.21446069" + }, + { + "baseAsset": "1000WHY", + "quoteAsset": "USDT", + "weightInPercentage": "0.01000000", + "weightInQuantity": "333.26983733" + }, + { + "baseAsset": "1000X", + "quoteAsset": "USDT", + "weightInPercentage": "0.01000000", + "weightInQuantity": "0.22663334" + }, + { + "baseAsset": "AGT", + "quoteAsset": "USDT", + "weightInPercentage": "0.01000000", + "weightInQuantity": "1.63266620" + }, + { + "baseAsset": "AIN", + "quoteAsset": "USDT", + "weightInPercentage": "0.01000000", + "weightInQuantity": "0.08414804" + } + ], + "component": "baseAsset", + "symbol": "SMALLUSDT", + "time": 1758250144006 } ], "queryString": "", "bodyParams": "", "headers": {} + }, + { + "data": { + "baseAssetList": [ + { + "baseAsset": "BTC", + "quoteAsset": "AAVE", + "weightInPercentage": "0.00376500", + "weightInQuantity": "0.03738265" + }, + { + "baseAsset": "BTC", + "quoteAsset": "ADA", + "weightInPercentage": "0.02638100", + "weightInQuantity": "0.00078048" + }, + { + "baseAsset": "BTC", + "quoteAsset": "AVAX", + "weightInPercentage": "0.01082800", + "weightInQuantity": "0.01147365" + }, + { + "baseAsset": "BTC", + "quoteAsset": "BCH", + "weightInPercentage": "0.00992300", + "weightInQuantity": "0.20359666" + }, + { + "baseAsset": "BTC", + "quoteAsset": "BNB", + "weightInPercentage": "0.11126600", + "weightInQuantity": "3.58068053" + } + ], + "component": "quoteAsset", + "symbol": "BTCDOMUSDT", + "time": 1758250145000 + }, + "queryString": "symbol=BTCDOMUSDT", + "bodyParams": "", + "headers": {} } ] }, diff --git a/exchanges/binance/ufutures_types.go b/exchanges/binance/ufutures_types.go index 39b989d1..59cf5b57 100644 --- a/exchanges/binance/ufutures_types.go +++ b/exchanges/binance/ufutures_types.go @@ -185,9 +185,10 @@ type UCompositeIndexInfoData struct { Symbol string `json:"symbol"` Time types.Time `json:"time"` BaseAssetList []struct { - BaseAsset string `json:"baseAsset"` - WeightInQuantity float64 `json:"weightInQuantity,string"` - WeightInPercentage float64 `json:"weightInPercentage,string"` + BaseAsset currency.Code `json:"baseAsset"` + QuoteAsset currency.Code `json:"quoteAsset"` + WeightInQuantity float64 `json:"weightInQuantity,string"` + WeightInPercentage float64 `json:"weightInPercentage,string"` } `json:"baseAssetList"` } diff --git a/exchanges/binanceus/binanceus.go b/exchanges/binanceus/binanceus.go index dedac640..4790f4b8 100644 --- a/exchanges/binanceus/binanceus.go +++ b/exchanges/binanceus/binanceus.go @@ -123,7 +123,6 @@ var ( errInvalidAssetValue = errors.New("invalid asset ") errInvalidAssetAmount = errors.New("invalid asset amount") errIncompleteArguments = errors.New("missing required argument") - errStartTimeOrFromIDNotSet = errors.New("please set StartTime or FromId, but not both") errMissingRequiredArgumentCoin = errors.New("missing required argument,coin") errMissingRequiredArgumentNetwork = errors.New("missing required argument,network") errAmountValueMustBeGreaterThan0 = errors.New("amount must be greater than 0") @@ -203,84 +202,81 @@ func (e *Exchange) GetHistoricalTrades(ctx context.Context, hist HistoricalTrade // GetAggregateTrades to get compressed, aggregate trades. Trades that fill at the time, from the same order, with the same price will have the quantity aggregated. func (e *Exchange) GetAggregateTrades(ctx context.Context, agg *AggregatedTradeRequestParams) ([]AggregatedTrade, error) { params := url.Values{} - symbol, err := e.FormatSymbol(agg.Symbol, asset.Spot) + s, err := e.FormatSymbol(agg.Symbol, asset.Spot) if err != nil { return nil, err } - params.Set("symbol", symbol) - needBatch := false - if agg.Limit > 0 { - if agg.Limit > 1000 { - needBatch = true - } else { - params.Set("limit", strconv.Itoa(agg.Limit)) - } + params.Set("symbol", s) + // If the user request is directly not supported by the exchange, we might be able to fulfill it + // by merging results from multiple API requests + needBatch := true // Need to batch unless user has specified a limit + if agg.Limit > 0 && agg.Limit <= 1000 { + needBatch = false + params.Set("limit", strconv.Itoa(agg.Limit)) } if agg.FromID != 0 { params.Set("fromId", strconv.FormatInt(agg.FromID, 10)) } - startTime := time.UnixMilli(agg.StartTime) - endTime := time.UnixMilli(agg.EndTime) - - if (endTime.UnixNano() - startTime.UnixNano()) >= int64(time.Hour) { - endTime = startTime.Add(time.Minute * 59) + if !agg.StartTime.IsZero() { + params.Set("startTime", strconv.FormatInt(agg.StartTime.UnixMilli(), 10)) + } + if !agg.EndTime.IsZero() { + params.Set("endTime", strconv.FormatInt(agg.EndTime.UnixMilli(), 10)) } - if !startTime.IsZero() && startTime.Unix() != 0 { - params.Set("startTime", strconv.FormatInt(agg.StartTime, 10)) - } - if !endTime.IsZero() && endTime.Unix() != 0 { - params.Set("endTime", strconv.FormatInt(agg.EndTime, 10)) - } - needBatch = needBatch || (!startTime.IsZero() && !endTime.IsZero() && endTime.Sub(startTime) > time.Hour) + // startTime and endTime are set and time between startTime and endTime is more than 1 hour + needBatch = needBatch || (!agg.StartTime.IsZero() && !agg.EndTime.IsZero() && agg.EndTime.Sub(agg.StartTime) > time.Hour) + // Fall back to batch requests, if possible and necessary if needBatch { - // fromId xor start time must be set - canBatch := agg.FromID == 0 != startTime.IsZero() + // fromId or start time must be set + canBatch := agg.FromID == 0 != agg.StartTime.IsZero() if canBatch { + // Split the request into multiple return e.batchAggregateTrades(ctx, agg, params) } + // Can't handle this request locally or remotely // We would receive {"code":-1128,"msg":"Combination of optional parameters invalid."} - return nil, errStartTimeOrFromIDNotSet + return nil, errors.New("please set StartTime or FromId, but not both") } var resp []AggregatedTrade - path := common.EncodeURLValues(aggregatedTrades, params) - return resp, e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) + path := aggregatedTrades + "?" + params.Encode() + return resp, e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) } // batchAggregateTrades fetches trades in multiple requests <-- copied and amended from the binance // first phase, hourly requests until the first trade (or end time) is reached // second phase, limit requests from previous trade until end time (or limit) is reached -func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) { +func (e *Exchange) batchAggregateTrades(ctx context.Context, args *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) { var resp []AggregatedTrade // prepare first request with only first hour and max limit - if arg.Limit == 0 || arg.Limit > 1000 { + if args.Limit == 0 || args.Limit > 1000 { // Extend from the default of 500 params.Set("limit", "1000") } - startTime := time.UnixMilli(arg.StartTime) - endTime := time.UnixMilli(arg.EndTime) + var fromID int64 - if arg.FromID > 0 { - fromID = arg.FromID + if args.FromID > 0 { + fromID = args.FromID } else { - // Only 10 seconds is used to prevent limit of 1000 being reached in the first request, - // cutting off trades for high activity pairs + // Only 10 seconds is used to prevent limit of 1000 being reached in the first request, cutting off trades for high activity pairs + // If we don't find anything we keep increasing the window by doubling the interval and scanning again increment := time.Second * 10 - for len(resp) == 0 { - startTime = startTime.Add(increment) - if !endTime.IsZero() && startTime.After(endTime) { - // All requests returned empty + for start := args.StartTime; len(resp) == 0; start, increment = start.Add(increment), increment*2 { + if !args.EndTime.IsZero() && start.After(args.EndTime) || increment <= 0 { + // All requests returned empty or we searched until increment overflowed return nil, nil } - params.Set("startTime", strconv.FormatInt(startTime.UnixMilli(), 10)) - params.Set("endTime", strconv.FormatInt(startTime.Add(increment).UnixMilli(), 10)) - path := common.EncodeURLValues(aggregatedTrades, params) - err := e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) + params.Set("startTime", strconv.FormatInt(start.UnixMilli(), 10)) + end := start.Add(increment) + if !args.EndTime.IsZero() && end.After(args.EndTime) { + end = args.EndTime + } + params.Set("endTime", strconv.FormatInt(end.UnixMilli(), 10)) + path := aggregatedTrades + "?" + params.Encode() + err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp) if err != nil { - return resp, err + return resp, fmt.Errorf("%w %v", err, args.Symbol) } } fromID = resp[len(resp)-1].ATradeID @@ -289,37 +285,34 @@ func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTrad // other requests follow from the last aggregate trade id and have no time window params.Del("startTime") params.Del("endTime") - // while we haven't reached the limit - for ; arg.Limit == 0 || len(resp) < arg.Limit; fromID = resp[len(resp)-1].ATradeID { +outer: + for ; args.Limit == 0 || len(resp) < args.Limit; fromID = resp[len(resp)-1].ATradeID { // Keep requesting new data after last retrieved trade params.Set("fromId", strconv.FormatInt(fromID, 10)) - path := common.EncodeURLValues(aggregatedTrades, params) + path := aggregatedTrades + "?" + params.Encode() var additionalTrades []AggregatedTrade - err := e.SendHTTPRequest(ctx, - exchange.RestSpotSupplementary, - path, - spotDefaultRate, - &additionalTrades) - if err != nil { - return resp, err + if err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &additionalTrades); err != nil { + return resp, fmt.Errorf("%w %v", err, args.Symbol) } - lastIndex := len(additionalTrades) - if !endTime.IsZero() && endTime.Unix() != 0 { - // get index for truncating to end time - lastIndex = sort.Search(len(additionalTrades), func(i int) bool { - return endTime.Before(additionalTrades[i].TimeStamp.Time()) - }) + switch len(additionalTrades) { + case 0, 1: + break outer // We only got the one we already have + default: + additionalTrades = additionalTrades[1:] // Remove the record we already have } - // don't include the first as the request was inclusive from last ATradeID - resp = append(resp, additionalTrades[1:lastIndex]...) - // If only the starting trade is returned or if we received trades after end time - if len(additionalTrades) == 1 || lastIndex < len(additionalTrades) { - break + if !args.EndTime.IsZero() { + // Check if only some of the results are before EndTime + if afterEnd := sort.Search(len(additionalTrades), func(i int) bool { + return args.EndTime.Before(additionalTrades[i].TimeStamp.Time()) + }); afterEnd < len(additionalTrades) { + resp = append(resp, additionalTrades[:afterEnd]...) + break + } } + resp = append(resp, additionalTrades...) } - // Truncate if necessary - if arg.Limit > 0 && len(resp) > arg.Limit { - resp = resp[:arg.Limit] + if args.Limit > 0 && len(resp) > args.Limit { + resp = resp[:args.Limit] } return resp, nil } diff --git a/exchanges/binanceus/binanceus_test.go b/exchanges/binanceus/binanceus_test.go index 6ddd8b06..a2b1b169 100644 --- a/exchanges/binanceus/binanceus_test.go +++ b/exchanges/binanceus/binanceus_test.go @@ -140,10 +140,14 @@ func TestGetRecentTrades(t *testing.T) { func TestGetHistoricTrades(t *testing.T) { t.Parallel() - pair := currency.Pair{Base: currency.BTC, Quote: currency.USD} - _, err := e.GetHistoricTrades(t.Context(), pair, asset.Spot, time.Time{}, time.Time{}) - if err != nil { - t.Error("Binanceus GetHistoricTrades() error", err) + p := currency.NewBTCUSDT() + start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago + end := start.Add(15 * time.Minute) + result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end) + require.NoError(t, err, "GetHistoricTrades must not error") + assert.NotEmpty(t, result, "GetHistoricTrades should have trades") + for _, r := range result { + require.WithinRange(t, r.Timestamp, start, end, "All trades must be within time range") } } @@ -1756,3 +1760,50 @@ func TestGetCurrencyTradeURL(t *testing.T) { assert.NotEmpty(t, resp) } } + +// TestGetAggregatedTradesBatched exercises TestGetAggregatedTradesBatched to ensure our date and limit scanning works correctly +// This test is susceptible to failure if volumes change a lot, during wash trading or zero-fee periods +// In live tests, 6 hours is expected to return about 1000 records +func TestGetAggregatedTradesBatched(t *testing.T) { + t.Parallel() + type testCase struct { + name string + args *AggregatedTradeRequestParams + expFunc func(*testing.T, []AggregatedTrade) + } + + var tests []testCase + start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago + tests = []testCase{ + { + name: "batch with timerange", + args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(6 * time.Hour)}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + require.NotEmpty(t, results, "must have records") + assert.Less(t, len(results), 10000, "should return a quantity below a sane threshold of records") + assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 6*time.Hour, "last record should be within range of start time") + }, + }, + { + name: "custom limit with start time set, no end time", + args: &AggregatedTradeRequestParams{StartTime: start, Limit: 2042}, + expFunc: func(t *testing.T, results []AggregatedTrade) { + t.Helper() + // 2000 records in was about 32 hours in 2025; Adjust if BinanceUS enters a phase of zero-fees or low-volume + require.Equal(t, 2042, len(results), "must return exactly the limit number of records") + assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 72*time.Hour, "last record should be within 72 hours of start time") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + tt.args.Symbol = currency.NewBTCUSDT() + results, err := e.GetAggregateTrades(t.Context(), tt.args) + require.NoError(t, err) + tt.expFunc(t, results) + }) + } +} diff --git a/exchanges/binanceus/binanceus_types.go b/exchanges/binanceus/binanceus_types.go index 65566fba..a8743f4e 100644 --- a/exchanges/binanceus/binanceus_types.go +++ b/exchanges/binanceus/binanceus_types.go @@ -129,8 +129,8 @@ type AggregatedTradeRequestParams struct { // The first trade to retrieve FromID int64 // The API seems to accept (start and end time) or FromID and no other combinations - StartTime int64 - EndTime int64 + StartTime time.Time + EndTime time.Time // Default 500; max 1000. Limit int } diff --git a/exchanges/binanceus/binanceus_wrapper.go b/exchanges/binanceus/binanceus_wrapper.go index d805788c..3b387ae9 100644 --- a/exchanges/binanceus/binanceus_wrapper.go +++ b/exchanges/binanceus/binanceus_wrapper.go @@ -467,8 +467,8 @@ func (e *Exchange) GetHistoricTrades(ctx context.Context, p currency.Pair, asset } req := AggregatedTradeRequestParams{ Symbol: p, - StartTime: timestampStart.UnixMilli(), - EndTime: timestampEnd.UnixMilli(), + StartTime: timestampStart, + EndTime: timestampEnd, } trades, err := e.GetAggregateTrades(ctx, &req) if err != nil {