Feature: Candle conversion & Candle validation (#716)

* Remove old concept. Introduce new job types and candle scaling

* Adds extra processing, commands

* new concept for queued jobs. Jobs can pause. New commands to manage status

* =End of day commit designing tables and implementing prerequisites further.

* Adds postgres data history relations

* Fixes table design for sqlite. Fixes all issues from merge

* Fixes craziness of database design. Adds some functions to get related jobs

* Fixes errors

* Updates some documentation, manages prerequisite jobs a little better, adds rpc funcs

* Fixes database design and adjust repo functions

* Tests database relationship

* Test coverage of new job functions

* Finishes coverage of new functions

* Commands and RPC coverage

* New database modifications for new job types

* Adds db support of new columns. Adds conversion validation. lint

* command blurb changes

* Allows websocket test to pass consistently

* Fixes merge issue preventing datahistorymanager from starting via config

* Minor fixes for different job type processing

* Fixes rangeholder issue, fixes validation, does not address jobs not starting or wrong status

* Fixes database tests, but at what cost. Fixes dhm tests

* Fixes dhj completion issue. Adds prerequisite by nickname

* Fixes validation processing. Adds db tests and validation

* Fixes validation job processing range

* Fixes trade sql. Reduces defaults. Validation processing and errors

* Updates cli job commands. adds validation decimal. fix job validation

* Expands run job handling and tests

* Validation work

* Fixes validation processing

* candle relations. new job type. updating database design

* Adds secondary exchange support. Sets stage for candle override

* Re adds accidentally deleted relationship

* Updates loading and saving candles to have relationship data when relevant

* Now validates and replaces candle data appropriately

* Fixes getting and setting datahistory data. Neatens DHM

* Test coverage

* Updates proto for new db types. New test coverage. Secondary exchange work

* Investigation into never-ending validation jobs. Now that intervals are ruled out, now need to complete the job....

* Fixes issues with validation job completion. Fixes validation volume issue for secondary exchange

* Adds candle warning support to the backtester

* Fixes warnings

* lint and begin docs

* Documentation updates. Final testing changes

* Minor fixes

* docs, prerequisite checks, more testing

* Fixes binance trade test. Rename err

* Documentation fixes. Figure fixes

* documentation update

* Fixes remote PSQL tests

* Fix binance mock test

* Remove unnecessary JSON

* regen proto

* Some minor nit fixes

* Var usage, query sorting, log improving, sql mirroring

* Extra coverage

* Experimental removal of m.jobs and mutex. Fix messaging

* Fixes error

* Lint fixes, command description improvements. More isRunning gates

* description improvements

* Lint

* BUFF regenerate

* Rough concept to fix insertions taking up long periods of time

* New calculation for trade data. Adds batch saving

This also adds an experimental request feature to shut down lingering requests. However, its uncertain whether or not this is having any impact. Initially thought it was the trades that was taking time and not SQL. Will investigate further

* Removes experimental requester. Adds documentation. Fixes typo

* rm unused error

* re-adds more forgotten contributors

* Now with proper commit count
This commit is contained in:
Scott
2021-08-05 10:27:27 +10:00
committed by GitHub
parent 3b1fe81d8b
commit 48434dfd46
67 changed files with 24651 additions and 12894 deletions

View File

@@ -271,13 +271,16 @@ func (b *Binance) batchAggregateTrades(arg *AggregatedTradeRequestParams, params
if arg.FromID > 0 {
fromID = arg.FromID
} else {
for start := arg.StartTime; len(resp) == 0; start = start.Add(time.Hour) {
// Only 10 seconds is used to prevent limit of 1000 being reached in the first request,
// cutting off trades for high activity pairs
increment := time.Second * 10
for start := arg.StartTime; len(resp) == 0; start = start.Add(increment) {
if !arg.EndTime.IsZero() && !start.Before(arg.EndTime) {
// All requests returned empty
return nil, nil
}
params.Set("startTime", timeString(start))
params.Set("endTime", timeString(start.Add(time.Hour)))
params.Set("endTime", timeString(start.Add(increment)))
path := aggregatedTrades + "?" + params.Encode()
err := b.SendHTTPRequest(exchange.RestSpotSupplementary, path, spotDefaultRate, &resp)
if err != nil {

View File

@@ -1528,10 +1528,6 @@ func TestGetAggregatedTradesBatched(t *testing.T) {
if err != nil {
t.Fatal(err)
}
mockExpectTime, err := time.Parse(time.RFC3339, "2020-01-02T16:19:04.8Z")
if err != nil {
t.Fatal(err)
}
expectTime, err := time.Parse(time.RFC3339Nano, "2020-01-02T16:19:04.831Z")
if err != nil {
t.Fatal(err)
@@ -1552,8 +1548,8 @@ func TestGetAggregatedTradesBatched(t *testing.T) {
StartTime: start,
EndTime: start.Add(75 * time.Minute),
},
numExpected: 3,
lastExpected: mockExpectTime,
numExpected: 1012,
lastExpected: time.Date(2020, 1, 2, 16, 18, 31, int(919*time.Millisecond), time.UTC),
},
{
name: "batch with timerange",
@@ -1562,7 +1558,7 @@ func TestGetAggregatedTradesBatched(t *testing.T) {
StartTime: start,
EndTime: start.Add(75 * time.Minute),
},
numExpected: 4303,
numExpected: 12130,
lastExpected: expectTime,
},
{
@@ -1573,18 +1569,18 @@ func TestGetAggregatedTradesBatched(t *testing.T) {
StartTime: start,
Limit: 1001,
},
numExpected: 4,
lastExpected: time.Date(2020, 1, 2, 16, 19, 5, int(200*time.Millisecond), time.UTC),
numExpected: 1001,
lastExpected: time.Date(2020, 1, 2, 15, 18, 39, int(226*time.Millisecond), time.UTC),
},
{
name: "custom limit with start time set, no end time",
args: &AggregatedTradeRequestParams{
Symbol: currency.NewPair(currency.BTC, currency.USDT),
StartTime: time.Date(2020, 11, 18, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2020, 11, 18, 23, 0, 28, 921, time.UTC),
Limit: 1001,
},
numExpected: 1001,
lastExpected: time.Date(2020, 11, 18, 13, 0, 0, int(34*time.Millisecond), time.UTC),
lastExpected: time.Date(2020, 11, 18, 23, 1, 33, int(62*time.Millisecond*10), time.UTC),
},
{
name: "mock recent trades",
@@ -1612,7 +1608,7 @@ func TestGetAggregatedTradesBatched(t *testing.T) {
}
lastTradeTime := result[len(result)-1].TimeStamp
if !lastTradeTime.Equal(tt.lastExpected) {
t.Errorf("last trade expected %v, got %v", tt.lastExpected, lastTradeTime)
t.Errorf("last trade expected %v, got %v", tt.lastExpected.UTC(), lastTradeTime.UTC())
}
})
}

View File

@@ -331,6 +331,68 @@ func (i *Interval) IntervalsPerYear() float64 {
return float64(OneYear.Duration().Nanoseconds()) / float64(i.Duration().Nanoseconds())
}
// ConvertToNewInterval allows the scaling of candles to larger candles
// eg convert OneDay candles to ThreeDay candles, if there are adequate candles
// incomplete candles are NOT converted
// eg an 4 OneDay candles will convert to one ThreeDay candle, skipping the fourth
func ConvertToNewInterval(item *Item, newInterval Interval) (*Item, error) {
if item == nil {
return nil, errNilKline
}
if newInterval <= 0 {
return nil, ErrUnsetInterval
}
if newInterval.Duration() <= item.Interval.Duration() {
return nil, ErrCanOnlyDownscaleCandles
}
if newInterval.Duration()%item.Interval.Duration() != 0 {
return nil, ErrWholeNumberScaling
}
oldIntervalsPerNewCandle := int64(newInterval / item.Interval)
var candleBundles [][]Candle
var candles []Candle
for i := range item.Candles {
candles = append(candles, item.Candles[i])
intervalCount := int64(i + 1)
if oldIntervalsPerNewCandle == intervalCount {
candleBundles = append(candleBundles, candles)
candles = []Candle{}
}
}
responseCandle := &Item{
Exchange: item.Exchange,
Pair: item.Pair,
Asset: item.Asset,
Interval: newInterval,
}
for i := range candleBundles {
var lowest, highest, volume float64
lowest = candleBundles[i][0].Low
highest = candleBundles[i][0].High
for j := range candleBundles[i] {
volume += candleBundles[i][j].Volume
if candleBundles[i][j].Low < lowest {
lowest = candleBundles[i][j].Low
}
if candleBundles[i][j].High > highest {
lowest = candleBundles[i][j].High
}
volume += candleBundles[i][j].Volume
}
responseCandle.Candles = append(responseCandle.Candles, Candle{
Time: candleBundles[i][0].Time,
Open: candleBundles[i][0].Open,
High: highest,
Low: lowest,
Close: candleBundles[i][len(candleBundles[i])-1].Close,
Volume: volume,
})
}
return responseCandle, nil
}
// CalculateCandleDateRanges will calculate the expected candle data in intervals in a date range
// If an API is limited in the amount of candles it can make in a request, it will automatically separate
// ranges into the limit

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/database/repository/candle"
"github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
@@ -33,13 +34,26 @@ func LoadFromDatabase(exchange string, pair currency.Pair, a asset.Item, interva
}
for x := range retCandle.Candles {
if ret.SourceJobID == uuid.Nil && retCandle.Candles[x].SourceJobID != "" {
ret.SourceJobID, err = uuid.FromString(retCandle.Candles[x].SourceJobID)
if err != nil {
return Item{}, err
}
}
if ret.ValidationJobID == uuid.Nil && retCandle.Candles[x].ValidationJobID != "" {
ret.ValidationJobID, err = uuid.FromString(retCandle.Candles[x].ValidationJobID)
if err != nil {
return Item{}, err
}
}
ret.Candles = append(ret.Candles, Candle{
Time: retCandle.Candles[x].Timestamp,
Open: retCandle.Candles[x].Open,
High: retCandle.Candles[x].High,
Low: retCandle.Candles[x].Low,
Close: retCandle.Candles[x].Close,
Volume: retCandle.Candles[x].Volume,
Time: retCandle.Candles[x].Timestamp,
Open: retCandle.Candles[x].Open,
High: retCandle.Candles[x].High,
Low: retCandle.Candles[x].Low,
Close: retCandle.Candles[x].Close,
Volume: retCandle.Candles[x].Volume,
ValidationIssues: retCandle.Candles[x].ValidationIssues,
})
}
return ret, nil
@@ -50,15 +64,12 @@ func StoreInDatabase(in *Item, force bool) (uint64, error) {
if in.Exchange == "" {
return 0, errors.New("name cannot be blank")
}
if (in.Pair == currency.Pair{}) {
if in.Pair.IsEmpty() {
return 0, errors.New("currency pair cannot be empty")
}
if in.Asset == "" {
if !in.Asset.IsValid() {
return 0, errors.New("asset cannot be blank")
}
if len(in.Candles) < 1 {
return 0, errors.New("candle data is empty")
}
@@ -77,14 +88,23 @@ func StoreInDatabase(in *Item, force bool) (uint64, error) {
}
for x := range in.Candles {
databaseCandles.Candles = append(databaseCandles.Candles, candle.Candle{
can := candle.Candle{
Timestamp: in.Candles[x].Time.Truncate(in.Interval.Duration()),
Open: in.Candles[x].Open,
High: in.Candles[x].High,
Low: in.Candles[x].Low,
Close: in.Candles[x].Close,
Volume: in.Candles[x].Volume,
})
}
if in.ValidationJobID != uuid.Nil {
can.ValidationJobID = in.ValidationJobID.String()
can.ValidationIssues = in.Candles[x].ValidationIssues
}
if in.SourceJobID != uuid.Nil {
can.SourceJobID = in.SourceJobID.String()
}
databaseCandles.Candles = append(databaseCandles.Candles, can)
}
if force {
_, err := candle.DeleteCandles(&databaseCandles)

View File

@@ -890,3 +890,90 @@ func BenchmarkJustifyIntervalTimeStoringUnixValues2(b *testing.B) {
}
}
}
func TestConvertToNewInterval(t *testing.T) {
_, err := ConvertToNewInterval(nil, OneMin)
if !errors.Is(err, errNilKline) {
t.Errorf("received '%v' expectec '%v'", err, errNilKline)
}
old := &Item{
Exchange: "lol",
Pair: currency.NewPair(currency.BTC, currency.USDT),
Asset: asset.Spot,
Interval: OneDay,
Candles: []Candle{
{
Time: time.Now(),
Open: 1337,
High: 1339,
Low: 1336,
Close: 1338,
Volume: 1337,
},
{
Time: time.Now().AddDate(0, 0, 1),
Open: 1338,
High: 2000,
Low: 1332,
Close: 1696,
Volume: 6420,
},
{
Time: time.Now().AddDate(0, 0, 2),
Open: 1696,
High: 1998,
Low: 1337,
Close: 6969,
Volume: 2520,
},
},
}
_, err = ConvertToNewInterval(old, 0)
if !errors.Is(err, ErrUnsetInterval) {
t.Errorf("received '%v' expectec '%v'", err, ErrUnsetInterval)
}
_, err = ConvertToNewInterval(old, OneMin)
if !errors.Is(err, ErrCanOnlyDownscaleCandles) {
t.Errorf("received '%v' expectec '%v'", err, ErrCanOnlyDownscaleCandles)
}
old.Interval = ThreeDay
_, err = ConvertToNewInterval(old, OneWeek)
if !errors.Is(err, ErrWholeNumberScaling) {
t.Errorf("received '%v' expectec '%v'", err, ErrWholeNumberScaling)
}
old.Interval = OneDay
newInterval := ThreeDay
newCandle, err := ConvertToNewInterval(old, newInterval)
if !errors.Is(err, nil) {
t.Errorf("received '%v' expectec '%v'", err, nil)
}
if len(newCandle.Candles) != 1 {
t.Error("expected one candle")
}
if newCandle.Candles[0].Open != 1337 &&
newCandle.Candles[0].High != 2000 &&
newCandle.Candles[0].Low != 1332 &&
newCandle.Candles[0].Close != 6969 &&
newCandle.Candles[0].Volume != (2520+6420+1337) {
t.Error("unexpected updoot")
}
old.Candles = append(old.Candles, Candle{
Time: time.Now().AddDate(0, 0, 3),
Open: 6969,
High: 1998,
Low: 2342,
Close: 7777,
Volume: 111,
})
newCandle, err = ConvertToNewInterval(old, newInterval)
if !errors.Is(err, nil) {
t.Errorf("received '%v' expectec '%v'", err, nil)
}
if len(newCandle.Candles) != 1 {
t.Error("expected one candle")
}
}

View File

@@ -4,6 +4,7 @@ import (
"errors"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
@@ -39,12 +40,15 @@ const (
)
var (
// ErrMissingCandleData is an error for missing candle data
ErrMissingCandleData = errors.New("missing candle data")
// ErrUnsetInterval is an error for date range calculation
ErrUnsetInterval = errors.New("cannot calculate range, interval unset")
// ErrUnsupportedInterval returns when the provided interval is not supported by an exchange
ErrUnsupportedInterval = errors.New("interval unsupported by exchange")
// ErrCanOnlyDownscaleCandles returns when attempting to upscale candles
ErrCanOnlyDownscaleCandles = errors.New("interval must be a longer duration to scale")
// ErrWholeNumberScaling returns when old interval data cannot neatly fit into new interval size
ErrWholeNumberScaling = errors.New("new interval must scale properly into new candle")
errNilKline = errors.New("kline item is nil")
// SupportedIntervals is a list of all supported intervals
SupportedIntervals = []Interval{
@@ -74,21 +78,24 @@ var (
// Item holds all the relevant information for internal kline elements
type Item struct {
Exchange string
Pair currency.Pair
Asset asset.Item
Interval Interval
Candles []Candle
Exchange string
Pair currency.Pair
Asset asset.Item
Interval Interval
Candles []Candle
SourceJobID uuid.UUID
ValidationJobID uuid.UUID
}
// Candle holds historic rate information.
type Candle struct {
Time time.Time
Open float64
High float64
Low float64
Close float64
Volume float64
Time time.Time
Open float64
High float64
Low float64
Close float64
Volume float64
ValidationIssues string
}
// ByDate allows for sorting candle entries by date

View File

@@ -736,7 +736,7 @@ func (o *OKGroup) GetHistoricCandlesExtended(pair currency.Pair, a asset.Item, s
dates.SetHasDataFromCandles(ret.Candles)
summary := dates.DataSummary(false)
if len(summary) > 0 {
log.Warnf(log.ExchangeSys, "%v - %v", o.ExchangeName, summary)
log.Warnf(log.ExchangeSys, "%v - %v", o.Base.Name, summary)
}
ret.RemoveDuplicates()
ret.RemoveOutsideRange(start, end)

View File

@@ -88,7 +88,6 @@ func (i *Item) validateRequest(ctx context.Context, r *Requester) (*http.Request
return nil, errors.New("header response is nil")
}
}
req, err := http.NewRequestWithContext(ctx, i.Method, i.Path, i.Body)
if err != nil {
return nil, err
@@ -110,7 +109,6 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error {
if p == nil {
return errors.New("request item cannot be nil")
}
if p.Verbose {
log.Debugf(log.RequestSys,
"%s request path: %s",

View File

@@ -129,6 +129,9 @@ func GetTradesInRange(exchangeName, assetType, base, quote string, startDate, en
if exchangeName == "" || assetType == "" || base == "" || quote == "" || startDate.IsZero() || endDate.IsZero() {
return nil, errors.New("invalid arguments received")
}
if !database.DB.IsConnected() {
return nil, fmt.Errorf("cannot process trades in range %s-%s as %w", startDate, endDate, database.ErrDatabaseNotConnected)
}
results, err := tradesql.GetInRange(exchangeName, assetType, base, quote, startDate, endDate)
if err != nil {
return nil, err