Feature: Data history manager engine subsystem (#693)

* Adds lovely initial concept for historical data doer

* Adds ability to save tasks. Adds config. Adds startStop to engine

* Has a database microservice without use of globals! Further infrastructure design. Adds readme

* Commentary to help design

* Adds migrations for database

* readme and adds database models

* Some modelling that doesn't work end of day

* Completes datahistoryjob sql.Begins datahistoryjobresult

* Adds datahistoryjob functions to retreive job results. Adapts subsystem

* Adds process for upserting jobs and job results to the database

* Broken end of day weird sqlboiler crap

* Fixes issue with SQL generation.

* RPC generation and addition of basic upsert command

* Renames types

* Adds rpc functions

* quick commit before context swithc. Exchanges aren't being populated

* Begin the tests!

* complete sql tests. stop failed jobs. CLI command creation

* Defines rpc commands

* Fleshes out RPC implementation

* Expands testing

* Expands testing, removes double remove

* Adds coverage of data history subsystem, expands errors and nil checks

* Minor logic improvement

* streamlines datahistory test setup

* End of day minor linting

* Lint, convert simplify, rpc expansion, type expansion, readme expansion

* Documentation update

* Renames for consistency

* Completes RPC server commands

* Fixes tests

* Speeds up testing by reducing unnecessary actions. Adds maxjobspercycle config

* Comments for everything

* Adds missing result string. checks interval supported. default start end cli

* Fixes ID problem. Improves binance trade fetch. job ranges are processed

* adds dbservice coverage. adds rpcserver coverage

* docs regen, uses dbcon interface, reverts binance, fixes races, toggle manager

* Speed up tests, remove bad global usage, fix uuid check

* Adds verbose. Updates docs. Fixes postgres

* Minor changes to logging and start stop

* Fixes postgres db tests, fixes postgres column typo

* Fixes old string typo,removes constraint,error parsing for nonreaders

* prevents dhm running when table doesn't exist. Adds prereq documentation

* Adds parallel, rmlines, err fix, comment fix, minor param fixes

* doc regen, common time range check and test updating

* Fixes job validation issues. Updates candle range checker.

* Ensures test cannot fail due to time.Now() shenanigans

* Fixes oopsie, adds documentation and a warn

* Fixes another time test, adjusts copy

* Drastically speeds up data history manager tests via function overrides

* Fixes summary bug and better logs

* Fixes local time test, fixes websocket tests

* removes defaults and comment,updates error messages,sets cli command args

* Fixes FTX trade processing

* Fixes issue where jobs got stuck if data wasn't returned but retrieval was successful

* Improves test speed. Simplifies trade verification SQL. Adds command help

* Fixes the oopsies

* Fixes use of query within transaction. Fixes trade err

* oopsie, not needed

* Adds missing data status. Properly ends job even when data is missing

* errors are more verbose and so have more words to describe them

* Doc regen for new status

* tiny test tinkering

* str := string("Removes .String()").String()

* Merge fixups

* Fixes a data race discovered during github actions

* Allows websocket test to pass consistently

* Fixes merge issue preventing datahistorymanager from starting via config

* Niterinos cmd defaults and explanations

* fixes default oopsie

* Fixes lack of nil protection

* Additional oopsie

* More detailed error for validating job exchange
This commit is contained in:
Scott
2021-07-01 16:21:48 +10:00
committed by GitHub
parent c109cfb6b4
commit 197ef2df21
133 changed files with 17770 additions and 1367 deletions

View File

@@ -80,7 +80,7 @@ func Series(exchangeName, base, quote string, interval int64, asset string, star
}
}
if len(out.Candles) < 1 {
return out, fmt.Errorf(errNoCandleDataFound, exchangeName, base, quote, interval, asset)
return out, fmt.Errorf("%w: %s %s %s %v %s", ErrNoCandleDataFound, exchangeName, base, quote, interval, asset)
}
out.ExchangeID = exchangeName

View File

@@ -242,10 +242,8 @@ func TestSeries(t *testing.T) {
}
ret, err = Series("", "", "", 0, "", start, end)
if err != nil {
if !errors.Is(err, errInvalidInput) {
t.Fatal(err)
}
if !errors.Is(err, errInvalidInput) {
t.Fatal(err)
}
ret, err = Series(testExchanges[0].Name,
@@ -254,9 +252,7 @@ func TestSeries(t *testing.T) {
start, end)
if err != nil {
if !errors.Is(err, errInvalidInput) {
if err.Error() != fmt.Errorf(errNoCandleDataFound, testExchanges[0].Name,
"BTC", "MOON",
"864000", "spot").Error() {
if !errors.Is(err, ErrNoCandleDataFound) {
t.Fatal(err)
}
}

View File

@@ -5,13 +5,11 @@ import (
"time"
)
const (
errNoCandleDataFound = "no candle data found: %v %v %v %v %v"
)
var (
errInvalidInput = errors.New("exchange, base, quote, asset, interval, start & end cannot be empty")
errNoCandleData = errors.New("no candle data provided")
// ErrNoCandleDataFound returns when no candle data is found
ErrNoCandleDataFound = errors.New("no candle data found")
)
// Item generic candle holder for modelPSQL & modelSQLite

View File

@@ -0,0 +1,694 @@
package datahistoryjob
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/models/postgres"
"github.com/thrasher-corp/gocryptotrader/database/models/sqlite3"
"github.com/thrasher-corp/gocryptotrader/database/repository/datahistoryjobresult"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/sqlboiler/boil"
"github.com/thrasher-corp/sqlboiler/queries/qm"
)
// Setup returns a DBService
func Setup(db database.IDatabase) (*DBService, error) {
if db == nil {
return nil, database.ErrNilInstance
}
if !db.IsConnected() {
return nil, database.ErrDatabaseNotConnected
}
cfg := db.GetConfig()
dbCon, err := db.GetSQL()
if err != nil {
return nil, err
}
return &DBService{
sql: dbCon,
driver: cfg.Driver,
}, nil
}
// Upsert inserts or updates jobs into the database
func (db *DBService) Upsert(jobs ...*DataHistoryJob) error {
ctx := context.Background()
tx, err := db.sql.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginTx %w", err)
}
defer func() {
if err != nil {
errRB := tx.Rollback()
if errRB != nil {
log.Errorf(log.DatabaseMgr, "Insert tx.Rollback %v", errRB)
}
}
}()
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
err = upsertSqlite(ctx, tx, jobs...)
case database.DBPostgreSQL:
err = upsertPostgres(ctx, tx, jobs...)
default:
return database.ErrNoDatabaseProvided
}
if err != nil {
return err
}
return tx.Commit()
}
// GetByNickName returns a job by its nickname
func (db *DBService) GetByNickName(nickname string) (*DataHistoryJob, error) {
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
return db.getByNicknameSQLite(nickname)
case database.DBPostgreSQL:
return db.getByNicknamePostgres(nickname)
default:
return nil, database.ErrNoDatabaseProvided
}
}
// GetByID returns a job by its id
func (db *DBService) GetByID(id string) (*DataHistoryJob, error) {
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
return db.getByIDSQLite(id)
case database.DBPostgreSQL:
return db.getByIDPostgres(id)
default:
return nil, database.ErrNoDatabaseProvided
}
}
// GetJobsBetween will return all jobs between two dates
func (db *DBService) GetJobsBetween(startDate, endDate time.Time) ([]DataHistoryJob, error) {
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
return db.getJobsBetweenSQLite(startDate, endDate)
case database.DBPostgreSQL:
return db.getJobsBetweenPostgres(startDate, endDate)
default:
return nil, database.ErrNoDatabaseProvided
}
}
// GetAllIncompleteJobsAndResults returns all jobs that have the status "active"
func (db *DBService) GetAllIncompleteJobsAndResults() ([]DataHistoryJob, error) {
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
return db.getAllIncompleteJobsAndResultsSQLite()
case database.DBPostgreSQL:
return db.getAllIncompleteJobsAndResultsPostgres()
default:
return nil, database.ErrNoDatabaseProvided
}
}
// GetJobAndAllResults returns a job and joins all job results
func (db *DBService) GetJobAndAllResults(nickname string) (*DataHistoryJob, error) {
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
return db.getJobAndAllResultsSQLite(nickname)
case database.DBPostgreSQL:
return db.getJobAndAllResultsPostgres(nickname)
default:
return nil, database.ErrNoDatabaseProvided
}
}
func upsertSqlite(ctx context.Context, tx *sql.Tx, jobs ...*DataHistoryJob) error {
for i := range jobs {
r, err := sqlite3.Exchanges(
qm.Where("name = ?", strings.ToLower(jobs[i].ExchangeName))).One(ctx, tx)
if err != nil {
return err
}
var tempEvent = sqlite3.Datahistoryjob{
ID: jobs[i].ID,
ExchangeNameID: r.ID,
Nickname: strings.ToLower(jobs[i].Nickname),
Asset: strings.ToLower(jobs[i].Asset),
Base: strings.ToUpper(jobs[i].Base),
Quote: strings.ToUpper(jobs[i].Quote),
StartTime: jobs[i].StartDate.UTC().Format(time.RFC3339),
EndTime: jobs[i].EndDate.UTC().Format(time.RFC3339),
Interval: float64(jobs[i].Interval),
DataType: float64(jobs[i].DataType),
RequestSize: float64(jobs[i].RequestSizeLimit),
MaxRetries: float64(jobs[i].MaxRetryAttempts),
BatchCount: float64(jobs[i].BatchSize),
Status: float64(jobs[i].Status),
Created: time.Now().UTC().Format(time.RFC3339),
}
err = tempEvent.Insert(ctx, tx, boil.Infer())
if err != nil {
return err
}
}
return nil
}
func upsertPostgres(ctx context.Context, tx *sql.Tx, jobs ...*DataHistoryJob) error {
for i := range jobs {
r, err := postgres.Exchanges(
qm.Where("name = ?", strings.ToLower(jobs[i].ExchangeName))).One(ctx, tx)
if err != nil {
return err
}
var tempEvent = postgres.Datahistoryjob{
ID: jobs[i].ID,
Nickname: strings.ToLower(jobs[i].Nickname),
ExchangeNameID: r.ID,
Asset: strings.ToLower(jobs[i].Asset),
Base: strings.ToUpper(jobs[i].Base),
Quote: strings.ToUpper(jobs[i].Quote),
StartTime: jobs[i].StartDate.UTC(),
EndTime: jobs[i].EndDate.UTC(),
Interval: float64(jobs[i].Interval),
DataType: float64(jobs[i].DataType),
BatchCount: float64(jobs[i].BatchSize),
RequestSize: float64(jobs[i].RequestSizeLimit),
MaxRetries: float64(jobs[i].MaxRetryAttempts),
Status: float64(jobs[i].Status),
Created: time.Now().UTC(),
}
err = tempEvent.Upsert(ctx, tx, true, []string{"nickname"}, boil.Infer(), boil.Infer())
if err != nil {
return err
}
}
return nil
}
func (db *DBService) getByNicknameSQLite(nickname string) (*DataHistoryJob, error) {
var job *DataHistoryJob
result, err := sqlite3.Datahistoryjobs(qm.Where("nickname = ?", strings.ToLower(nickname))).One(context.Background(), db.sql)
if err != nil {
return job, err
}
exchangeResult, err := result.ExchangeName().One(context.Background(), db.sql)
if err != nil {
return job, err
}
ts, err := time.Parse(time.RFC3339, result.StartTime)
if err != nil {
return nil, err
}
te, err := time.Parse(time.RFC3339, result.EndTime)
if err != nil {
return nil, err
}
c, err := time.Parse(time.RFC3339, result.Created)
if err != nil {
return nil, err
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: ts,
EndDate: te,
Interval: int64(result.Interval),
BatchSize: int64(result.BatchCount),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
Status: int64(result.Status),
CreatedDate: c,
}
return job, nil
}
func (db *DBService) getByNicknamePostgres(nickname string) (*DataHistoryJob, error) {
var job *DataHistoryJob
query := postgres.Datahistoryjobs(qm.Where("nickname = ?", strings.ToLower(nickname)))
result, err := query.One(context.Background(), db.sql)
if err != nil {
return job, err
}
exchangeResult, err := result.ExchangeName().One(context.Background(), db.sql)
if err != nil {
return job, err
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: result.StartTime,
EndDate: result.EndTime,
Interval: int64(result.Interval),
BatchSize: int64(result.BatchCount),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
Status: int64(result.Status),
CreatedDate: result.Created,
}
return job, nil
}
func (db *DBService) getByIDSQLite(id string) (*DataHistoryJob, error) {
var job *DataHistoryJob
result, err := sqlite3.Datahistoryjobs(qm.Where("id = ?", id)).One(context.Background(), db.sql)
if err != nil {
return job, err
}
exchangeResult, err := result.ExchangeName().One(context.Background(), db.sql)
if err != nil {
return job, err
}
ts, err := time.Parse(time.RFC3339, result.StartTime)
if err != nil {
return nil, err
}
te, err := time.Parse(time.RFC3339, result.EndTime)
if err != nil {
return nil, err
}
c, err := time.Parse(time.RFC3339, result.Created)
if err != nil {
return nil, err
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: ts,
EndDate: te,
Interval: int64(result.Interval),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
BatchSize: int64(result.BatchCount),
Status: int64(result.Status),
CreatedDate: c,
}
return job, nil
}
func (db *DBService) getByIDPostgres(id string) (*DataHistoryJob, error) {
var job *DataHistoryJob
query := postgres.Datahistoryjobs(qm.Where("id = ?", id))
result, err := query.One(context.Background(), db.sql)
if err != nil {
return job, err
}
exchangeResult, err := result.ExchangeName().One(context.Background(), db.sql)
if err != nil {
return job, err
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: result.StartTime,
EndDate: result.EndTime,
Interval: int64(result.Interval),
BatchSize: int64(result.BatchCount),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
Status: int64(result.Status),
CreatedDate: result.Created,
}
return job, nil
}
func (db *DBService) getJobsBetweenSQLite(startDate, endDate time.Time) ([]DataHistoryJob, error) {
var jobs []DataHistoryJob
query := sqlite3.Datahistoryjobs(qm.Where("created BETWEEN ? AND ? ", startDate.UTC().Format(time.RFC3339), endDate.UTC().Format(time.RFC3339)))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return jobs, err
}
for i := range results {
exchangeResult, err := results[i].ExchangeName(qm.Where("id = ?", results[i].ExchangeNameID)).One(context.Background(), db.sql)
if err != nil {
return nil, err
}
ts, err := time.Parse(time.RFC3339, results[i].StartTime)
if err != nil {
return nil, err
}
te, err := time.Parse(time.RFC3339, results[i].EndTime)
if err != nil {
return nil, err
}
c, err := time.Parse(time.RFC3339, results[i].Created)
if err != nil {
return nil, err
}
jobs = append(jobs, DataHistoryJob{
ID: results[i].ID,
Nickname: results[i].Nickname,
ExchangeID: results[i].ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: results[i].Asset,
Base: results[i].Base,
Quote: results[i].Quote,
StartDate: ts,
EndDate: te,
Interval: int64(results[i].Interval),
RequestSizeLimit: int64(results[i].RequestSize),
BatchSize: int64(results[i].BatchCount),
DataType: int64(results[i].DataType),
MaxRetryAttempts: int64(results[i].MaxRetries),
Status: int64(results[i].Status),
CreatedDate: c,
})
}
return jobs, nil
}
func (db *DBService) getJobsBetweenPostgres(startDate, endDate time.Time) ([]DataHistoryJob, error) {
var jobs []DataHistoryJob
query := postgres.Datahistoryjobs(qm.Where("created BETWEEN ? AND ? ", startDate, endDate))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return jobs, err
}
for i := range results {
exchangeResult, err := results[i].ExchangeName(qm.Where("id = ?", results[i].ExchangeNameID)).One(context.Background(), db.sql)
if err != nil {
return nil, err
}
jobs = append(jobs, DataHistoryJob{
ID: results[i].ID,
Nickname: results[i].Nickname,
ExchangeID: results[i].ExchangeNameID,
ExchangeName: exchangeResult.Name,
Asset: results[i].Asset,
Base: results[i].Base,
Quote: results[i].Quote,
StartDate: results[i].StartTime,
EndDate: results[i].EndTime,
Interval: int64(results[i].Interval),
BatchSize: int64(results[i].BatchCount),
RequestSizeLimit: int64(results[i].RequestSize),
DataType: int64(results[i].DataType),
MaxRetryAttempts: int64(results[i].MaxRetries),
Status: int64(results[i].Status),
CreatedDate: results[i].Created,
})
}
return jobs, nil
}
func (db *DBService) getJobAndAllResultsSQLite(nickname string) (*DataHistoryJob, error) {
var job *DataHistoryJob
query := sqlite3.Datahistoryjobs(
qm.Load(sqlite3.DatahistoryjobRels.JobDatahistoryjobresults),
qm.Load(sqlite3.DatahistoryjobRels.ExchangeName),
qm.Where("nickname = ?", strings.ToLower(nickname)))
result, err := query.One(context.Background(), db.sql)
if err != nil {
return nil, err
}
var jobResults []*datahistoryjobresult.DataHistoryJobResult
for i := range result.R.JobDatahistoryjobresults {
var start, end, run time.Time
start, err = time.Parse(time.RFC3339, result.R.JobDatahistoryjobresults[i].IntervalStartTime)
if err != nil {
return nil, err
}
end, err = time.Parse(time.RFC3339, result.R.JobDatahistoryjobresults[i].IntervalEndTime)
if err != nil {
return nil, err
}
run, err = time.Parse(time.RFC3339, result.R.JobDatahistoryjobresults[i].RunTime)
if err != nil {
return nil, err
}
jobResults = append(jobResults, &datahistoryjobresult.DataHistoryJobResult{
ID: result.R.JobDatahistoryjobresults[i].ID,
JobID: result.R.JobDatahistoryjobresults[i].JobID,
IntervalStartDate: start,
IntervalEndDate: end,
Status: int64(result.R.JobDatahistoryjobresults[i].Status),
Result: result.R.JobDatahistoryjobresults[i].Result.String,
Date: run,
})
}
start, err := time.Parse(time.RFC3339, result.StartTime)
if err != nil {
return nil, err
}
end, err := time.Parse(time.RFC3339, result.EndTime)
if err != nil {
return nil, err
}
created, err := time.Parse(time.RFC3339, result.Created)
if err != nil {
return nil, err
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: result.R.ExchangeName.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: start,
EndDate: end,
Interval: int64(result.Interval),
BatchSize: int64(result.BatchCount),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
Status: int64(result.Status),
CreatedDate: created,
Results: jobResults,
}
return job, nil
}
func (db *DBService) getJobAndAllResultsPostgres(nickname string) (*DataHistoryJob, error) {
var job *DataHistoryJob
query := postgres.Datahistoryjobs(
qm.Load(postgres.DatahistoryjobRels.ExchangeName),
qm.Load(postgres.DatahistoryjobRels.JobDatahistoryjobresults),
qm.Where("nickname = ?", strings.ToLower(nickname)))
result, err := query.One(context.Background(), db.sql)
if err != nil {
return job, err
}
var jobResults []*datahistoryjobresult.DataHistoryJobResult
for i := range result.R.JobDatahistoryjobresults {
jobResults = append(jobResults, &datahistoryjobresult.DataHistoryJobResult{
ID: result.R.JobDatahistoryjobresults[i].ID,
JobID: result.R.JobDatahistoryjobresults[i].JobID,
IntervalStartDate: result.R.JobDatahistoryjobresults[i].IntervalStartTime,
IntervalEndDate: result.R.JobDatahistoryjobresults[i].IntervalEndTime,
Status: int64(result.R.JobDatahistoryjobresults[i].Status),
Result: result.R.JobDatahistoryjobresults[i].Result.String,
Date: result.R.JobDatahistoryjobresults[i].RunTime,
})
}
job = &DataHistoryJob{
ID: result.ID,
Nickname: result.Nickname,
ExchangeID: result.ExchangeNameID,
ExchangeName: result.R.ExchangeName.Name,
Asset: result.Asset,
Base: result.Base,
Quote: result.Quote,
StartDate: result.StartTime,
EndDate: result.EndTime,
Interval: int64(result.Interval),
BatchSize: int64(result.BatchCount),
RequestSizeLimit: int64(result.RequestSize),
DataType: int64(result.DataType),
MaxRetryAttempts: int64(result.MaxRetries),
Status: int64(result.Status),
CreatedDate: result.Created,
Results: jobResults,
}
return job, nil
}
func (db *DBService) getAllIncompleteJobsAndResultsSQLite() ([]DataHistoryJob, error) {
var jobs []DataHistoryJob
query := sqlite3.Datahistoryjobs(
qm.Load(sqlite3.DatahistoryjobRels.ExchangeName),
qm.Load(sqlite3.DatahistoryjobRels.JobDatahistoryjobresults),
qm.Where("status = ?", 0))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return jobs, err
}
for i := range results {
var jobResults []*datahistoryjobresult.DataHistoryJobResult
for j := range results[i].R.JobDatahistoryjobresults {
var start, end, run time.Time
start, err = time.Parse(time.RFC3339, results[i].R.JobDatahistoryjobresults[j].IntervalStartTime)
if err != nil {
return nil, err
}
end, err = time.Parse(time.RFC3339, results[i].R.JobDatahistoryjobresults[j].IntervalEndTime)
if err != nil {
return nil, err
}
run, err = time.Parse(time.RFC3339, results[i].R.JobDatahistoryjobresults[j].RunTime)
if err != nil {
return nil, err
}
jobResults = append(jobResults, &datahistoryjobresult.DataHistoryJobResult{
ID: results[i].R.JobDatahistoryjobresults[j].ID,
JobID: results[i].R.JobDatahistoryjobresults[j].JobID,
IntervalStartDate: start,
IntervalEndDate: end,
Status: int64(results[i].R.JobDatahistoryjobresults[j].Status),
Result: results[i].R.JobDatahistoryjobresults[j].Result.String,
Date: run,
})
}
start, err := time.Parse(time.RFC3339, results[i].StartTime)
if err != nil {
return nil, err
}
end, err := time.Parse(time.RFC3339, results[i].EndTime)
if err != nil {
return nil, err
}
created, err := time.Parse(time.RFC3339, results[i].Created)
if err != nil {
return nil, err
}
jobs = append(jobs, DataHistoryJob{
ID: results[i].ID,
Nickname: results[i].Nickname,
ExchangeID: results[i].ExchangeNameID,
ExchangeName: results[i].R.ExchangeName.Name,
Asset: results[i].Asset,
Base: results[i].Base,
Quote: results[i].Quote,
StartDate: start,
EndDate: end,
Interval: int64(results[i].Interval),
BatchSize: int64(results[i].BatchCount),
RequestSizeLimit: int64(results[i].RequestSize),
DataType: int64(results[i].DataType),
MaxRetryAttempts: int64(results[i].MaxRetries),
Status: int64(results[i].Status),
CreatedDate: created,
Results: jobResults,
})
}
return jobs, nil
}
func (db *DBService) getAllIncompleteJobsAndResultsPostgres() ([]DataHistoryJob, error) {
var jobs []DataHistoryJob
query := postgres.Datahistoryjobs(
qm.Load(postgres.DatahistoryjobRels.ExchangeName),
qm.Load(postgres.DatahistoryjobRels.JobDatahistoryjobresults),
qm.Where("status = ?", 0))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return jobs, err
}
for i := range results {
var jobResults []*datahistoryjobresult.DataHistoryJobResult
for j := range results[i].R.JobDatahistoryjobresults {
jobResults = append(jobResults, &datahistoryjobresult.DataHistoryJobResult{
ID: results[i].R.JobDatahistoryjobresults[j].ID,
JobID: results[i].R.JobDatahistoryjobresults[j].JobID,
IntervalStartDate: results[i].R.JobDatahistoryjobresults[j].IntervalStartTime,
IntervalEndDate: results[i].R.JobDatahistoryjobresults[j].IntervalEndTime,
Status: int64(results[i].R.JobDatahistoryjobresults[j].Status),
Result: results[i].R.JobDatahistoryjobresults[j].Result.String,
Date: results[i].R.JobDatahistoryjobresults[j].RunTime,
})
}
jobs = append(jobs, DataHistoryJob{
ID: results[i].ID,
Nickname: results[i].Nickname,
ExchangeID: results[i].ExchangeNameID,
ExchangeName: results[i].R.ExchangeName.Name,
Asset: results[i].Asset,
Base: results[i].Base,
Quote: results[i].Quote,
StartDate: results[i].StartTime,
EndDate: results[i].EndTime,
Interval: int64(results[i].Interval),
BatchSize: int64(results[i].BatchCount),
RequestSizeLimit: int64(results[i].RequestSize),
DataType: int64(results[i].DataType),
MaxRetryAttempts: int64(results[i].MaxRetries),
Status: int64(results[i].Status),
CreatedDate: results[i].Created,
Results: jobResults,
})
}
return jobs, nil
}

View File

@@ -0,0 +1,212 @@
package datahistoryjob
import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/drivers"
"github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
"github.com/thrasher-corp/gocryptotrader/database/testhelpers"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
var (
verbose = false
testExchanges = []exchange.Details{
{
Name: "one",
},
{
Name: "two",
},
}
)
func TestMain(m *testing.M) {
if verbose {
testhelpers.EnableVerboseTestOutput()
}
var err error
testhelpers.PostgresTestDatabase = testhelpers.GetConnectionDetails()
testhelpers.TempDir, err = ioutil.TempDir("", "gct-temp")
if err != nil {
log.Fatal(err)
}
t := m.Run()
err = os.RemoveAll(testhelpers.TempDir)
if err != nil {
fmt.Printf("Failed to remove temp db file: %v", err)
}
os.Exit(t)
}
func seedDB() error {
err := exchange.InsertMany(testExchanges)
if err != nil {
return err
}
for i := range testExchanges {
lol, err := exchange.One(testExchanges[i].Name)
if err != nil {
return err
}
testExchanges[i].UUID = lol.UUID
}
return nil
}
func TestDataHistoryJob(t *testing.T) {
testCases := []struct {
name string
config *database.Config
seedDB func() error
runner func(t *testing.T)
closer func(dbConn *database.Instance) error
}{
{
name: "postgresql",
config: testhelpers.PostgresTestDatabase,
seedDB: seedDB,
},
{
name: "SQLite",
config: &database.Config{
Driver: database.DBSQLite3,
ConnectionDetails: drivers.ConnectionDetails{Database: "./testdb"},
},
seedDB: seedDB,
},
}
for x := range testCases {
test := testCases[x]
t.Run(test.name, func(t *testing.T) {
if !testhelpers.CheckValidConfig(&test.config.ConnectionDetails) {
t.Skip("database not configured skipping test")
}
dbConn, err := testhelpers.ConnectToDatabase(test.config)
if err != nil {
t.Fatal(err)
}
if test.seedDB != nil {
err = test.seedDB()
if err != nil {
t.Error(err)
}
}
db, err := Setup(dbConn)
if err != nil {
log.Fatal(err)
}
var jerberinos, jerberoos []*DataHistoryJob
for i := 0; i < 20; i++ {
uu, _ := uuid.NewV4()
jerberinos = append(jerberinos, &DataHistoryJob{
ID: uu.String(),
Nickname: fmt.Sprintf("TestDataHistoryJob%v", i),
ExchangeID: testExchanges[0].UUID.String(),
ExchangeName: testExchanges[0].Name,
Asset: asset.Spot.String(),
Base: currency.BTC.String(),
Quote: currency.USD.String(),
StartDate: time.Now().Add(time.Duration(i) * time.Second),
EndDate: time.Now().Add(time.Minute * time.Duration(i)),
Interval: int64(i),
})
}
err = db.Upsert(jerberinos...)
if err != nil {
t.Fatal(err)
}
// insert the same jerbs to test conflict resolution
for i := 0; i < 20; i++ {
uu, _ := uuid.NewV4()
j := &DataHistoryJob{
ID: uu.String(),
Nickname: fmt.Sprintf("TestDataHistoryJob%v", i),
ExchangeID: testExchanges[0].UUID.String(),
ExchangeName: testExchanges[0].Name,
Asset: asset.Spot.String(),
Base: currency.BTC.String(),
Quote: currency.USD.String(),
StartDate: time.Now().Add(time.Duration(i) * time.Second),
EndDate: time.Now().Add(time.Minute * time.Duration(i)),
Interval: int64(i),
}
if i == 19 {
j.Status = 1
}
jerberoos = append(jerberoos, j)
}
err = db.Upsert(jerberoos...)
if err != nil {
t.Fatal(err)
}
_, err = db.GetJobsBetween(time.Now(), time.Now().Add(time.Hour))
if err != nil {
t.Fatal(err)
}
resp, err := db.GetByNickName("TestDataHistoryJob19")
if err != nil {
t.Fatal(err)
}
if !strings.EqualFold(resp.Nickname, "TestDataHistoryJob19") {
t.Fatal("the database no longer functions")
}
results, err := db.GetAllIncompleteJobsAndResults()
if err != nil {
t.Error(err)
}
if len(results) != 19 {
t.Errorf("expected 19, received %v", len(results))
}
jerb, err := db.getJobAndAllResultsPostgres(jerberoos[0].Nickname)
if err != nil {
t.Fatal(err)
}
if !strings.EqualFold(jerb.Nickname, jerberoos[0].Nickname) {
t.Errorf("expected %v, received %v", jerb.Nickname, jerberoos[0].Nickname)
}
results, err = db.GetJobsBetween(time.Now().Add(-time.Hour), time.Now())
if err != nil {
t.Error(err)
}
if len(results) != 20 {
t.Errorf("expected 20, received %v", len(results))
}
jerb, err = db.GetJobAndAllResults(jerberoos[0].Nickname)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(jerb.Nickname, jerberoos[0].Nickname) {
t.Errorf("expected %v, received %v", jerb.Nickname, jerberoos[0].Nickname)
}
err = testhelpers.CloseDatabase(dbConn)
if err != nil {
t.Error(err)
}
})
}
}

View File

@@ -0,0 +1,47 @@
package datahistoryjob
import (
"time"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/repository/datahistoryjobresult"
)
// DataHistoryJob is a DTO for database data
type DataHistoryJob struct {
ID string
Nickname string
ExchangeID string
ExchangeName string
Asset string
Base string
Quote string
StartDate time.Time
EndDate time.Time
Interval int64
RequestSizeLimit int64
DataType int64
MaxRetryAttempts int64
BatchSize int64
Status int64
CreatedDate time.Time
Results []*datahistoryjobresult.DataHistoryJobResult
}
// DBService is a service which allows the interaction with
// the database without a direct reference to a global
type DBService struct {
sql database.ISQL
driver string
}
// IDBService allows using data history job database service
// without needing to care about implementation
type IDBService interface {
Upsert(jobs ...*DataHistoryJob) error
GetByNickName(nickname string) (*DataHistoryJob, error)
GetByID(id string) (*DataHistoryJob, error)
GetJobsBetween(startDate, endDate time.Time) ([]DataHistoryJob, error)
GetAllIncompleteJobsAndResults() ([]DataHistoryJob, error)
GetJobAndAllResults(nickname string) (*DataHistoryJob, error)
}

View File

@@ -0,0 +1,280 @@
package datahistoryjobresult
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/models/postgres"
"github.com/thrasher-corp/gocryptotrader/database/models/sqlite3"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/sqlboiler/boil"
"github.com/thrasher-corp/sqlboiler/queries/qm"
"github.com/volatiletech/null"
)
// Setup returns a DBService
func Setup(db database.IDatabase) (*DBService, error) {
if db == nil {
return nil, nil
}
if !db.IsConnected() {
return nil, nil
}
cfg := db.GetConfig()
dbCon, err := db.GetSQL()
if err != nil {
return nil, err
}
return &DBService{
sql: dbCon,
driver: cfg.Driver,
}, nil
}
// Upsert inserts or updates jobs into the database
func (db *DBService) Upsert(jobs ...*DataHistoryJobResult) error {
if len(jobs) == 0 {
return nil
}
ctx := context.Background()
tx, err := db.sql.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginTx %w", err)
}
defer func() {
if err != nil {
errRB := tx.Rollback()
if errRB != nil {
log.Errorf(log.DatabaseMgr, "Insert tx.Rollback %v", errRB)
}
}
}()
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
err = upsertSqlite(ctx, tx, jobs...)
case database.DBPostgreSQL:
err = upsertPostgres(ctx, tx, jobs...)
default:
return database.ErrNoDatabaseProvided
}
if err != nil {
return err
}
return tx.Commit()
}
// GetByJobID returns a job by its related JobID
func (db *DBService) GetByJobID(jobID string) ([]DataHistoryJobResult, error) {
var err error
var job []DataHistoryJobResult
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
job, err = db.getByJobIDSQLite(jobID)
case database.DBPostgreSQL:
job, err = db.getByJobIDPostgres(jobID)
default:
return nil, database.ErrNoDatabaseProvided
}
if err != nil {
return nil, err
}
return job, nil
}
// GetJobResultsBetween will return all jobs between two dates
func (db *DBService) GetJobResultsBetween(jobID string, startDate, endDate time.Time) ([]DataHistoryJobResult, error) {
var err error
var jobs []DataHistoryJobResult
switch db.driver {
case database.DBSQLite3, database.DBSQLite:
jobs, err = db.getJobResultsBetweenSQLite(jobID, startDate, endDate)
case database.DBPostgreSQL:
jobs, err = db.getJobResultsBetweenPostgres(jobID, startDate, endDate)
default:
return nil, database.ErrNoDatabaseProvided
}
if err != nil {
return nil, err
}
return jobs, nil
}
func upsertSqlite(ctx context.Context, tx *sql.Tx, results ...*DataHistoryJobResult) error {
for i := range results {
if results[i].ID == "" {
freshUUID, err := uuid.NewV4()
if err != nil {
return err
}
results[i].ID = freshUUID.String()
}
var tempEvent = sqlite3.Datahistoryjobresult{
ID: results[i].ID,
JobID: results[i].JobID,
Result: null.NewString(results[i].Result, results[i].Result != ""),
Status: float64(results[i].Status),
IntervalStartTime: results[i].IntervalStartDate.UTC().Format(time.RFC3339),
IntervalEndTime: results[i].IntervalEndDate.UTC().Format(time.RFC3339),
RunTime: results[i].Date.UTC().Format(time.RFC3339),
}
err := tempEvent.Insert(ctx, tx, boil.Infer())
if err != nil {
return err
}
}
return nil
}
func upsertPostgres(ctx context.Context, tx *sql.Tx, results ...*DataHistoryJobResult) error {
var err error
for i := range results {
if results[i].ID == "" {
var freshUUID uuid.UUID
freshUUID, err = uuid.NewV4()
if err != nil {
return err
}
results[i].ID = freshUUID.String()
}
var tempEvent = postgres.Datahistoryjobresult{
ID: results[i].ID,
JobID: results[i].JobID,
Result: null.NewString(results[i].Result, results[i].Result != ""),
Status: float64(results[i].Status),
IntervalStartTime: results[i].IntervalStartDate.UTC(),
IntervalEndTime: results[i].IntervalEndDate.UTC(),
RunTime: results[i].Date.UTC(),
}
err = tempEvent.Upsert(ctx, tx, true, nil, boil.Infer(), boil.Infer())
if err != nil {
return err
}
}
return nil
}
func (db *DBService) getByJobIDSQLite(jobID string) ([]DataHistoryJobResult, error) {
query := sqlite3.Datahistoryjobresults(qm.Where("job_id = ?", jobID))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return nil, err
}
var resp []DataHistoryJobResult
for i := range results {
var start, end, run time.Time
start, err = time.Parse(time.RFC3339, results[i].IntervalStartTime)
if err != nil {
return nil, err
}
end, err = time.Parse(time.RFC3339, results[i].IntervalEndTime)
if err != nil {
return nil, err
}
run, err = time.Parse(time.RFC3339, results[i].RunTime)
if err != nil {
return nil, err
}
resp = append(resp, DataHistoryJobResult{
ID: results[i].ID,
JobID: results[i].JobID,
IntervalStartDate: start,
IntervalEndDate: end,
Status: int64(results[i].Status),
Result: results[i].Result.String,
Date: run,
})
}
return resp, nil
}
func (db *DBService) getByJobIDPostgres(jobID string) ([]DataHistoryJobResult, error) {
query := postgres.Datahistoryjobresults(qm.Where("job_id = ?", jobID))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return nil, err
}
var resp []DataHistoryJobResult
for i := range results {
resp = append(resp, DataHistoryJobResult{
ID: results[i].ID,
JobID: results[i].JobID,
IntervalStartDate: results[i].IntervalStartTime,
IntervalEndDate: results[i].IntervalEndTime,
Status: int64(results[i].Status),
Result: results[i].Result.String,
Date: results[i].RunTime,
})
}
return resp, nil
}
func (db *DBService) getJobResultsBetweenSQLite(jobID string, startDate, endDate time.Time) ([]DataHistoryJobResult, error) {
var results []DataHistoryJobResult
query := sqlite3.Datahistoryjobresults(qm.Where("job_id = ? AND run_time BETWEEN ? AND ? ", jobID, startDate.UTC().Format(time.RFC3339), endDate.UTC().Format(time.RFC3339)))
resp, err := query.All(context.Background(), db.sql)
if err != nil {
return results, err
}
for i := range resp {
var start, end, run time.Time
start, err = time.Parse(time.RFC3339, resp[i].IntervalStartTime)
if err != nil {
return nil, err
}
end, err = time.Parse(time.RFC3339, resp[i].IntervalEndTime)
if err != nil {
return nil, err
}
run, err = time.Parse(time.RFC3339, resp[i].RunTime)
if err != nil {
return nil, err
}
results = append(results, DataHistoryJobResult{
ID: resp[i].ID,
JobID: resp[i].JobID,
IntervalStartDate: start,
IntervalEndDate: end,
Status: int64(resp[i].Status),
Result: resp[i].Result.String,
Date: run,
})
}
return results, nil
}
func (db *DBService) getJobResultsBetweenPostgres(jobID string, startDate, endDate time.Time) ([]DataHistoryJobResult, error) {
var jobs []DataHistoryJobResult
query := postgres.Datahistoryjobresults(qm.Where("job_id = ? AND run_time BETWEEN ? AND ? ", jobID, startDate, endDate))
results, err := query.All(context.Background(), db.sql)
if err != nil {
return jobs, err
}
for i := range results {
jobs = append(jobs, DataHistoryJobResult{
ID: results[i].ID,
JobID: results[i].JobID,
IntervalStartDate: results[i].IntervalStartTime,
IntervalEndDate: results[i].IntervalEndTime,
Status: int64(results[i].Status),
Result: results[i].Result.String,
Date: results[i].RunTime,
})
}
return jobs, nil
}

View File

@@ -0,0 +1,200 @@
package datahistoryjobresult
import (
"database/sql"
"fmt"
"io/ioutil"
"log"
"os"
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/drivers"
"github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
"github.com/thrasher-corp/gocryptotrader/database/testhelpers"
)
var (
verbose = false
testExchanges = []exchange.Details{
{
Name: "one",
},
{
Name: "two",
},
}
)
func TestMain(m *testing.M) {
if verbose {
testhelpers.EnableVerboseTestOutput()
}
var err error
testhelpers.PostgresTestDatabase = testhelpers.GetConnectionDetails()
testhelpers.TempDir, err = ioutil.TempDir("", "gct-temp")
if err != nil {
log.Fatal(err)
}
t := m.Run()
err = os.RemoveAll(testhelpers.TempDir)
if err != nil {
fmt.Printf("Failed to remove temp db file: %v", err)
}
os.Exit(t)
}
func seedDB() error {
err := exchange.InsertMany(testExchanges)
if err != nil {
return err
}
for i := range testExchanges {
lol, err := exchange.One(testExchanges[i].Name)
if err != nil {
return err
}
testExchanges[i].UUID = lol.UUID
}
return nil
}
func TestDataHistoryJob(t *testing.T) {
testCases := []struct {
name string
config *database.Config
seedDB func() error
runner func(t *testing.T)
closer func(dbConn *database.Instance) error
}{
{
name: "postgresql",
config: testhelpers.PostgresTestDatabase,
seedDB: seedDB,
},
{
name: "SQLite",
config: &database.Config{
Driver: database.DBSQLite3,
ConnectionDetails: drivers.ConnectionDetails{Database: "./testdb"},
},
seedDB: seedDB,
},
}
for x := range testCases {
test := testCases[x]
t.Run(test.name, func(t *testing.T) {
if !testhelpers.CheckValidConfig(&test.config.ConnectionDetails) {
t.Skip("database not configured skipping test")
}
dbConn, err := testhelpers.ConnectToDatabase(test.config)
if err != nil {
t.Fatal(err)
}
if test.seedDB != nil {
err = test.seedDB()
if err != nil {
t.Error(err)
}
}
db, err := Setup(dbConn)
if err != nil {
t.Fatal(err)
}
// postgres requires job for tests to function
var id string
if test.name == "postgresql" {
var selectID *sql.Rows
selectID, err = db.sql.Query("select id from datahistoryjob where nickname = 'testdatahistoryjob1'")
if err != nil {
t.Fatal(err)
}
defer func() {
err = selectID.Close()
if err != nil {
t.Fatal(err)
}
if selectID.Err() != nil {
t.Fatal(selectID.Err())
}
}()
selectID.Next()
err = selectID.Scan(&id)
if err != nil {
t.Error(err)
}
}
var resulterinos, resultaroos []*DataHistoryJobResult
for i := 0; i < 20; i++ {
uu, _ := uuid.NewV4()
resulterinos = append(resulterinos, &DataHistoryJobResult{
ID: uu.String(),
JobID: id,
IntervalStartDate: time.Now(),
IntervalEndDate: time.Now().Add(time.Second),
Status: 0,
Result: "Yay",
Date: time.Now(),
})
}
err = db.Upsert(resulterinos...)
if err != nil {
t.Fatal(err)
}
// insert the same results to test conflict resolution
for i := 0; i < 20; i++ {
uu, _ := uuid.NewV4()
j := &DataHistoryJobResult{
ID: uu.String(),
JobID: id,
IntervalStartDate: time.Now(),
IntervalEndDate: time.Now().Add(time.Second),
Status: 0,
Result: "Wow",
Date: time.Now(),
}
if i == 19 {
j.Status = 1
j.Date = time.Now().Add(time.Hour * 24)
}
resultaroos = append(resultaroos, j)
}
err = db.Upsert(resultaroos...)
if err != nil {
t.Fatal(err)
}
results, err := db.GetByJobID(id)
if err != nil {
t.Fatal(err)
}
if len(results) == 0 {
t.Error("expected job results")
}
results, err = db.GetJobResultsBetween(id, time.Now().Add(time.Hour*23), time.Now().Add(time.Hour*25))
if err != nil {
t.Fatal(err)
}
if len(results) == 0 {
t.Errorf("expected job result, received %v", len(results))
}
err = testhelpers.CloseDatabase(dbConn)
if err != nil {
t.Error(err)
}
})
}
}

View File

@@ -0,0 +1,33 @@
package datahistoryjobresult
import (
"time"
"github.com/thrasher-corp/gocryptotrader/database"
)
// DataHistoryJobResult is a DTO for database data
type DataHistoryJobResult struct {
ID string
JobID string
IntervalStartDate time.Time
IntervalEndDate time.Time
Status int64
Result string
Date time.Time
}
// DBService is a service which allows the interaction with
// the database without a direct reference to a global
type DBService struct {
sql database.ISQL
driver string
}
// IDBService allows using data history job result database service
// without needing to care about implementation
type IDBService interface {
Upsert(jobs ...*DataHistoryJobResult) error
GetByJobID(jobID string) ([]DataHistoryJobResult, error)
GetJobResultsBetween(jobID string, startDate, endDate time.Time) ([]DataHistoryJobResult, error)
}

View File

@@ -8,7 +8,7 @@ import (
)
var (
exchangeCache = cache.New(10)
exchangeCache = cache.New(30)
// ErrNoExchangeFound is a basic predefined error
ErrNoExchangeFound = errors.New("exchange not found")
)

View File

@@ -32,9 +32,10 @@ func TestGetSQLDialect(t *testing.T) {
test := testCases[x]
t.Run(test.driver, func(t *testing.T) {
err := database.DB.SetConfig(&database.Config{
cfg := &database.Config{
Driver: test.driver,
})
}
err := database.DB.SetConfig(cfg)
if err != nil {
t.Error(err)
}

View File

@@ -10,10 +10,11 @@ import (
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/database"
modelPSQL "github.com/thrasher-corp/gocryptotrader/database/models/postgres"
modelSQLite "github.com/thrasher-corp/gocryptotrader/database/models/sqlite3"
"github.com/thrasher-corp/gocryptotrader/database/models/postgres"
"github.com/thrasher-corp/gocryptotrader/database/models/sqlite3"
"github.com/thrasher-corp/gocryptotrader/database/repository"
"github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/sqlboiler/boil"
"github.com/thrasher-corp/sqlboiler/queries/qm"
@@ -61,6 +62,89 @@ func Insert(trades ...Data) error {
return tx.Commit()
}
// VerifyTradeInIntervals will query for ONE trade within each kline interval and verify if data exists
// if it does, it will set the range holder property "HasData" to true
func VerifyTradeInIntervals(exchangeName, assetType, base, quote string, irh *kline.IntervalRangeHolder) error {
ctx := context.Background()
ctx = boil.SkipTimestamps(ctx)
tx, err := database.DB.SQL.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginTx %w", err)
}
defer func() {
if err != nil {
errRB := tx.Rollback()
if errRB != nil {
log.Errorf(log.DatabaseMgr, "Insert tx.Rollback %v", errRB)
}
}
}()
if repository.GetSQLDialect() == database.DBSQLite3 || repository.GetSQLDialect() == database.DBSQLite {
err = verifyTradeInIntervalsSqlite(ctx, tx, exchangeName, assetType, base, quote, irh)
} else {
err = verifyTradeInIntervalsPostgres(ctx, tx, exchangeName, assetType, base, quote, irh)
}
if err != nil {
return err
}
return tx.Commit()
}
func verifyTradeInIntervalsSqlite(ctx context.Context, tx *sql.Tx, exchangeName, assetType, base, quote string, irh *kline.IntervalRangeHolder) error {
exch, err := sqlite3.Exchanges(qm.Where("name = ?", exchangeName)).One(ctx, tx)
if err != nil {
return err
}
for i := range irh.Ranges {
for j := range irh.Ranges[i].Intervals {
result, err := sqlite3.Trades(qm.Where("exchange_name_id = ? AND asset = ? AND base = ? AND quote = ? AND timestamp between ? AND ?",
exch.ID,
assetType,
base,
quote,
irh.Ranges[i].Intervals[j].Start.Time.UTC().Format(time.RFC3339),
irh.Ranges[i].Intervals[j].End.Time.UTC().Format(time.RFC3339))).One(ctx, tx)
if err != nil {
return err
}
if result != nil {
irh.Ranges[i].Intervals[j].HasData = true
}
}
}
return nil
}
func verifyTradeInIntervalsPostgres(ctx context.Context, tx *sql.Tx, exchangeName, assetType, base, quote string, irh *kline.IntervalRangeHolder) error {
exch, err := postgres.Exchanges(qm.Where("name = ?", exchangeName)).One(ctx, tx)
if err != nil {
return err
}
for i := range irh.Ranges {
for j := range irh.Ranges[i].Intervals {
result, err := postgres.Trades(qm.Where("exchange_name_id = ? AND asset = ? AND base = ? AND quote = ? timestamp between ? AND ?",
exch.ID,
assetType,
base,
quote,
irh.Ranges[i].Intervals[j].Start.Time.UTC().Format(time.RFC3339),
irh.Ranges[i].Intervals[j].End.Time.UTC().Format(time.RFC3339))).One(ctx, tx)
if err != nil {
return err
}
if result != nil {
irh.Ranges[i].Intervals[j].HasData = true
}
}
}
return nil
}
func insertSQLite(ctx context.Context, tx *sql.Tx, trades ...Data) error {
for i := range trades {
if trades[i].ID == "" {
@@ -70,7 +154,7 @@ func insertSQLite(ctx context.Context, tx *sql.Tx, trades ...Data) error {
}
trades[i].ID = freshUUID.String()
}
var tempEvent = modelSQLite.Trade{
var tempEvent = sqlite3.Trade{
ID: trades[i].ID,
ExchangeNameID: trades[i].ExchangeNameID,
Base: strings.ToUpper(trades[i].Base),
@@ -106,7 +190,7 @@ func insertPostgres(ctx context.Context, tx *sql.Tx, trades ...Data) error {
}
trades[i].ID = freshUUID.String()
}
var tempEvent = modelPSQL.Trade{
var tempEvent = postgres.Trade{
ExchangeNameID: trades[i].ExchangeNameID,
Base: strings.ToUpper(trades[i].Base),
Quote: strings.ToUpper(trades[i].Quote),
@@ -152,7 +236,7 @@ func GetByUUID(uuid string) (td Data, err error) {
func getByUUIDSQLite(uuid string) (Data, error) {
var td Data
var ts time.Time
query := modelSQLite.Trades(qm.Where("id = ?", uuid))
query := sqlite3.Trades(qm.Where("id = ?", uuid))
result, err := query.One(context.Background(), database.DB.SQL)
if err != nil {
return td, err
@@ -179,8 +263,8 @@ func getByUUIDSQLite(uuid string) (Data, error) {
}
func getByUUIDPostgres(uuid string) (td Data, err error) {
query := modelPSQL.Trades(qm.Where("id = ?", uuid))
var result *modelPSQL.Trade
query := postgres.Trades(qm.Where("id = ?", uuid))
var result *postgres.Trade
result, err = query.One(context.Background(), database.DB.SQL)
if err != nil {
return td, err
@@ -232,8 +316,8 @@ func getInRangeSQLite(exchangeName, assetType, base, quote string, startDate, en
"quote": strings.ToUpper(quote),
}
q := generateQuery(wheres, startDate, endDate)
query := modelSQLite.Trades(q...)
var result []*modelSQLite.Trade
query := sqlite3.Trades(q...)
var result []*sqlite3.Trade
result, err = query.All(context.Background(), database.DB.SQL)
if err != nil {
return td, err
@@ -274,8 +358,8 @@ func getInRangePostgres(exchangeName, assetType, base, quote string, startDate,
"quote": strings.ToUpper(quote),
}
q := generateQuery(wheres, startDate, endDate)
query := modelPSQL.Trades(q...)
var result []*modelPSQL.Trade
query := postgres.Trades(q...)
var result []*postgres.Trade
result, err = query.All(context.Background(), database.DB.SQL)
if err != nil {
return td, err
@@ -333,7 +417,7 @@ func deleteTradesSQLite(ctx context.Context, tx *sql.Tx, trades ...Data) error {
for i := range trades {
tradeIDs = append(tradeIDs, trades[i].ID)
}
query := modelSQLite.Trades(qm.WhereIn(`id in ?`, tradeIDs...))
query := sqlite3.Trades(qm.WhereIn(`id in ?`, tradeIDs...))
_, err := query.DeleteAll(ctx, tx)
return err
}
@@ -343,7 +427,7 @@ func deleteTradesPostgres(ctx context.Context, tx *sql.Tx, trades ...Data) error
for i := range trades {
tradeIDs = append(tradeIDs, trades[i].ID)
}
query := modelPSQL.Trades(qm.WhereIn(`id in ?`, tradeIDs...))
query := postgres.Trades(qm.WhereIn(`id in ?`, tradeIDs...))
_, err := query.DeleteAll(ctx, tx)
return err
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
"github.com/thrasher-corp/gocryptotrader/database/testhelpers"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
)
@@ -97,12 +98,12 @@ func TestTrades(t *testing.T) {
func tradeSQLTester(t *testing.T) {
var trades, trades2 []Data
firstTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
for i := 0; i < 20; i++ {
uu, _ := uuid.NewV4()
trades = append(trades, Data{
ID: uu.String(),
Timestamp: time.Now(),
Timestamp: firstTime.Add(time.Minute * time.Duration(i)),
Exchange: testExchanges[0].Name,
Base: currency.BTC.String(),
Quote: currency.USD.String(),
@@ -122,7 +123,7 @@ func tradeSQLTester(t *testing.T) {
uu, _ := uuid.NewV4()
trades2 = append(trades2, Data{
ID: uu.String(),
Timestamp: time.Now(),
Timestamp: firstTime.Add(time.Minute * time.Duration(i)),
Exchange: testExchanges[0].Name,
Base: currency.BTC.String(),
Quote: currency.USD.String(),
@@ -142,8 +143,8 @@ func tradeSQLTester(t *testing.T) {
asset.Spot.String(),
currency.BTC.String(),
currency.USD.String(),
time.Now().Add(-time.Hour),
time.Now().Add(time.Hour),
firstTime.Add(-time.Hour),
firstTime.Add(time.Hour),
)
if err != nil {
t.Error(err)
@@ -157,8 +158,8 @@ func tradeSQLTester(t *testing.T) {
asset.Spot.String(),
currency.BTC.String(),
currency.USD.String(),
time.Now().Add(-time.Hour),
time.Now().Add(time.Hour))
firstTime.Add(-time.Hour),
firstTime.Add(time.Hour))
if err != nil {
t.Error(err)
}
@@ -166,6 +167,24 @@ func tradeSQLTester(t *testing.T) {
t.Error("Bad get!")
}
ranges, err := kline.CalculateCandleDateRanges(firstTime, firstTime.Add(20*time.Minute), kline.OneMin, 100)
if err != nil {
t.Error(err)
}
err = VerifyTradeInIntervals(testExchanges[0].Name,
asset.Spot.String(),
currency.BTC.String(),
currency.USD.String(),
ranges)
if err != nil {
t.Error(err)
}
if !ranges.HasDataAtDate(firstTime) {
t.Error("expected data")
}
err = DeleteTrades(trades...)
if err != nil {
t.Error(err)