Binance/BinanceUS: Fix various live test failures (#2019)

* Binance: Refactor GetAggregatedTradesBatched and fix test

Fixes #2010

* Binance: Fix TestGetHistoricTrades

* BinanceUS: Refactor and fix GetAggregatedTradesBatched

* Binance: Add QuoteAsset to UCompositeIndexInfo and fix intermittent test fail

* fixup! Binance: Refactor GetAggregatedTradesBatched and fix test
This commit is contained in:
Gareth Kirwan
2025-09-19 11:02:28 +07:00
committed by GitHub
parent 9725191be6
commit dda5f8c67a
8 changed files with 401 additions and 281 deletions

View File

@@ -269,40 +269,42 @@ func (e *Exchange) GetAggregatedTrades(ctx context.Context, arg *AggregatedTrade
}
var resp []AggregatedTrade
path := aggregatedTrades + "?" + params.Encode()
return resp, e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
return resp, e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
}
// batchAggregateTrades fetches trades in multiple requests
// first phase, hourly requests until the first trade (or end time) is reached
// second phase, limit requests from previous trade until end time (or limit) is reached
func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) {
// If no args.FromID is passed, requests are made intervals doubling from 10s until we get a viable starting position.
// Once the starting set is found, we continue scanning until we have all the trades in the time window
func (e *Exchange) batchAggregateTrades(ctx context.Context, args *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) {
var resp []AggregatedTrade
// prepare first request with only first hour and max limit
if arg.Limit == 0 || arg.Limit > 1000 {
if args.Limit == 0 || args.Limit > 1000 {
// Extend from the default of 500
params.Set("limit", "1000")
}
var fromID int64
if arg.FromID > 0 {
fromID = arg.FromID
if args.FromID > 0 {
fromID = args.FromID
} else {
// Only 10 seconds is used to prevent limit of 1000 being reached in the first request,
// cutting off trades for high activity pairs
// Only 10 seconds is used to prevent limit of 1000 being reached in the first request, cutting off trades for high activity pairs
// If we don't find anything we keep increasing the window by doubling the interval and scanning again
increment := time.Second * 10
for start := arg.StartTime; len(resp) == 0; start = start.Add(increment) {
if !arg.EndTime.IsZero() && start.After(arg.EndTime) {
// All requests returned empty
for start := args.StartTime; len(resp) == 0; start, increment = start.Add(increment), increment*2 {
if !args.EndTime.IsZero() && start.After(args.EndTime) || increment <= 0 {
// All requests returned empty or we searched until increment overflowed
return nil, nil
}
params.Set("startTime", strconv.FormatInt(start.UnixMilli(), 10))
params.Set("endTime", strconv.FormatInt(start.Add(increment).UnixMilli(), 10))
end := start.Add(increment)
if !args.EndTime.IsZero() && end.After(args.EndTime) {
end = args.EndTime
}
params.Set("endTime", strconv.FormatInt(end.UnixMilli(), 10))
path := aggregatedTrades + "?" + params.Encode()
err := e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
if err != nil {
return resp, fmt.Errorf("%w %v", err, arg.Symbol)
return resp, fmt.Errorf("%w %v", err, args.Symbol)
}
}
fromID = resp[len(resp)-1].ATradeID
@@ -311,38 +313,34 @@ func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTrad
// other requests follow from the last aggregate trade id and have no time window
params.Del("startTime")
params.Del("endTime")
// while we haven't reached the limit
for ; arg.Limit == 0 || len(resp) < arg.Limit; fromID = resp[len(resp)-1].ATradeID {
outer:
for ; args.Limit == 0 || len(resp) < args.Limit; fromID = resp[len(resp)-1].ATradeID {
// Keep requesting new data after last retrieved trade
params.Set("fromId", strconv.FormatInt(fromID, 10))
path := aggregatedTrades + "?" + params.Encode()
var additionalTrades []AggregatedTrade
err := e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary,
path,
spotDefaultRate,
&additionalTrades)
if err != nil {
return resp, fmt.Errorf("%w %v", err, arg.Symbol)
if err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &additionalTrades); err != nil {
return resp, fmt.Errorf("%w %v", err, args.Symbol)
}
lastIndex := len(additionalTrades)
if !arg.EndTime.IsZero() {
// get index for truncating to end time
lastIndex = sort.Search(len(additionalTrades), func(i int) bool {
return arg.EndTime.Before(additionalTrades[i].TimeStamp.Time())
})
switch len(additionalTrades) {
case 0, 1:
break outer // We only got the one we already have
default:
additionalTrades = additionalTrades[1:] // Remove the record we already have
}
// don't include the first as the request was inclusive from last ATradeID
resp = append(resp, additionalTrades[1:lastIndex]...)
// If only the starting trade is returned or if we received trades after end time
if len(additionalTrades) == 1 || lastIndex < len(additionalTrades) {
// We found the end
break
if !args.EndTime.IsZero() {
// Check if only some of the results are before EndTime
if afterEnd := sort.Search(len(additionalTrades), func(i int) bool {
return args.EndTime.Before(additionalTrades[i].TimeStamp.Time())
}); afterEnd < len(additionalTrades) {
resp = append(resp, additionalTrades[:afterEnd]...)
break
}
}
resp = append(resp, additionalTrades...)
}
// Truncate if necessary
if arg.Limit > 0 && len(resp) > arg.Limit {
resp = resp[:arg.Limit]
if args.Limit > 0 && len(resp) > args.Limit {
resp = resp[:args.Limit]
}
return resp, nil
}

View File

@@ -368,18 +368,20 @@ func TestUTakerBuySellVol(t *testing.T) {
func TestUCompositeIndexInfo(t *testing.T) {
t.Parallel()
cp, err := currency.NewPairFromString("DEFI-USDT")
if err != nil {
t.Error(err)
}
_, err = e.UCompositeIndexInfo(t.Context(), cp)
if err != nil {
t.Error(err)
}
_, err = e.UCompositeIndexInfo(t.Context(), currency.EMPTYPAIR)
if err != nil {
t.Error(err)
}
r, err := e.UCompositeIndexInfo(t.Context(), currency.EMPTYPAIR)
require.NoError(t, err, "UCompositeIndexInfo with no pair must not error")
require.NotEmpty(t, r, "UCompositeIndexInfo must return composite index info")
cp, err := currency.NewPairFromString(r[0].Symbol)
require.NoErrorf(t, err, "NewPairFromString must not error for symbol %s", r[0].Symbol)
r, err = e.UCompositeIndexInfo(t.Context(), cp)
require.NoErrorf(t, err, "UCompositeIndexInfo for pair %s must not error", cp)
require.NotEmptyf(t, r, "UCompositeIndexInfo for pair %s must return composite index info", cp)
require.NotEmptyf(t, r[0].BaseAssetList, "UCompositeIndexInfo for pair %s must return a non empty base asset list", cp)
b := r[0].BaseAssetList[0]
assert.NotEmpty(t, b.BaseAsset, "BaseAsset should be set")
assert.NotEmpty(t, b.QuoteAsset, "QuoteAsset asset should be set")
assert.NotZero(t, b.WeightInQuantity, "WeightInQuantity should be set")
assert.NotZero(t, b.WeightInPercentage, "WeightInPercentage should be set")
}
func TestUFuturesNewOrder(t *testing.T) {
@@ -1477,109 +1479,108 @@ func TestNewOrderTest(t *testing.T) {
func TestGetHistoricTrades(t *testing.T) {
t.Parallel()
p := currency.NewBTCUSDT()
start := time.Unix(1577977445, 0) // 2020-01-02 15:04:05
end := start.Add(15 * time.Minute) // 2020-01-02 15:19:05
result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end)
assert.NoError(t, err, "GetHistoricTrades should not error")
expected := 2134
start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago
if mockTests {
expected = 1002
start = time.Unix(1577977445, 0) // 2020-01-02 15:04:05
}
assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries")
end := start.Add(15 * time.Minute)
result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end)
require.NoError(t, err, "GetHistoricTrades must not error")
assert.Greater(t, len(result), 1001, "GetHistoricTrades should have enough trades")
for _, r := range result {
if !assert.WithinRange(t, r.Timestamp, start, end, "All trades should be within time range") {
break
}
require.WithinRange(t, r.Timestamp, start, end, "All trades must be within time range")
}
}
// TestGetAggregatedTradesBatched exercises TestGetAggregatedTradesBatched to ensure our date and limit scanning works correctly
// This test is susceptible to failure if volumes change a lot, during wash trading or zero-fee periods
// In live tests, 45 minutes is expected to return more than 1000 records
func TestGetAggregatedTradesBatched(t *testing.T) {
t.Parallel()
currencyPair, err := currency.NewPairFromString("BTCUSDT")
if err != nil {
t.Fatal(err)
type testCase struct {
name string
args *AggregatedTradeRequestParams
expFunc func(*testing.T, []AggregatedTrade)
}
start := time.Date(2020, 1, 2, 15, 4, 5, 0, time.UTC)
expectTime := time.Date(2020, 1, 2, 16, 19, 4, 831_000_000, time.UTC)
tests := []struct {
name string
// mock test or live test
mock bool
args *AggregatedTradeRequestParams
numExpected int
lastExpected time.Time
}{
{
name: "mock batch with timerange",
mock: true,
args: &AggregatedTradeRequestParams{
Symbol: currencyPair,
StartTime: start,
EndTime: start.Add(75 * time.Minute),
var tests []testCase
if mockTests {
start := time.Date(2020, 1, 2, 15, 4, 5, 0, time.UTC)
tests = []testCase{
{
name: "mock batch with timerange",
args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(75 * time.Minute)},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
require.Equal(t, 1012, len(results), "must return correct number of records")
assert.Equal(t,
time.Date(2020, 1, 2, 16, 18, 31, int(919*time.Millisecond), time.UTC),
results[len(results)-1].TimeStamp.Time().UTC(),
"should return the correct time for the last record",
)
},
},
numExpected: 1012,
lastExpected: time.Date(2020, 1, 2, 16, 18, 31, int(919*time.Millisecond), time.UTC),
},
{
name: "batch with timerange",
args: &AggregatedTradeRequestParams{
Symbol: currencyPair,
StartTime: start,
EndTime: start.Add(75 * time.Minute),
{
name: "mock custom limit with start time set, no end time",
args: &AggregatedTradeRequestParams{StartTime: start, Limit: 1001},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
require.Equal(t, 1001, len(results), "must return correct number of records")
assert.Equal(t,
time.Date(2020, 1, 2, 15, 18, 39, int(226*time.Millisecond), time.UTC),
results[len(results)-1].TimeStamp.Time().UTC(),
"should return the correct time for the last record",
)
},
},
numExpected: 12130,
lastExpected: expectTime,
},
{
name: "mock custom limit with start time set, no end time",
mock: true,
args: &AggregatedTradeRequestParams{
Symbol: currency.NewBTCUSDT(),
StartTime: start,
Limit: 1001,
{
name: "mock limit less than returned",
args: &AggregatedTradeRequestParams{Limit: 3},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
require.Equal(t, 3, len(results), "must return correct number of records")
assert.Equal(t,
time.Date(2020, 1, 2, 16, 19, 5, int(200*time.Millisecond), time.UTC),
results[len(results)-1].TimeStamp.Time().UTC(),
"should return the correct time for the last record",
)
},
},
numExpected: 1001,
lastExpected: time.Date(2020, 1, 2, 15, 18, 39, int(226*time.Millisecond), time.UTC),
},
{
name: "custom limit with start time set, no end time",
args: &AggregatedTradeRequestParams{
Symbol: currency.NewBTCUSDT(),
StartTime: time.Date(2020, 11, 18, 23, 0, 28, 921, time.UTC),
Limit: 1001,
}
} else {
start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago
tests = []testCase{
{
name: "batch with timerange",
args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(20 * time.Minute)},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
// 2000-50000 records range was valid in 2025; Adjust if Binance enters a phase of zero-fees or low-volume
require.Greater(t, len(results), 2000, "must return a quantity above a sane threshold of records")
assert.Less(t, len(results), 50000, "should return a quantity below a sane threshold of records")
assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 20*time.Minute, "last record should be within range of start time")
},
},
numExpected: 1001,
lastExpected: time.Date(2020, 11, 18, 23, 1, 33, int(62*time.Millisecond*10), time.UTC),
},
{
name: "mock recent trades",
mock: true,
args: &AggregatedTradeRequestParams{
Symbol: currency.NewBTCUSDT(),
Limit: 3,
{
name: "custom limit with start time set, no end time",
args: &AggregatedTradeRequestParams{StartTime: start, Limit: 2042},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
// 2000 records in was about 6 minutes in 2025; Adjust if Binance enters a phase of zero-fees or low-volume
require.Equal(t, 2042, len(results), "must return exactly the limit number of records")
assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 20*time.Minute, "last record should be within 20 minutes of start time")
},
},
numExpected: 3,
lastExpected: time.Date(2020, 1, 2, 16, 19, 5, int(200*time.Millisecond), time.UTC),
},
}
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
if tt.mock != mockTests {
t.Skip("mock mismatch, skipping")
}
result, err := e.GetAggregatedTrades(t.Context(), tt.args)
if err != nil {
t.Error(err)
}
if len(result) != tt.numExpected {
t.Errorf("GetAggregatedTradesBatched() expected %v entries, got %v", tt.numExpected, len(result))
}
lastTradeTime := result[len(result)-1].TimeStamp
if !lastTradeTime.Time().Equal(tt.lastExpected) {
t.Errorf("last trade expected %v, got %v", tt.lastExpected.UTC(), lastTradeTime.Time().UTC())
}
tt.args.Symbol = currency.NewBTCUSDT()
results, err := e.GetAggregatedTrades(t.Context(), tt.args)
require.NoError(t, err)
tt.expFunc(t, results)
})
}
}

View File

@@ -90751,89 +90751,165 @@
},
"/fapi/v1/indexInfo": {
"GET": [
{
"data": {
"baseAssetList": [
{
"baseAsset": "AAVE",
"weightInPercentage": "0.07255900",
"weightInQuantity": "0.57321317"
},
{
"baseAsset": "ALPHA",
"weightInPercentage": "0.02039700",
"weightInQuantity": "60.33241822"
},
{
"baseAsset": "BAL",
"weightInPercentage": "0.02362800",
"weightInQuantity": "1.24967422"
},
{
"baseAsset": "BAND",
"weightInPercentage": "0.03501400",
"weightInQuantity": "3.29870857"
},
{
"baseAsset": "BEL",
"weightInPercentage": "0.01843900",
"weightInQuantity": "8.75756110"
},
{
"baseAsset": "BZRX",
"weightInPercentage": "0.02294900",
"weightInQuantity": "67.18425714"
}
],
"symbol": "DEFIUSDT",
"time": 1608086821000
},
"queryString": "symbol=DEFIUSDT",
"bodyParams": "",
"headers": {}
},
{
"data": [
{
"baseAssetList": [
{
"baseAsset": "AAVE",
"weightInPercentage": "0.07255900",
"weightInQuantity": "0.57321317"
"baseAsset": "BTC",
"quoteAsset": "AAVE",
"weightInPercentage": "0.00376500",
"weightInQuantity": "0.03738265"
},
{
"baseAsset": "ALPHA",
"weightInPercentage": "0.02039700",
"weightInQuantity": "60.33241822"
"baseAsset": "BTC",
"quoteAsset": "ADA",
"weightInPercentage": "0.02638100",
"weightInQuantity": "0.00078048"
},
{
"baseAsset": "BAL",
"weightInPercentage": "0.02362800",
"weightInQuantity": "1.24967422"
"baseAsset": "BTC",
"quoteAsset": "AVAX",
"weightInPercentage": "0.01082800",
"weightInQuantity": "0.01147365"
},
{
"baseAsset": "BAND",
"weightInPercentage": "0.03501400",
"weightInQuantity": "3.29870857"
"baseAsset": "BTC",
"quoteAsset": "BCH",
"weightInPercentage": "0.00992300",
"weightInQuantity": "0.20359666"
},
{
"baseAsset": "BEL",
"weightInPercentage": "0.01843900",
"weightInQuantity": "8.75756110"
},
{
"baseAsset": "BZRX",
"weightInPercentage": "0.02294900",
"weightInQuantity": "67.18425714"
"baseAsset": "BTC",
"quoteAsset": "BNB",
"weightInPercentage": "0.11126600",
"weightInQuantity": "3.58068053"
}
],
"symbol": "DEFIUSDT",
"time": 1608086822007
"component": "quoteAsset",
"symbol": "BTCDOMUSDT",
"time": 1758250144006
},
{
"baseAssetList": [
{
"baseAsset": "1000000BOB",
"quoteAsset": "USDT",
"weightInPercentage": "0.00204918",
"weightInQuantity": "0.04619791"
},
{
"baseAsset": "1000000MOG",
"quoteAsset": "USDT",
"weightInPercentage": "0.00204918",
"weightInQuantity": "0.00254617"
},
{
"baseAsset": "1000BONK",
"quoteAsset": "USDT",
"weightInPercentage": "0.00204918",
"weightInQuantity": "0.10008223"
},
{
"baseAsset": "1000CAT",
"quoteAsset": "USDT",
"weightInPercentage": "0.00204918",
"weightInQuantity": "0.29881191"
},
{
"baseAsset": "1000CHEEMS",
"quoteAsset": "USDT",
"weightInPercentage": "0.00204918",
"weightInQuantity": "1.98279472"
}
],
"component": "baseAsset",
"symbol": "ALLUSDT",
"time": 1758250144006
},
{
"baseAssetList": [
{
"baseAsset": "1000000BOB",
"quoteAsset": "USDT",
"weightInPercentage": "0.01000000",
"weightInQuantity": "0.21446069"
},
{
"baseAsset": "1000WHY",
"quoteAsset": "USDT",
"weightInPercentage": "0.01000000",
"weightInQuantity": "333.26983733"
},
{
"baseAsset": "1000X",
"quoteAsset": "USDT",
"weightInPercentage": "0.01000000",
"weightInQuantity": "0.22663334"
},
{
"baseAsset": "AGT",
"quoteAsset": "USDT",
"weightInPercentage": "0.01000000",
"weightInQuantity": "1.63266620"
},
{
"baseAsset": "AIN",
"quoteAsset": "USDT",
"weightInPercentage": "0.01000000",
"weightInQuantity": "0.08414804"
}
],
"component": "baseAsset",
"symbol": "SMALLUSDT",
"time": 1758250144006
}
],
"queryString": "",
"bodyParams": "",
"headers": {}
},
{
"data": {
"baseAssetList": [
{
"baseAsset": "BTC",
"quoteAsset": "AAVE",
"weightInPercentage": "0.00376500",
"weightInQuantity": "0.03738265"
},
{
"baseAsset": "BTC",
"quoteAsset": "ADA",
"weightInPercentage": "0.02638100",
"weightInQuantity": "0.00078048"
},
{
"baseAsset": "BTC",
"quoteAsset": "AVAX",
"weightInPercentage": "0.01082800",
"weightInQuantity": "0.01147365"
},
{
"baseAsset": "BTC",
"quoteAsset": "BCH",
"weightInPercentage": "0.00992300",
"weightInQuantity": "0.20359666"
},
{
"baseAsset": "BTC",
"quoteAsset": "BNB",
"weightInPercentage": "0.11126600",
"weightInQuantity": "3.58068053"
}
],
"component": "quoteAsset",
"symbol": "BTCDOMUSDT",
"time": 1758250145000
},
"queryString": "symbol=BTCDOMUSDT",
"bodyParams": "",
"headers": {}
}
]
},

View File

@@ -185,9 +185,10 @@ type UCompositeIndexInfoData struct {
Symbol string `json:"symbol"`
Time types.Time `json:"time"`
BaseAssetList []struct {
BaseAsset string `json:"baseAsset"`
WeightInQuantity float64 `json:"weightInQuantity,string"`
WeightInPercentage float64 `json:"weightInPercentage,string"`
BaseAsset currency.Code `json:"baseAsset"`
QuoteAsset currency.Code `json:"quoteAsset"`
WeightInQuantity float64 `json:"weightInQuantity,string"`
WeightInPercentage float64 `json:"weightInPercentage,string"`
} `json:"baseAssetList"`
}

View File

@@ -123,7 +123,6 @@ var (
errInvalidAssetValue = errors.New("invalid asset ")
errInvalidAssetAmount = errors.New("invalid asset amount")
errIncompleteArguments = errors.New("missing required argument")
errStartTimeOrFromIDNotSet = errors.New("please set StartTime or FromId, but not both")
errMissingRequiredArgumentCoin = errors.New("missing required argument,coin")
errMissingRequiredArgumentNetwork = errors.New("missing required argument,network")
errAmountValueMustBeGreaterThan0 = errors.New("amount must be greater than 0")
@@ -203,84 +202,81 @@ func (e *Exchange) GetHistoricalTrades(ctx context.Context, hist HistoricalTrade
// GetAggregateTrades to get compressed, aggregate trades. Trades that fill at the time, from the same order, with the same price will have the quantity aggregated.
func (e *Exchange) GetAggregateTrades(ctx context.Context, agg *AggregatedTradeRequestParams) ([]AggregatedTrade, error) {
params := url.Values{}
symbol, err := e.FormatSymbol(agg.Symbol, asset.Spot)
s, err := e.FormatSymbol(agg.Symbol, asset.Spot)
if err != nil {
return nil, err
}
params.Set("symbol", symbol)
needBatch := false
if agg.Limit > 0 {
if agg.Limit > 1000 {
needBatch = true
} else {
params.Set("limit", strconv.Itoa(agg.Limit))
}
params.Set("symbol", s)
// If the user request is directly not supported by the exchange, we might be able to fulfill it
// by merging results from multiple API requests
needBatch := true // Need to batch unless user has specified a limit
if agg.Limit > 0 && agg.Limit <= 1000 {
needBatch = false
params.Set("limit", strconv.Itoa(agg.Limit))
}
if agg.FromID != 0 {
params.Set("fromId", strconv.FormatInt(agg.FromID, 10))
}
startTime := time.UnixMilli(agg.StartTime)
endTime := time.UnixMilli(agg.EndTime)
if (endTime.UnixNano() - startTime.UnixNano()) >= int64(time.Hour) {
endTime = startTime.Add(time.Minute * 59)
if !agg.StartTime.IsZero() {
params.Set("startTime", strconv.FormatInt(agg.StartTime.UnixMilli(), 10))
}
if !agg.EndTime.IsZero() {
params.Set("endTime", strconv.FormatInt(agg.EndTime.UnixMilli(), 10))
}
if !startTime.IsZero() && startTime.Unix() != 0 {
params.Set("startTime", strconv.FormatInt(agg.StartTime, 10))
}
if !endTime.IsZero() && endTime.Unix() != 0 {
params.Set("endTime", strconv.FormatInt(agg.EndTime, 10))
}
needBatch = needBatch || (!startTime.IsZero() && !endTime.IsZero() && endTime.Sub(startTime) > time.Hour)
// startTime and endTime are set and time between startTime and endTime is more than 1 hour
needBatch = needBatch || (!agg.StartTime.IsZero() && !agg.EndTime.IsZero() && agg.EndTime.Sub(agg.StartTime) > time.Hour)
// Fall back to batch requests, if possible and necessary
if needBatch {
// fromId xor start time must be set
canBatch := agg.FromID == 0 != startTime.IsZero()
// fromId or start time must be set
canBatch := agg.FromID == 0 != agg.StartTime.IsZero()
if canBatch {
// Split the request into multiple
return e.batchAggregateTrades(ctx, agg, params)
}
// Can't handle this request locally or remotely
// We would receive {"code":-1128,"msg":"Combination of optional parameters invalid."}
return nil, errStartTimeOrFromIDNotSet
return nil, errors.New("please set StartTime or FromId, but not both")
}
var resp []AggregatedTrade
path := common.EncodeURLValues(aggregatedTrades, params)
return resp, e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
path := aggregatedTrades + "?" + params.Encode()
return resp, e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
}
// batchAggregateTrades fetches trades in multiple requests <-- copied and amended from the binance
// first phase, hourly requests until the first trade (or end time) is reached
// second phase, limit requests from previous trade until end time (or limit) is reached
func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) {
func (e *Exchange) batchAggregateTrades(ctx context.Context, args *AggregatedTradeRequestParams, params url.Values) ([]AggregatedTrade, error) {
var resp []AggregatedTrade
// prepare first request with only first hour and max limit
if arg.Limit == 0 || arg.Limit > 1000 {
if args.Limit == 0 || args.Limit > 1000 {
// Extend from the default of 500
params.Set("limit", "1000")
}
startTime := time.UnixMilli(arg.StartTime)
endTime := time.UnixMilli(arg.EndTime)
var fromID int64
if arg.FromID > 0 {
fromID = arg.FromID
if args.FromID > 0 {
fromID = args.FromID
} else {
// Only 10 seconds is used to prevent limit of 1000 being reached in the first request,
// cutting off trades for high activity pairs
// Only 10 seconds is used to prevent limit of 1000 being reached in the first request, cutting off trades for high activity pairs
// If we don't find anything we keep increasing the window by doubling the interval and scanning again
increment := time.Second * 10
for len(resp) == 0 {
startTime = startTime.Add(increment)
if !endTime.IsZero() && startTime.After(endTime) {
// All requests returned empty
for start := args.StartTime; len(resp) == 0; start, increment = start.Add(increment), increment*2 {
if !args.EndTime.IsZero() && start.After(args.EndTime) || increment <= 0 {
// All requests returned empty or we searched until increment overflowed
return nil, nil
}
params.Set("startTime", strconv.FormatInt(startTime.UnixMilli(), 10))
params.Set("endTime", strconv.FormatInt(startTime.Add(increment).UnixMilli(), 10))
path := common.EncodeURLValues(aggregatedTrades, params)
err := e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
params.Set("startTime", strconv.FormatInt(start.UnixMilli(), 10))
end := start.Add(increment)
if !args.EndTime.IsZero() && end.After(args.EndTime) {
end = args.EndTime
}
params.Set("endTime", strconv.FormatInt(end.UnixMilli(), 10))
path := aggregatedTrades + "?" + params.Encode()
err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
if err != nil {
return resp, err
return resp, fmt.Errorf("%w %v", err, args.Symbol)
}
}
fromID = resp[len(resp)-1].ATradeID
@@ -289,37 +285,34 @@ func (e *Exchange) batchAggregateTrades(ctx context.Context, arg *AggregatedTrad
// other requests follow from the last aggregate trade id and have no time window
params.Del("startTime")
params.Del("endTime")
// while we haven't reached the limit
for ; arg.Limit == 0 || len(resp) < arg.Limit; fromID = resp[len(resp)-1].ATradeID {
outer:
for ; args.Limit == 0 || len(resp) < args.Limit; fromID = resp[len(resp)-1].ATradeID {
// Keep requesting new data after last retrieved trade
params.Set("fromId", strconv.FormatInt(fromID, 10))
path := common.EncodeURLValues(aggregatedTrades, params)
path := aggregatedTrades + "?" + params.Encode()
var additionalTrades []AggregatedTrade
err := e.SendHTTPRequest(ctx,
exchange.RestSpotSupplementary,
path,
spotDefaultRate,
&additionalTrades)
if err != nil {
return resp, err
if err := e.SendHTTPRequest(ctx, exchange.RestSpotSupplementary, path, spotDefaultRate, &additionalTrades); err != nil {
return resp, fmt.Errorf("%w %v", err, args.Symbol)
}
lastIndex := len(additionalTrades)
if !endTime.IsZero() && endTime.Unix() != 0 {
// get index for truncating to end time
lastIndex = sort.Search(len(additionalTrades), func(i int) bool {
return endTime.Before(additionalTrades[i].TimeStamp.Time())
})
switch len(additionalTrades) {
case 0, 1:
break outer // We only got the one we already have
default:
additionalTrades = additionalTrades[1:] // Remove the record we already have
}
// don't include the first as the request was inclusive from last ATradeID
resp = append(resp, additionalTrades[1:lastIndex]...)
// If only the starting trade is returned or if we received trades after end time
if len(additionalTrades) == 1 || lastIndex < len(additionalTrades) {
break
if !args.EndTime.IsZero() {
// Check if only some of the results are before EndTime
if afterEnd := sort.Search(len(additionalTrades), func(i int) bool {
return args.EndTime.Before(additionalTrades[i].TimeStamp.Time())
}); afterEnd < len(additionalTrades) {
resp = append(resp, additionalTrades[:afterEnd]...)
break
}
}
resp = append(resp, additionalTrades...)
}
// Truncate if necessary
if arg.Limit > 0 && len(resp) > arg.Limit {
resp = resp[:arg.Limit]
if args.Limit > 0 && len(resp) > args.Limit {
resp = resp[:args.Limit]
}
return resp, nil
}

View File

@@ -140,10 +140,14 @@ func TestGetRecentTrades(t *testing.T) {
func TestGetHistoricTrades(t *testing.T) {
t.Parallel()
pair := currency.Pair{Base: currency.BTC, Quote: currency.USD}
_, err := e.GetHistoricTrades(t.Context(), pair, asset.Spot, time.Time{}, time.Time{})
if err != nil {
t.Error("Binanceus GetHistoricTrades() error", err)
p := currency.NewBTCUSDT()
start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago
end := start.Add(15 * time.Minute)
result, err := e.GetHistoricTrades(t.Context(), p, asset.Spot, start, end)
require.NoError(t, err, "GetHistoricTrades must not error")
assert.NotEmpty(t, result, "GetHistoricTrades should have trades")
for _, r := range result {
require.WithinRange(t, r.Timestamp, start, end, "All trades must be within time range")
}
}
@@ -1756,3 +1760,50 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}
// TestGetAggregatedTradesBatched exercises TestGetAggregatedTradesBatched to ensure our date and limit scanning works correctly
// This test is susceptible to failure if volumes change a lot, during wash trading or zero-fee periods
// In live tests, 6 hours is expected to return about 1000 records
func TestGetAggregatedTradesBatched(t *testing.T) {
t.Parallel()
type testCase struct {
name string
args *AggregatedTradeRequestParams
expFunc func(*testing.T, []AggregatedTrade)
}
var tests []testCase
start := time.Now().Add(-time.Hour * 24 * 90).Truncate(time.Minute) // 3 months ago
tests = []testCase{
{
name: "batch with timerange",
args: &AggregatedTradeRequestParams{StartTime: start, EndTime: start.Add(6 * time.Hour)},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
require.NotEmpty(t, results, "must have records")
assert.Less(t, len(results), 10000, "should return a quantity below a sane threshold of records")
assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 6*time.Hour, "last record should be within range of start time")
},
},
{
name: "custom limit with start time set, no end time",
args: &AggregatedTradeRequestParams{StartTime: start, Limit: 2042},
expFunc: func(t *testing.T, results []AggregatedTrade) {
t.Helper()
// 2000 records in was about 32 hours in 2025; Adjust if BinanceUS enters a phase of zero-fees or low-volume
require.Equal(t, 2042, len(results), "must return exactly the limit number of records")
assert.WithinDuration(t, results[len(results)-1].TimeStamp.Time(), start, 72*time.Hour, "last record should be within 72 hours of start time")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
tt.args.Symbol = currency.NewBTCUSDT()
results, err := e.GetAggregateTrades(t.Context(), tt.args)
require.NoError(t, err)
tt.expFunc(t, results)
})
}
}

View File

@@ -129,8 +129,8 @@ type AggregatedTradeRequestParams struct {
// The first trade to retrieve
FromID int64
// The API seems to accept (start and end time) or FromID and no other combinations
StartTime int64
EndTime int64
StartTime time.Time
EndTime time.Time
// Default 500; max 1000.
Limit int
}

View File

@@ -467,8 +467,8 @@ func (e *Exchange) GetHistoricTrades(ctx context.Context, p currency.Pair, asset
}
req := AggregatedTradeRequestParams{
Symbol: p,
StartTime: timestampStart.UnixMilli(),
EndTime: timestampEnd.UnixMilli(),
StartTime: timestampStart,
EndTime: timestampEnd,
}
trades, err := e.GetAggregateTrades(ctx, &req)
if err != nil {