Files
gocryptotrader/engine/datahistory_manager.go
Scott 63257ce4ca Improvement: Speeding up slow tests (#707)
* Speeds up tests

* Reduces time.Sleeps, lowers CreateTestBot complexity. Breaks things

* Removal of unnecessary config reads. Parallel tests. Lower times

* Speeds up recent trades results

* mini update

* zoooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooom

* Removes the dupes

* Lint

* post cherrypick

* Fix rare kraken data race

* Fixes banking global issues. Fixes postgres trades

* rmline for appveyor test

* Expands timeout in event that channel is closed before send

* Fix data race

* No rows, no bows and definitely no shows

* Removes parallel from createsnapshot tests

* Extends timedmutex test a smidge. Exchange fatality

* Shorter end timeframe and bigger candle
2021-07-07 12:42:03 +10:00

931 lines
27 KiB
Go

package engine
import (
"database/sql"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/database/repository/candle"
"github.com/thrasher-corp/gocryptotrader/database/repository/datahistoryjob"
"github.com/thrasher-corp/gocryptotrader/database/repository/datahistoryjobresult"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
// SetupDataHistoryManager builds a DataHistoryManager from its dependencies.
//
// It returns errNilExchangeManager, errNilDatabaseConnectionManager or
// errNilConfig when the matching argument is nil. cfg is adjusted in place:
// a non-positive CheckInterval becomes defaultDataHistoryTicker and a zero
// MaxJobsPerCycle becomes defaultDataHistoryMaxJobsPerCycle. Any error from
// setting up the job or job result repositories is returned unchanged.
func SetupDataHistoryManager(em iExchangeManager, dcm iDatabaseConnectionManager, cfg *config.DataHistoryManager) (*DataHistoryManager, error) {
	switch {
	case em == nil:
		return nil, errNilExchangeManager
	case dcm == nil:
		return nil, errNilDatabaseConnectionManager
	case cfg == nil:
		return nil, errNilConfig
	}

	// Fall back to package defaults for unset config values.
	if cfg.CheckInterval <= 0 {
		cfg.CheckInterval = defaultDataHistoryTicker
	}
	if cfg.MaxJobsPerCycle == 0 {
		cfg.MaxJobsPerCycle = defaultDataHistoryMaxJobsPerCycle
	}

	conn := dcm.GetInstance()
	jobRepo, err := datahistoryjob.Setup(conn)
	if err != nil {
		return nil, err
	}
	resultRepo, err := datahistoryjobresult.Setup(conn)
	if err != nil {
		return nil, err
	}

	manager := &DataHistoryManager{
		exchangeManager:            em,
		databaseConnectionInstance: conn,
		shutdown:                   make(chan struct{}),
		interval:                   time.NewTicker(cfg.CheckInterval),
		jobDB:                      jobRepo,
		jobResultDB:                resultRepo,
		maxJobsPerCycle:            cfg.MaxJobsPerCycle,
		verbose:                    cfg.Verbose,
		tradeLoader:                trade.HasTradesInRanges,
		candleLoader:               kline.LoadFromDatabase,
	}
	return manager, nil
}
// Start flips the manager into the started state, creates a fresh shutdown
// channel and launches the background routine via run.
//
// It returns ErrNilSubsystem for a nil receiver and
// ErrSubSystemAlreadyStarted when the manager was already started.
func (m *DataHistoryManager) Start() error {
	if m == nil {
		return ErrNilSubsystem
	}
	// The compare-and-swap only succeeds for the caller that moves the flag 0 -> 1.
	if swapped := atomic.CompareAndSwapInt32(&m.started, 0, 1); !swapped {
		return ErrSubSystemAlreadyStarted
	}
	m.shutdown = make(chan struct{})
	m.run()
	log.Debugf(log.DataHistory, "Data history manager %v", MsgSubSystemStarted)
	return nil
}
// IsRunning reports whether the manager's started flag is set.
// A nil receiver is never running.
func (m *DataHistoryManager) IsRunning() bool {
	return m != nil && atomic.LoadInt32(&m.started) == 1
}
// Stop clears the started flag and closes the shutdown channel so the
// background routine launched by run can exit.
//
// It returns ErrNilSubsystem for a nil receiver and ErrSubSystemNotStarted
// when the manager was not started.
func (m *DataHistoryManager) Stop() error {
	if m == nil {
		return ErrNilSubsystem
	}
	// Only the caller that moves the flag 1 -> 0 may close the channel.
	if swapped := atomic.CompareAndSwapInt32(&m.started, 1, 0); !swapped {
		return ErrSubSystemNotStarted
	}
	close(m.shutdown)
	log.Debugf(log.DataHistory, "Data history manager %v", MsgSubSystemShutdown)
	return nil
}
// retrieveJobs loads all incomplete jobs and their results from the job
// database, converts them to DataHistoryJob values and validates each one.
//
// A conversion failure aborts the whole call with that error. A job that
// fails validation is logged and left out of the returned slice.
func (m *DataHistoryManager) retrieveJobs() ([]*DataHistoryJob, error) {
	switch {
	case m == nil:
		return nil, ErrNilSubsystem
	case atomic.LoadInt32(&m.started) == 0:
		return nil, ErrSubSystemNotStarted
	}

	stored, err := m.jobDB.GetAllIncompleteJobsAndResults()
	if err != nil {
		return nil, err
	}

	var valid []*DataHistoryJob
	for i := range stored {
		job, convErr := m.convertDBModelToJob(&stored[i])
		if convErr != nil {
			return nil, convErr
		}
		if valErr := m.validateJob(job); valErr != nil {
			// Invalid jobs are reported but do not block the remaining jobs.
			log.Error(log.DataHistory, valErr)
			continue
		}
		valid = append(valid, job)
	}
	return valid, nil
}
// PrepareJobs retrieves the stored jobs via retrieveJobs, compares them
// against the data already held (compareJobsToData) and returns them.
//
// The manager mutex is held for the duration of the call. The jobs are handed
// back to the caller; this function does not assign m.jobs itself. If the jobs
// cannot be retrieved, the manager is stopped and a wrapped error is returned.
func (m *DataHistoryManager) PrepareJobs() ([]*DataHistoryJob, error) {
	switch {
	case m == nil:
		return nil, ErrNilSubsystem
	case atomic.LoadInt32(&m.started) == 0:
		return nil, ErrSubSystemNotStarted
	}

	m.m.Lock()
	defer m.m.Unlock()

	jobs, err := m.retrieveJobs()
	if err != nil {
		wrapped := fmt.Errorf("error retrieving jobs, has everything been setup? Data history manager will shut down. %w", err)
		// Retrieval failure is treated as fatal for the subsystem: shut it down.
		if stopErr := m.Stop(); stopErr != nil {
			log.Error(log.DataHistory, stopErr)
		}
		return nil, wrapped
	}

	if err = m.compareJobsToData(jobs...); err != nil {
		return nil, err
	}
	return jobs, nil
}
// compareJobsToData recalculates each job's candle date ranges and marks
// which intervals already have data.
//
// For candle jobs the stored candles are loaded through m.candleLoader and
// applied to the range holder; candle.ErrNoCandleDataFound is tolerated.
// For trade jobs m.tradeLoader is invoked with the range holder;
// sql.ErrNoRows (including when wrapped) is tolerated. Any other loader
// error is returned wrapped with the job nickname. A job with an unrecognised
// data type yields a wrapped errUnknownDataType.
func (m *DataHistoryManager) compareJobsToData(jobs ...*DataHistoryJob) error {
	if m == nil {
		return ErrNilSubsystem
	}
	if atomic.LoadInt32(&m.started) == 0 {
		return ErrSubSystemNotStarted
	}
	var err error
	for i := range jobs {
		jobs[i].rangeHolder, err = kline.CalculateCandleDateRanges(jobs[i].StartDate, jobs[i].EndDate, jobs[i].Interval, uint32(jobs[i].RequestSizeLimit))
		if err != nil {
			return err
		}
		switch jobs[i].DataType {
		case dataHistoryCandleDataType:
			var candles kline.Item
			candles, err = m.candleLoader(jobs[i].Exchange, jobs[i].Pair, jobs[i].Asset, jobs[i].Interval, jobs[i].StartDate, jobs[i].EndDate)
			if err != nil && !errors.Is(err, candle.ErrNoCandleDataFound) {
				return fmt.Errorf("%s could not load candle data: %w", jobs[i].Nickname, err)
			}
			jobs[i].rangeHolder.SetHasDataFromCandles(candles.Candles)
		case dataHistoryTradeDataType:
			err = m.tradeLoader(jobs[i].Exchange, jobs[i].Asset.String(), jobs[i].Pair.Base.String(), jobs[i].Pair.Quote.String(), jobs[i].rangeHolder)
			// errors.Is matches the candle branch and also recognises a
			// wrapped sql.ErrNoRows, which only means nothing is stored yet.
			if err != nil && !errors.Is(err, sql.ErrNoRows) {
				return fmt.Errorf("%s could not load trade data: %w", jobs[i].Nickname, err)
			}
		default:
			return fmt.Errorf("%s %w %s", jobs[i].Nickname, errUnknownDataType, jobs[i].DataType)
		}
	}
	return nil
}
// run launches the manager's background goroutine. The goroutine prepares
// the jobs once, stores them on the manager, then waits on the ticker: on
// every tick where the database reports it is connected, runJobs is started
// in its own goroutine. The loop ends when the shutdown channel is closed.
func (m *DataHistoryManager) run() {
	// processCycle performs one job-processing pass and logs any failure.
	processCycle := func() {
		if err := m.runJobs(); err != nil {
			log.Error(log.DataHistory, err)
		}
	}

	go func() {
		prepared, err := m.PrepareJobs()
		if err != nil {
			log.Error(log.DataHistory, err)
		}
		m.m.Lock()
		m.jobs = prepared
		m.m.Unlock()

		for {
			select {
			case <-m.shutdown:
				return
			case <-m.interval.C:
				if !m.databaseConnectionInstance.IsConnected() {
					// No database connection on this tick: wait for the next one.
					continue
				}
				go processCycle()
			}
		}
	}()
}
// runJobs performs one processing cycle: it refreshes the job list through
// PrepareJobs, stores it on the manager and runs the jobs in order, up to
// maxJobsPerCycle of them (a value of -1 removes the cap).
//
// Only one cycle may run at a time; a concurrent call returns a wrapped
// errAlreadyRunning. Errors from individual jobs are logged, not returned.
func (m *DataHistoryManager) runJobs() error {
	switch {
	case m == nil:
		return ErrNilSubsystem
	case atomic.LoadInt32(&m.started) == 0:
		return ErrSubSystemNotStarted
	}

	// The processing flag acts as a non-blocking guard against overlapping cycles.
	if !atomic.CompareAndSwapInt32(&m.processing, 0, 1) {
		return fmt.Errorf("runJobs %w", errAlreadyRunning)
	}
	defer atomic.StoreInt32(&m.processing, 0)

	prepared, err := m.PrepareJobs()
	if err != nil {
		return err
	}

	m.m.Lock()
	defer m.m.Unlock()
	m.jobs = prepared

	log.Infof(log.DataHistory, "processing data history jobs")
	for i := range m.jobs {
		if m.maxJobsPerCycle != -1 && i >= int(m.maxJobsPerCycle) {
			break
		}
		if jobErr := m.runJob(m.jobs[i]); jobErr != nil {
			log.Error(log.DataHistory, jobErr)
		}
		if m.verbose {
			log.Debugf(log.DataHistory, "completed run of data history job %v", m.jobs[i].Nickname)
		}
	}
	log.Infof(log.DataHistory, "completed run of data history jobs")
	return nil
}
// runJob processes an active job, retrieves candle or trade data
// for a given date range and saves all results to the database.
//
// Jobs whose status is not dataHistoryStatusActive are skipped and nil is
// returned. For every range that still lacks data, has not used up its retry
// budget and fits within the job's RunBatchLimit, data is requested from the
// exchange, stored, and a DataHistoryJobResult is appended to job.Results.
// Afterwards the job status is re-evaluated and both the job and its results
// are upserted to the database.
//
// Returns ErrNilSubsystem, ErrSubSystemNotStarted, errNilJob, a wrapped
// errJobInvalid when the job has no ranges, a wrapped errExchangeNotLoaded,
// errUnknownDataType, or any UUID generation or database upsert error.
// Failures while fetching or saving data for a single range are recorded in
// that range's result rather than returned.
func (m *DataHistoryManager) runJob(job *DataHistoryJob) error {
	if m == nil {
		return ErrNilSubsystem
	}
	if atomic.LoadInt32(&m.started) == 0 {
		return ErrSubSystemNotStarted
	}
	if job == nil {
		return errNilJob
	}
	if job.Status != dataHistoryStatusActive {
		return nil
	}
	var intervalsProcessed int64
	if job.rangeHolder == nil || len(job.rangeHolder.Ranges) == 0 {
		return fmt.Errorf("%s %w invalid start/end range %s-%s",
			job.Nickname,
			errJobInvalid,
			job.StartDate.Format(common.SimpleTimeFormatWithTimezone),
			job.EndDate.Format(common.SimpleTimeFormatWithTimezone),
		)
	}
	if job.Results == nil {
		// Results are written to below; writing to a nil map would panic.
		job.Results = make(map[time.Time][]DataHistoryJobResult)
	}
	exch := m.exchangeManager.GetExchangeByName(job.Exchange)
	if exch == nil {
		return fmt.Errorf("%s %w, cannot process job %s for %s %s",
			job.Exchange,
			errExchangeNotLoaded,
			job.Nickname,
			job.Asset,
			job.Pair)
	}
	if m.verbose {
		log.Debugf(log.DataHistory, "running data history job %v start: %s end: %s interval: %s datatype: %s",
			job.Nickname,
			job.StartDate,
			job.EndDate,
			job.Interval,
			job.DataType)
	}
ranges:
	for i := range job.rangeHolder.Ranges {
		// A range is complete when every one of its intervals has data.
		isCompleted := true
		for j := range job.rangeHolder.Ranges[i].Intervals {
			if !job.rangeHolder.Ranges[i].Intervals[j].HasData {
				isCompleted = false
				break
			}
		}
		// Skip finished ranges, and stop taking on new ranges once the
		// per-run batch limit has been reached.
		if isCompleted ||
			intervalsProcessed >= job.RunBatchLimit {
			continue
		}
		var failures int64
		hasDataInRange := false
		// Previous results are keyed by the start time of the range.
		resultLookup := job.Results[job.rangeHolder.Ranges[i].Start.Time]
		for x := range resultLookup {
			switch resultLookup[x].Status {
			case dataHistoryIntervalMissingData:
				// already written off as missing, never retry
				continue ranges
			case dataHistoryStatusFailed:
				failures++
			case dataHistoryStatusComplete:
				// this can occur in the scenario where data is missing
				// however no errors were encountered when data is missing
				// eg an exchange only returns an empty slice
				// or the exchange is simply missing the data and does not have an error
				hasDataInRange = true
			}
		}
		if failures >= job.MaxRetryAttempts {
			// failure threshold reached, we should not attempt
			// to check this interval again
			for x := range resultLookup {
				resultLookup[x].Status = dataHistoryIntervalMissingData
			}
			job.Results[job.rangeHolder.Ranges[i].Start.Time] = resultLookup
			continue
		}
		if hasDataInRange {
			continue
		}
		if m.verbose {
			log.Debugf(log.DataHistory, "job %s processing range %v-%v", job.Nickname, job.rangeHolder.Ranges[i].Start, job.rangeHolder.Ranges[i].End)
		}
		intervalsProcessed++
		id, err := uuid.NewV4()
		if err != nil {
			return err
		}
		// The result starts out as complete and is downgraded to failed by
		// any problem encountered below.
		result := DataHistoryJobResult{
			ID:                id,
			JobID:             job.ID,
			IntervalStartDate: job.rangeHolder.Ranges[i].Start.Time,
			IntervalEndDate:   job.rangeHolder.Ranges[i].End.Time,
			Status:            dataHistoryStatusComplete,
			Date:              time.Now(),
		}
		// processing the job
		switch job.DataType {
		case dataHistoryCandleDataType:
			candles, err := exch.GetHistoricCandlesExtended(job.Pair, job.Asset, job.rangeHolder.Ranges[i].Start.Time, job.rangeHolder.Ranges[i].End.Time, job.Interval)
			if err != nil {
				result.Result += "could not get candles: " + err.Error() + ". "
				result.Status = dataHistoryStatusFailed
				break
			}
			job.rangeHolder.SetHasDataFromCandles(candles.Candles)
			// Record every interval of this range that is still without data.
			for j := range job.rangeHolder.Ranges[i].Intervals {
				if !job.rangeHolder.Ranges[i].Intervals[j].HasData {
					result.Status = dataHistoryStatusFailed
					result.Result += fmt.Sprintf("missing data from %v - %v. ",
						job.rangeHolder.Ranges[i].Intervals[j].Start.Time.Format(common.SimpleTimeFormatWithTimezone),
						job.rangeHolder.Ranges[i].Intervals[j].End.Time.Format(common.SimpleTimeFormatWithTimezone))
				}
			}
			_, err = kline.StoreInDatabase(&candles, true)
			if err != nil {
				result.Result += "could not save results: " + err.Error() + ". "
				result.Status = dataHistoryStatusFailed
			}
		case dataHistoryTradeDataType:
			trades, err := exch.GetHistoricTrades(job.Pair, job.Asset, job.rangeHolder.Ranges[i].Start.Time, job.rangeHolder.Ranges[i].End.Time)
			if err != nil {
				result.Result += "could not get trades: " + err.Error() + ". "
				result.Status = dataHistoryStatusFailed
				break
			}
			// Trades are converted to candles only to work out which
			// intervals are covered; the trades themselves are what is saved.
			candles, err := trade.ConvertTradesToCandles(job.Interval, trades...)
			if err != nil {
				result.Result += "could not convert trades to candles: " + err.Error() + ". "
				result.Status = dataHistoryStatusFailed
				break
			}
			job.rangeHolder.SetHasDataFromCandles(candles.Candles)
			for j := range job.rangeHolder.Ranges[i].Intervals {
				if !job.rangeHolder.Ranges[i].Intervals[j].HasData {
					result.Status = dataHistoryStatusFailed
					result.Result += fmt.Sprintf("missing data from %v - %v. ",
						job.rangeHolder.Ranges[i].Intervals[j].Start.Time.Format(common.SimpleTimeFormatWithTimezone),
						job.rangeHolder.Ranges[i].Intervals[j].End.Time.Format(common.SimpleTimeFormatWithTimezone))
				}
			}
			err = trade.SaveTradesToDatabase(trades...)
			if err != nil {
				result.Result += "could not save results: " + err.Error() + ". "
				result.Status = dataHistoryStatusFailed
			}
		default:
			return errUnknownDataType
		}
		lookup := job.Results[result.IntervalStartDate]
		lookup = append(lookup, result)
		job.Results[result.IntervalStartDate] = lookup
	}

	// Decide whether the job as a whole has finished. Every case of the
	// inner switch leaves the results loop, so only the first result stored
	// for each range is inspected.
	// NOTE(review): a range whose first result is failed and a later one is
	// complete keeps the job unfinished - confirm this is intended.
	completed := true
	allResultsSuccessful := true
	allResultsFailed := true
completionCheck:
	for i := range job.rangeHolder.Ranges {
		result, ok := job.Results[job.rangeHolder.Ranges[i].Start.Time]
		if !ok {
			// a range that has never produced a result is still outstanding
			completed = false
		}
	results:
		for j := range result {
			switch result[j].Status {
			case dataHistoryIntervalMissingData:
				allResultsSuccessful = false
				break results
			case dataHistoryStatusComplete:
				allResultsFailed = false
				break results
			default:
				completed = false
				break completionCheck
			}
		}
	}
	if completed {
		switch {
		case allResultsSuccessful:
			job.Status = dataHistoryStatusComplete
		case allResultsFailed:
			job.Status = dataHistoryStatusFailed
		default:
			job.Status = dataHistoryIntervalMissingData
		}
		log.Infof(log.DataHistory, "job %s finished! Status: %s", job.Nickname, job.Status)
	}
	dbJob := m.convertJobToDBModel(job)
	err := m.jobDB.Upsert(dbJob)
	if err != nil {
		return fmt.Errorf("job %s failed to update database: %w", job.Nickname, err)
	}
	dbJobResults := m.convertJobResultToDBResult(job.Results)
	err = m.jobResultDB.Upsert(dbJobResults...)
	if err != nil {
		return fmt.Errorf("job %s failed to insert job results to database: %w", job.Nickname, err)
	}
	return nil
}
// UpsertJob validates a job and writes it to the job database, adding it to
// the in-memory job list or updating the entry that shares its nickname.
//
// With insertOnly set, an existing job with the same nickname causes a
// wrapped errNicknameInUse. Without it, an existing job may only be replaced
// while its status is dataHistoryStatusActive. When an in-memory job is
// matched, the supplied job takes over its ID and the in-memory entry picks
// up the supplied non-zero fields, data type and status.
func (m *DataHistoryManager) UpsertJob(job *DataHistoryJob, insertOnly bool) error {
	switch {
	case m == nil:
		return ErrNilSubsystem
	case !m.IsRunning():
		return ErrSubSystemNotStarted
	case job == nil:
		return errNilJob
	case job.Nickname == "":
		return fmt.Errorf("upsert job %w", errNicknameUnset)
	}

	existing, err := m.GetByNickname(job.Nickname, false)
	if err != nil && !errors.Is(err, errJobNotFound) {
		return err
	}
	if existing != nil && (insertOnly || existing.Status != dataHistoryStatusActive) {
		return fmt.Errorf("upsert job %w nickname: %s - status: %s ", errNicknameInUse, existing.Nickname, existing.Status)
	}

	m.m.Lock()
	defer m.m.Unlock()

	if err = m.validateJob(job); err != nil {
		return err
	}

	matched := false
	if !insertOnly {
		for i := range m.jobs {
			current := m.jobs[i]
			if !strings.EqualFold(current.Nickname, job.Nickname) {
				continue
			}
			matched = true
			job.ID = current.ID
			// Copy across only the fields that were supplied and differ.
			if job.Exchange != "" && current.Exchange != job.Exchange {
				current.Exchange = job.Exchange
			}
			if job.Asset != "" && current.Asset != job.Asset {
				current.Asset = job.Asset
			}
			if !job.Pair.IsEmpty() && !current.Pair.Equal(job.Pair) {
				current.Pair = job.Pair
			}
			if !job.StartDate.IsZero() && !current.StartDate.Equal(job.StartDate) {
				current.StartDate = job.StartDate
			}
			if !job.EndDate.IsZero() && !current.EndDate.Equal(job.EndDate) {
				current.EndDate = job.EndDate
			}
			if job.Interval != 0 && current.Interval != job.Interval {
				current.Interval = job.Interval
			}
			if job.RunBatchLimit != 0 && current.RunBatchLimit != job.RunBatchLimit {
				current.RunBatchLimit = job.RunBatchLimit
			}
			if job.RequestSizeLimit != 0 && current.RequestSizeLimit != job.RequestSizeLimit {
				current.RequestSizeLimit = job.RequestSizeLimit
			}
			if job.MaxRetryAttempts != 0 && current.MaxRetryAttempts != job.MaxRetryAttempts {
				current.MaxRetryAttempts = job.MaxRetryAttempts
			}
			current.DataType = job.DataType
			current.Status = job.Status
			break
		}
	}

	// A job that did not inherit an ID gets a freshly generated one.
	if job.ID == uuid.Nil {
		if job.ID, err = uuid.NewV4(); err != nil {
			return err
		}
	}

	job.rangeHolder, err = kline.CalculateCandleDateRanges(job.StartDate, job.EndDate, job.Interval, uint32(job.RequestSizeLimit))
	if err != nil {
		return err
	}

	if !matched {
		m.jobs = append(m.jobs, job)
	}
	return m.jobDB.Upsert(m.convertJobToDBModel(job))
}
// validateJob checks that a job can be processed and fills in defaults.
//
// It rejects a nil job, an invalid asset, an empty pair, an invalid status or
// data type, an exchange that is not loaded, a pair that is not enabled on the
// exchange, an interval the exchange does not list as enabled, and a
// start/end pair that fails common.StartEndTimeCheck.
//
// The job is modified in place: a nil Results map is allocated, non-positive
// RunBatchLimit, MaxRetryAttempts and RequestSizeLimit receive package
// defaults, trade jobs with an out-of-bounds interval are reset to
// defaultDataHistoryTradeInterval, and StartDate/EndDate are rounded to the
// interval duration.
func (m *DataHistoryManager) validateJob(job *DataHistoryJob) error {
	if job == nil {
		return errNilJob
	}
	switch {
	case !job.Asset.IsValid():
		return fmt.Errorf("job %s %w %s", job.Nickname, asset.ErrNotSupported, job.Asset)
	case job.Pair.IsEmpty():
		return fmt.Errorf("job %s %w", job.Nickname, errCurrencyPairUnset)
	case !job.Status.Valid():
		return fmt.Errorf("job %s %w: %s", job.Nickname, errInvalidDataHistoryStatus, job.Status)
	case !job.DataType.Valid():
		return fmt.Errorf("job %s %w: %s", job.Nickname, errInvalidDataHistoryDataType, job.DataType)
	}

	exchange := m.exchangeManager.GetExchangeByName(job.Exchange)
	if exchange == nil {
		return fmt.Errorf("job %s cannot process job: %s %w",
			job.Nickname,
			job.Exchange,
			errExchangeNotLoaded)
	}
	enabledPairs, err := exchange.GetEnabledPairs(job.Asset)
	if err != nil {
		return fmt.Errorf("job %s exchange %s asset %s currency %s %w", job.Nickname, job.Exchange, job.Asset, job.Pair, err)
	}
	if !enabledPairs.Contains(job.Pair, false) {
		return fmt.Errorf("job %s exchange %s asset %s currency %s %w", job.Nickname, job.Exchange, job.Asset, job.Pair, errCurrencyNotEnabled)
	}

	// Apply defaults for anything left unset.
	if job.Results == nil {
		job.Results = make(map[time.Time][]DataHistoryJobResult)
	}
	if job.RunBatchLimit <= 0 {
		log.Warnf(log.DataHistory, "job %s has unset batch limit, defaulting to %v", job.Nickname, defaultDataHistoryBatchLimit)
		job.RunBatchLimit = defaultDataHistoryBatchLimit
	}
	if job.MaxRetryAttempts <= 0 {
		log.Warnf(log.DataHistory, "job %s has unset max retry limit, defaulting to %v", job.Nickname, defaultDataHistoryRetryAttempts)
		job.MaxRetryAttempts = defaultDataHistoryRetryAttempts
	}
	if job.RequestSizeLimit <= 0 {
		job.RequestSizeLimit = defaultDataHistoryRequestSizeLimit
	}

	// Trade jobs only keep intervals strictly between TenMin and FourHour.
	intervalOutOfBounds := job.Interval >= kline.FourHour || job.Interval <= kline.TenMin
	if job.DataType == dataHistoryTradeDataType && intervalOutOfBounds {
		log.Warnf(log.DataHistory, "job %s interval %v outside limits, defaulting to %v", job.Nickname, job.Interval.Word(), defaultDataHistoryTradeInterval)
		job.Interval = defaultDataHistoryTradeInterval
	}

	base := exchange.GetBase()
	if supported := base.Features.Enabled.Kline.Intervals[job.Interval.Word()]; !supported {
		return fmt.Errorf("job %s %s %w %s", job.Nickname, job.Interval.Word(), kline.ErrUnsupportedInterval, job.Exchange)
	}

	job.StartDate = job.StartDate.Round(job.Interval.Duration())
	job.EndDate = job.EndDate.Round(job.Interval.Duration())
	if err = common.StartEndTimeCheck(job.StartDate, job.EndDate); err != nil {
		return fmt.Errorf("job %s %w start: %v end %v", job.Nickname, err, job.StartDate, job.EndDate)
	}
	return nil
}
// GetByID looks a job up by its ID. The in-memory job list is searched first
// and a copy of the match is returned; otherwise the job is loaded from the
// job database and converted.
//
// Returns errEmptyID for a nil UUID and an error wrapping errJobNotFound when
// the database lookup fails.
func (m *DataHistoryManager) GetByID(id uuid.UUID) (*DataHistoryJob, error) {
	switch {
	case m == nil:
		return nil, ErrNilSubsystem
	case atomic.LoadInt32(&m.started) == 0:
		return nil, ErrSubSystemNotStarted
	case id == uuid.Nil:
		return nil, errEmptyID
	}

	var cached *DataHistoryJob
	m.m.Lock()
	for _, candidate := range m.jobs {
		if candidate.ID == id {
			// Hand out a copy so callers cannot alter the managed job.
			dup := *candidate
			cached = &dup
			break
		}
	}
	m.m.Unlock()
	if cached != nil {
		return cached, nil
	}

	stored, err := m.jobDB.GetByID(id.String())
	if err != nil {
		return nil, fmt.Errorf("%w with id %s %s", errJobNotFound, id, err)
	}
	converted, err := m.convertDBModelToJob(stored)
	if err != nil {
		return nil, fmt.Errorf("could not convert model with id %s %w", id, err)
	}
	return converted, nil
}
// GetByNickname searches for a job by nickname (case-insensitive for the
// in-memory list) and returns it if found.
//
// If fullDetails is enabled, the job and all of its results are loaded
// straight from the database. Otherwise the in-memory job list is searched
// first and a copy of the matching job is returned, so that callers cannot
// modify the manager's job outside of its mutex; failing that, the job is
// looked up in the database.
//
// Returns errJobNotFound when the database has no such row, or an error
// wrapping errJobNotFound for any other database lookup failure.
func (m *DataHistoryManager) GetByNickname(nickname string, fullDetails bool) (*DataHistoryJob, error) {
	if m == nil {
		return nil, ErrNilSubsystem
	}
	if atomic.LoadInt32(&m.started) == 0 {
		return nil, ErrSubSystemNotStarted
	}
	if fullDetails {
		dbJ, err := m.jobDB.GetJobAndAllResults(nickname)
		if err != nil {
			return nil, fmt.Errorf("job %s could not load job from database: %w", nickname, err)
		}
		result, err := m.convertDBModelToJob(dbJ)
		if err != nil {
			return nil, fmt.Errorf("could not convert model with nickname %s %w", nickname, err)
		}
		return result, nil
	}
	m.m.Lock()
	for i := range m.jobs {
		if strings.EqualFold(m.jobs[i].Nickname, nickname) {
			// Dereference to copy the struct, mirroring GetByID; returning
			// m.jobs[i] itself would leak the shared pointer.
			cpy := *m.jobs[i]
			m.m.Unlock()
			return &cpy, nil
		}
	}
	m.m.Unlock()
	// now try the database
	j, err := m.jobDB.GetByNickName(nickname)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// no need to display normal sql err to user
			return nil, errJobNotFound
		}
		return nil, fmt.Errorf("job %s %w, %s", nickname, errJobNotFound, err)
	}
	job, err := m.convertDBModelToJob(j)
	if err != nil {
		return nil, err
	}
	return job, nil
}
// GetAllJobStatusBetween returns the jobs that the job database selects for
// the window between start and end (via jobDB.GetJobsBetween), converted to
// DataHistoryJob values.
//
// The window is checked with common.StartEndTimeCheck first. The first
// conversion failure aborts the call.
func (m *DataHistoryManager) GetAllJobStatusBetween(start, end time.Time) ([]*DataHistoryJob, error) {
	switch {
	case m == nil:
		return nil, ErrNilSubsystem
	case atomic.LoadInt32(&m.started) == 0:
		return nil, ErrSubSystemNotStarted
	}
	if err := common.StartEndTimeCheck(start, end); err != nil {
		return nil, err
	}

	stored, err := m.jobDB.GetJobsBetween(start, end)
	if err != nil {
		return nil, err
	}

	var jobs []*DataHistoryJob
	for i := range stored {
		job, convErr := m.convertDBModelToJob(&stored[i])
		if convErr != nil {
			return nil, convErr
		}
		jobs = append(jobs, job)
	}
	return jobs, nil
}
// DeleteJob marks a job as removed. Exactly one of nickname or id must be
// supplied: errNicknameIDUnset is returned when both are empty and
// errOnlyNicknameOrID when both are set.
//
// The job is looked up in the in-memory list first and in the job database
// otherwise. Only jobs with status dataHistoryStatusActive may be deleted;
// any other status yields a wrapped errCanOnlyDeleteActiveJobs. The job is
// upserted with status dataHistoryStatusRemoved, and only once that has
// succeeded is it dropped from the in-memory list, so a rejected or failed
// delete leaves the manager's state untouched.
func (m *DataHistoryManager) DeleteJob(nickname, id string) error {
	if m == nil {
		return ErrNilSubsystem
	}
	if atomic.LoadInt32(&m.started) == 0 {
		return ErrSubSystemNotStarted
	}
	if nickname == "" && id == "" {
		return errNicknameIDUnset
	}
	if nickname != "" && id != "" {
		return errOnlyNicknameOrID
	}
	var dbJob *datahistoryjob.DataHistoryJob
	var err error
	m.m.Lock()
	defer m.m.Unlock()
	// cachedIndex remembers where the job sits in m.jobs (-1 when it is not
	// held in memory) so removal can be deferred until the delete succeeds.
	cachedIndex := -1
	for i := range m.jobs {
		if strings.EqualFold(m.jobs[i].Nickname, nickname) ||
			m.jobs[i].ID.String() == id {
			dbJob = m.convertJobToDBModel(m.jobs[i])
			cachedIndex = i
			break
		}
	}
	if dbJob == nil {
		if nickname != "" {
			dbJob, err = m.jobDB.GetByNickName(nickname)
		} else {
			dbJob, err = m.jobDB.GetByID(id)
		}
		if err != nil {
			return err
		}
	}
	if dbJob.Status != int64(dataHistoryStatusActive) {
		status := dataHistoryStatus(dbJob.Status)
		return fmt.Errorf("job: %v status: %s error: %w", dbJob.Nickname, status, errCanOnlyDeleteActiveJobs)
	}
	dbJob.Status = int64(dataHistoryStatusRemoved)
	err = m.jobDB.Upsert(dbJob)
	if err != nil {
		return err
	}
	if cachedIndex >= 0 {
		m.jobs = append(m.jobs[:cachedIndex], m.jobs[cachedIndex+1:]...)
	}
	log.Infof(log.DataHistory, "deleted job %v", dbJob.Nickname)
	return nil
}
// GetActiveJobs returns copies of every in-memory job whose status is
// `dataHistoryStatusActive`. The manager mutex is held while the list is read.
func (m *DataHistoryManager) GetActiveJobs() ([]DataHistoryJob, error) {
	switch {
	case m == nil:
		return nil, ErrNilSubsystem
	case !m.IsRunning():
		return nil, ErrSubSystemNotStarted
	}

	m.m.Lock()
	defer m.m.Unlock()

	var active []DataHistoryJob
	for _, job := range m.jobs {
		if job.Status != dataHistoryStatusActive {
			continue
		}
		active = append(active, *job)
	}
	return active, nil
}
// GenerateJobSummary builds a DataHistoryJobSummary for the job with the
// given nickname. The job is fetched with GetByNickname (without full
// details), its ranges are refreshed through compareJobsToData, and the
// range holder's DataSummary is attached as ResultRanges.
func (m *DataHistoryManager) GenerateJobSummary(nickname string) (*DataHistoryJobSummary, error) {
	if m == nil {
		return nil, ErrNilSubsystem
	}

	job, err := m.GetByNickname(nickname, false)
	if err != nil {
		return nil, fmt.Errorf("job: %v %w", nickname, err)
	}
	if err = m.compareJobsToData(job); err != nil {
		return nil, err
	}

	summary := &DataHistoryJobSummary{
		Nickname:     job.Nickname,
		Exchange:     job.Exchange,
		Asset:        job.Asset,
		Pair:         job.Pair,
		StartDate:    job.StartDate,
		EndDate:      job.EndDate,
		Interval:     job.Interval,
		Status:       job.Status,
		DataType:     job.DataType,
		ResultRanges: job.rangeHolder.DataSummary(true),
	}
	return summary, nil
}
// ----------------------------Lovely-converters----------------------------
// convertDBModelToJob turns a database job model into a DataHistoryJob.
// The ID is parsed as a UUID, the currency pair is rebuilt from the stored
// base and quote joined by "-", and the attached results are converted with
// convertDBResultToJobResult. Any parse or conversion failure is returned.
func (m *DataHistoryManager) convertDBModelToJob(dbModel *datahistoryjob.DataHistoryJob) (*DataHistoryJob, error) {
	jobID, err := uuid.FromString(dbModel.ID)
	if err != nil {
		return nil, err
	}

	pairText := fmt.Sprintf("%s-%s", dbModel.Base, dbModel.Quote)
	pair, err := currency.NewPairFromString(pairText)
	if err != nil {
		return nil, fmt.Errorf("job %s could not format pair %s-%s: %w", dbModel.Nickname, dbModel.Base, dbModel.Quote, err)
	}

	results, err := m.convertDBResultToJobResult(dbModel.Results)
	if err != nil {
		return nil, fmt.Errorf("job %s could not convert database job: %w", dbModel.Nickname, err)
	}

	job := &DataHistoryJob{
		ID:               jobID,
		Nickname:         dbModel.Nickname,
		Exchange:         dbModel.ExchangeName,
		Asset:            asset.Item(dbModel.Asset),
		Pair:             pair,
		StartDate:        dbModel.StartDate,
		EndDate:          dbModel.EndDate,
		Interval:         kline.Interval(dbModel.Interval),
		RunBatchLimit:    dbModel.BatchSize,
		RequestSizeLimit: dbModel.RequestSizeLimit,
		DataType:         dataHistoryDataType(dbModel.DataType),
		MaxRetryAttempts: dbModel.MaxRetryAttempts,
		Status:           dataHistoryStatus(dbModel.Status),
		CreatedDate:      dbModel.CreatedDate,
		Results:          results,
	}
	return job, nil
}
// convertDBResultToJobResult converts database job results into
// DataHistoryJobResult values grouped by their interval start date.
// A non-nil (possibly empty) map is returned on success; an ID or job ID
// that is not a valid UUID aborts the conversion.
func (m *DataHistoryManager) convertDBResultToJobResult(dbModels []*datahistoryjobresult.DataHistoryJobResult) (map[time.Time][]DataHistoryJobResult, error) {
	grouped := make(map[time.Time][]DataHistoryJobResult)
	for _, row := range dbModels {
		resultID, err := uuid.FromString(row.ID)
		if err != nil {
			return nil, err
		}
		parentID, err := uuid.FromString(row.JobID)
		if err != nil {
			return nil, err
		}
		key := row.IntervalStartDate
		grouped[key] = append(grouped[key], DataHistoryJobResult{
			ID:                resultID,
			JobID:             parentID,
			IntervalStartDate: row.IntervalStartDate,
			IntervalEndDate:   row.IntervalEndDate,
			Status:            dataHistoryStatus(row.Status),
			Result:            row.Result,
			Date:              row.Date,
		})
	}
	return grouped, nil
}
// convertJobResultToDBResult flattens the per-interval job results into a
// single slice of database result models. Because the input is a map, the
// order of the groups in the output is not fixed. The returned slice is nil
// when there are no results.
func (m *DataHistoryManager) convertJobResultToDBResult(results map[time.Time][]DataHistoryJobResult) []*datahistoryjobresult.DataHistoryJobResult {
	var flattened []*datahistoryjobresult.DataHistoryJobResult
	for _, group := range results {
		for i := range group {
			entry := &group[i]
			flattened = append(flattened, &datahistoryjobresult.DataHistoryJobResult{
				ID:                entry.ID.String(),
				JobID:             entry.JobID.String(),
				IntervalStartDate: entry.IntervalStartDate,
				IntervalEndDate:   entry.IntervalEndDate,
				Status:            int64(entry.Status),
				Result:            entry.Result,
				Date:              entry.Date,
			})
		}
	}
	return flattened
}
// convertJobToDBModel turns a DataHistoryJob into its database model,
// including its results. The model ID is left empty when the job ID is the
// nil UUID.
func (m *DataHistoryManager) convertJobToDBModel(job *DataHistoryJob) *datahistoryjob.DataHistoryJob {
	var modelID string
	if job.ID != uuid.Nil {
		modelID = job.ID.String()
	}
	return &datahistoryjob.DataHistoryJob{
		ID:               modelID,
		Nickname:         job.Nickname,
		ExchangeName:     job.Exchange,
		Asset:            job.Asset.String(),
		Base:             job.Pair.Base.String(),
		Quote:            job.Pair.Quote.String(),
		StartDate:        job.StartDate,
		EndDate:          job.EndDate,
		Interval:         int64(job.Interval.Duration()),
		RequestSizeLimit: job.RequestSizeLimit,
		DataType:         int64(job.DataType),
		MaxRetryAttempts: job.MaxRetryAttempts,
		Status:           int64(job.Status),
		CreatedDate:      job.CreatedDate,
		BatchSize:        job.RunBatchLimit,
		Results:          m.convertJobResultToDBResult(job.Results),
	}
}