mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
common: update Errors type (#1129)
* common: adjust common error slice to allow multi errors.Is matching and conform to interface better * zb: forgot to save? * linties: fixies * linties: word change as well. * nitters: glorious * buts * nitters: fix glorious bug * Update common/common.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * nitters: shifty --------- Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io> Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
@@ -14,8 +14,6 @@ import (
|
||||
|
||||
// CalculateResults calculates all statistics for the exchange, asset, currency pair
|
||||
func (c *CurrencyPairStatistic) CalculateResults(riskFreeRate decimal.Decimal) error {
|
||||
var errs gctcommon.Errors
|
||||
var err error
|
||||
first := c.Events[0]
|
||||
sep := fmt.Sprintf("%v %v %v |\t", first.DataEvent.GetExchange(), first.DataEvent.GetAssetType(), first.DataEvent.Pair())
|
||||
|
||||
@@ -54,7 +52,7 @@ func (c *CurrencyPairStatistic) CalculateResults(riskFreeRate decimal.Decimal) e
|
||||
c.StrategyMovement = last.Holdings.TotalValue.Sub(first.Holdings.TotalValue).Div(first.Holdings.TotalValue).Mul(oneHundred)
|
||||
}
|
||||
c.analysePNLGrowth()
|
||||
err = c.calculateHighestCommittedFunds()
|
||||
err := c.calculateHighestCommittedFunds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -87,9 +85,10 @@ func (c *CurrencyPairStatistic) CalculateResults(riskFreeRate decimal.Decimal) e
|
||||
// ratio calculations as no movement has been made
|
||||
benchmarkRates = benchmarkRates[1:]
|
||||
returnsPerCandle = returnsPerCandle[1:]
|
||||
var errs error
|
||||
c.MaxDrawdown, err = CalculateBiggestEventDrawdown(allDataEvents)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
}
|
||||
|
||||
interval := first.DataEvent.GetInterval()
|
||||
@@ -109,7 +108,7 @@ func (c *CurrencyPairStatistic) CalculateResults(riskFreeRate decimal.Decimal) e
|
||||
decimal.NewFromInt(int64(len(c.Events))),
|
||||
)
|
||||
if err != nil && !errors.Is(err, gctmath.ErrPowerDifferenceTooSmall) {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
}
|
||||
c.CompoundAnnualGrowthRate = cagr
|
||||
}
|
||||
@@ -124,10 +123,7 @@ func (c *CurrencyPairStatistic) CalculateResults(riskFreeRate decimal.Decimal) e
|
||||
c.UnrealisedPNL = last.PNL.GetUnrealisedPNL().PNL
|
||||
c.RealisedPNL = last.PNL.GetRealisedPNL().PNL
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (c *CurrencyPairStatistic) calculateHighestCommittedFunds() error {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package statistics
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
@@ -71,8 +72,7 @@ func (s *Statistic) PrintTotalResults() {
|
||||
// grouped by time to allow a clearer picture of events
|
||||
func (s *Statistic) PrintAllEventsChronologically() {
|
||||
log.Info(common.Statistics, common.CMDColours.H1+"------------------Events-------------------------------------"+common.CMDColours.Default)
|
||||
var errs gctcommon.Errors
|
||||
var err error
|
||||
var errs error
|
||||
var results []eventOutputHolder
|
||||
for _, exchangeMap := range s.ExchangeAssetPairStatistics {
|
||||
for _, assetMap := range exchangeMap {
|
||||
@@ -81,25 +81,26 @@ func (s *Statistic) PrintAllEventsChronologically() {
|
||||
for i := range currencyStatistic.Events {
|
||||
var result string
|
||||
var tt time.Time
|
||||
var err error
|
||||
switch {
|
||||
case currencyStatistic.Events[i].FillEvent != nil:
|
||||
result, err = s.CreateLog(currencyStatistic.Events[i].FillEvent)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
tt = currencyStatistic.Events[i].FillEvent.GetTime()
|
||||
case currencyStatistic.Events[i].SignalEvent != nil:
|
||||
result, err = s.CreateLog(currencyStatistic.Events[i].SignalEvent)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
tt = currencyStatistic.Events[i].SignalEvent.GetTime()
|
||||
case currencyStatistic.Events[i].DataEvent != nil:
|
||||
result, err = s.CreateLog(currencyStatistic.Events[i].DataEvent)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
tt = currencyStatistic.Events[i].DataEvent.GetTime()
|
||||
@@ -121,10 +122,10 @@ func (s *Statistic) PrintAllEventsChronologically() {
|
||||
log.Info(common.Statistics, results[i].Events[j])
|
||||
}
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
if errs != nil {
|
||||
log.Info(common.Statistics, common.CMDColours.Error+"------------------Errors-------------------------------------"+common.CMDColours.Default)
|
||||
for i := range errs {
|
||||
log.Error(common.Statistics, errs[i].Error())
|
||||
for err := errors.Unwrap(errs); err != nil; err = errors.Unwrap(errs) {
|
||||
log.Error(common.Statistics, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -206,7 +207,6 @@ func (s *Statistic) CreateLog(data common.Event) (string, error) {
|
||||
|
||||
// PrintResults outputs all calculated statistics to the command line
|
||||
func (c *CurrencyPairStatistic) PrintResults(e string, a asset.Item, p currency.Pair, usingExchangeLevelFunding bool) {
|
||||
var errs gctcommon.Errors
|
||||
sort.Slice(c.Events, func(i, j int) bool {
|
||||
return c.Events[i].Time.Before(c.Events[j].Time)
|
||||
})
|
||||
@@ -302,12 +302,6 @@ func (c *CurrencyPairStatistic) PrintResults(e string, a asset.Item, p currency.
|
||||
log.Infof(common.CurrencyStatistics, "%s Final Unrealised PNL: %s", sep, convert.DecimalToHumanFriendlyString(unrealised.PNL, 8, ".", ","))
|
||||
log.Infof(common.CurrencyStatistics, "%s Final Realised PNL: %s", sep, convert.DecimalToHumanFriendlyString(realised.PNL, 8, ".", ","))
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
log.Info(common.CurrencyStatistics, common.CMDColours.Error+"------------------Errors-------------------------------------"+common.CMDColours.Default)
|
||||
for i := range errs {
|
||||
log.Error(common.CurrencyStatistics, errs[i].Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PrintResults outputs all calculated funding statistics to the command line
|
||||
|
||||
@@ -74,20 +74,16 @@ func (s *Strategy) SupportsSimultaneousProcessing() bool {
|
||||
// For dollarcostaverage, the strategy is always "buy", so it uses the OnSignal function
|
||||
func (s *Strategy) OnSimultaneousSignals(d []data.Handler, _ funding.IFundingTransferer, _ portfolio.Handler) ([]signal.Event, error) {
|
||||
var resp []signal.Event
|
||||
var errs gctcommon.Errors
|
||||
var errs error
|
||||
for i := range d {
|
||||
sigEvent, err := s.OnSignal(d[i], nil, nil)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = gctcommon.AppendError(errs, err)
|
||||
} else {
|
||||
resp = append(resp, sigEvent)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, errs
|
||||
}
|
||||
return resp, nil
|
||||
return resp, errs
|
||||
}
|
||||
|
||||
// SetCustomSettings not required for DCA
|
||||
|
||||
@@ -114,7 +114,7 @@ func (s *Strategy) SupportsSimultaneousProcessing() bool {
|
||||
// in allowing a strategy to only place an order for X currency if Y currency's price is Z
|
||||
func (s *Strategy) OnSimultaneousSignals(d []data.Handler, _ funding.IFundingTransferer, _ portfolio.Handler) ([]signal.Event, error) {
|
||||
var resp []signal.Event
|
||||
var errs gctcommon.Errors
|
||||
var errs error
|
||||
for i := range d {
|
||||
latest, err := d[i].Latest()
|
||||
if err != nil {
|
||||
@@ -122,16 +122,16 @@ func (s *Strategy) OnSimultaneousSignals(d []data.Handler, _ funding.IFundingTra
|
||||
}
|
||||
sigEvent, err := s.OnSignal(d[i], nil, nil)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("%v %v %v %w", latest.GetExchange(), latest.GetAssetType(), latest.Pair(), err))
|
||||
errs = gctcommon.AppendError(errs, fmt.Errorf("%v %v %v %w",
|
||||
latest.GetExchange(),
|
||||
latest.GetAssetType(),
|
||||
latest.Pair(),
|
||||
err))
|
||||
} else {
|
||||
resp = append(resp, sigEvent)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, errs
|
||||
}
|
||||
return resp, nil
|
||||
return resp, errs
|
||||
}
|
||||
|
||||
// SetCustomSettings allows a user to modify the RSI limits in their config
|
||||
|
||||
@@ -433,28 +433,78 @@ func InArray(val, array interface{}) (exists bool, index int) {
|
||||
return
|
||||
}
|
||||
|
||||
// Errors defines multiple errors
|
||||
type Errors []error
|
||||
|
||||
// Error implements error interface
|
||||
func (e Errors) Error() string {
|
||||
if len(e) == 0 {
|
||||
return ""
|
||||
}
|
||||
var r string
|
||||
for i := range e {
|
||||
r += e[i].Error() + ", "
|
||||
}
|
||||
return r[:len(r)-2]
|
||||
// multiError holds all the errors as a slice, this is unexported, so it forces
|
||||
// inbuilt error handling.
|
||||
type multiError struct {
|
||||
loadedErrors []error
|
||||
offset *int
|
||||
}
|
||||
|
||||
// Unwrap implements interface behaviour for errors.Is() matching NOTE: only
|
||||
// returns first element.
|
||||
func (e Errors) Unwrap() error {
|
||||
if len(e) == 0 {
|
||||
return nil
|
||||
// AppendError appends error in a more idiomatic way. This can start out as a
|
||||
// standard error e.g. err := errors.New("random error")
|
||||
// err = AppendError(err, errors.New("another random error"))
|
||||
func AppendError(original, incoming error) error {
|
||||
errSliceP, ok := original.(*multiError)
|
||||
if ok {
|
||||
errSliceP.offset = nil
|
||||
}
|
||||
return e[0]
|
||||
if incoming == nil {
|
||||
return original // Skip append - continue as normal.
|
||||
}
|
||||
if !ok {
|
||||
// This assumes that a standard error is passed in and we can want to
|
||||
// track it and add additional errors.
|
||||
errSliceP = &multiError{}
|
||||
if original != nil {
|
||||
errSliceP.loadedErrors = append(errSliceP.loadedErrors, original)
|
||||
}
|
||||
}
|
||||
if incomingSlice, ok := incoming.(*multiError); ok {
|
||||
// Join slices if needed.
|
||||
errSliceP.loadedErrors = append(errSliceP.loadedErrors, incomingSlice.loadedErrors...)
|
||||
} else {
|
||||
errSliceP.loadedErrors = append(errSliceP.loadedErrors, incoming)
|
||||
}
|
||||
return errSliceP
|
||||
}
|
||||
|
||||
// Error displays all errors comma separated, if unwrapped has been called and
|
||||
// has not been reset will display the individual error
|
||||
func (e *multiError) Error() string {
|
||||
if e.offset != nil {
|
||||
return e.loadedErrors[*e.offset].Error()
|
||||
}
|
||||
allErrors := make([]string, len(e.loadedErrors))
|
||||
for x := range e.loadedErrors {
|
||||
allErrors[x] = e.loadedErrors[x].Error()
|
||||
}
|
||||
return strings.Join(allErrors, ", ")
|
||||
}
|
||||
|
||||
// Unwrap increments the offset so errors.Is() can be called to its individual
|
||||
// error for correct matching.
|
||||
func (e *multiError) Unwrap() error {
|
||||
if e.offset == nil {
|
||||
e.offset = new(int)
|
||||
} else {
|
||||
*e.offset++
|
||||
}
|
||||
if *e.offset == len(e.loadedErrors) {
|
||||
e.offset = nil
|
||||
return nil // Force errors.Is package to return false.
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Is checks to see if the errors match. It calls package errors.Is() so that
|
||||
// we can keep fmt.Errorf() trimmings. This is called in errors package at
|
||||
// interface assertion err.(interface{ Is(error) bool }).
|
||||
func (e *multiError) Is(incoming error) bool {
|
||||
if e.offset != nil && errors.Is(e.loadedErrors[*e.offset], incoming) {
|
||||
e.offset = nil
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// StartEndTimeCheck provides some basic checks which occur
|
||||
|
||||
@@ -3,6 +3,7 @@ package common
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -623,19 +624,106 @@ func TestInArray(t *testing.T) {
|
||||
|
||||
func TestErrors(t *testing.T) {
|
||||
t.Parallel()
|
||||
var test Errors
|
||||
if test.Error() != "" {
|
||||
t.Fatal("string should be nil")
|
||||
}
|
||||
errTestOne := errors.New("test1")
|
||||
test = append(test, errTestOne)
|
||||
|
||||
var errTestOne = errors.New("test1")
|
||||
var test error
|
||||
test = AppendError(test, errTestOne)
|
||||
if !errors.Is(test, errTestOne) {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
test = append(test, errors.New("test2"))
|
||||
|
||||
var errTestTwo = errors.New("test2")
|
||||
test = AppendError(test, errTestTwo)
|
||||
if !errors.Is(test, errTestTwo) {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
if !errors.Is(test, errTestTwo) {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
// Append nil should log
|
||||
test = AppendError(test, nil)
|
||||
|
||||
if test.Error() != "test1, test2" {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
// Join slices for whatever reason
|
||||
test = AppendError(test, test)
|
||||
|
||||
if test.Error() != "test1, test2, test1, test2" {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
var errTestThree = errors.New("test3")
|
||||
if errors.Is(test, errTestThree) {
|
||||
t.Fatal("expected errors.Is() should not match")
|
||||
}
|
||||
|
||||
if errors.Is(test, errTestThree) {
|
||||
t.Fatal("expected errors.Is() should not match")
|
||||
}
|
||||
|
||||
strangeError := errors.New("this is a strange error")
|
||||
|
||||
strangeError = AppendError(strangeError, errTestOne)
|
||||
if strangeError.Error() != "this is a strange error, test1" {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
// Add trimmings
|
||||
strangeError = AppendError(strangeError, fmt.Errorf("TRIMMINGS: %w", errTestTwo))
|
||||
if strangeError.Error() != "this is a strange error, test1, TRIMMINGS: test2" {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
if !errors.Is(strangeError, errTestTwo) {
|
||||
t.Fatal("does not match error")
|
||||
}
|
||||
|
||||
if errors.Is(strangeError, errTestThree) {
|
||||
t.Fatal("should not match")
|
||||
}
|
||||
|
||||
// Test again because unwrap was called multiple times.
|
||||
if strangeError.Error() != "this is a strange error, test1, TRIMMINGS: test2" {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", strangeError.Error(), "this is a strange error, test1, TRIMMINGS: test2")
|
||||
}
|
||||
|
||||
strangeError = AppendError(strangeError, errors.New("even more error"))
|
||||
|
||||
strangeError = AppendError(strangeError, nil) // Skip this nasty thing.
|
||||
|
||||
// Test for individual display of errors
|
||||
target := 0
|
||||
for indv := errors.Unwrap(strangeError); indv != nil; indv = errors.Unwrap(indv) {
|
||||
switch target {
|
||||
case 0:
|
||||
if indv.Error() != "this is a strange error" {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", indv.Error(), "this is a strange error")
|
||||
}
|
||||
case 1:
|
||||
if indv.Error() != "test1" {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", indv.Error(), "test1")
|
||||
}
|
||||
|
||||
case 2:
|
||||
if indv.Error() != "TRIMMINGS: test2" {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", indv.Error(), "TRIMMINGS: test2")
|
||||
}
|
||||
case 3:
|
||||
if indv.Error() != "even more error" {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", indv.Error(), "even more error")
|
||||
}
|
||||
default:
|
||||
t.Fatal("unhandled case")
|
||||
}
|
||||
target++
|
||||
}
|
||||
if target != 4 {
|
||||
t.Fatal("targets not achieved")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseStartEndDate(t *testing.T) {
|
||||
|
||||
@@ -13,17 +13,17 @@ import (
|
||||
// eg if no comparisonTimes match, you will receive 1 TimeRange of Start End with dataInRange = false
|
||||
// eg2 if 1 comparisonTime matches in the middle of start and end, you will receive three ranges
|
||||
func FindTimeRangesContainingData(start, end time.Time, period time.Duration, comparisonTimes []time.Time) ([]TimeRange, error) {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
if start.IsZero() {
|
||||
errs = append(errs, errors.New("invalid start time"))
|
||||
errs = common.AppendError(errs, errors.New("invalid start time"))
|
||||
}
|
||||
if end.IsZero() {
|
||||
errs = append(errs, errors.New("invalid end time"))
|
||||
errs = common.AppendError(errs, errors.New("invalid end time"))
|
||||
}
|
||||
if err := validatePeriod(period); err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
var t TimePeriodCalculator
|
||||
@@ -52,17 +52,17 @@ func validatePeriod(period time.Duration) error {
|
||||
// CalculateTimePeriodsInRange can break down start and end times into time periods
|
||||
// eg 1 hourly intervals
|
||||
func CalculateTimePeriodsInRange(start, end time.Time, period time.Duration) ([]TimePeriod, error) {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
if start.IsZero() {
|
||||
errs = append(errs, errors.New("invalid start time"))
|
||||
errs = common.AppendError(errs, errors.New("invalid start time"))
|
||||
}
|
||||
if end.IsZero() {
|
||||
errs = append(errs, errors.New("invalid end time"))
|
||||
errs = common.AppendError(errs, errors.New("invalid end time"))
|
||||
}
|
||||
if err := validatePeriod(period); err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
|
||||
@@ -2046,7 +2046,7 @@ func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePair
|
||||
return nil, err
|
||||
}
|
||||
var pass bool
|
||||
var newErrors common.Errors
|
||||
var newErrors error
|
||||
for i := range r.Pairs {
|
||||
var p currency.Pair
|
||||
p, err = currency.NewPairFromStrings(r.Pairs[i].Base, r.Pairs[i].Quote)
|
||||
@@ -2057,12 +2057,12 @@ func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePair
|
||||
if r.Enable {
|
||||
err = exchCfg.CurrencyPairs.EnablePair(a, p.Format(pairFmt))
|
||||
if err != nil {
|
||||
newErrors = append(newErrors, fmt.Errorf("%s %w", r.Pairs[i], err))
|
||||
newErrors = common.AppendError(newErrors, fmt.Errorf("%s %w", r.Pairs[i], err))
|
||||
continue
|
||||
}
|
||||
err = base.CurrencyPairs.EnablePair(a, p)
|
||||
if err != nil {
|
||||
newErrors = append(newErrors, fmt.Errorf("%s %w", r.Pairs[i], err))
|
||||
newErrors = common.AppendError(newErrors, fmt.Errorf("%s %w", r.Pairs[i], err))
|
||||
continue
|
||||
}
|
||||
pass = true
|
||||
@@ -2072,7 +2072,7 @@ func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePair
|
||||
err = exchCfg.CurrencyPairs.DisablePair(a, p.Format(pairFmt))
|
||||
if err != nil {
|
||||
if errors.Is(err, currency.ErrPairNotFound) {
|
||||
newErrors = append(newErrors, fmt.Errorf("%s %w", r.Pairs[i], errSpecificPairNotEnabled))
|
||||
newErrors = common.AppendError(newErrors, fmt.Errorf("%s %w", r.Pairs[i], errSpecificPairNotEnabled))
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
@@ -2081,7 +2081,7 @@ func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePair
|
||||
err = base.CurrencyPairs.DisablePair(a, p)
|
||||
if err != nil {
|
||||
if errors.Is(err, currency.ErrPairNotFound) {
|
||||
newErrors = append(newErrors, fmt.Errorf("%s %w", r.Pairs[i], errSpecificPairNotEnabled))
|
||||
newErrors = common.AppendError(newErrors, fmt.Errorf("%s %w", r.Pairs[i], errSpecificPairNotEnabled))
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
@@ -2092,7 +2092,7 @@ func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePair
|
||||
if exch.IsWebsocketEnabled() && pass && base.Websocket.IsConnected() {
|
||||
err = exch.FlushWebsocketChannels()
|
||||
if err != nil {
|
||||
newErrors = append(newErrors, err)
|
||||
newErrors = common.AppendError(newErrors, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -237,10 +237,10 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error {
|
||||
s.exchangeAccounts[exch] = accounts
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for x := range incoming.Accounts {
|
||||
if !incoming.Accounts[x].AssetType.IsValid() {
|
||||
errs = append(errs, fmt.Errorf("cannot load sub account holdings for %s [%s] %w",
|
||||
errs = common.AppendError(errs, fmt.Errorf("cannot load sub account holdings for %s [%s] %w",
|
||||
incoming.Accounts[x].ID,
|
||||
incoming.Accounts[x].AssetType,
|
||||
asset.ErrNotSupported))
|
||||
@@ -295,11 +295,7 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// load checks to see if there is a change from incoming balance, if there is a
|
||||
|
||||
@@ -1544,7 +1544,6 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription,
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (b *Bitfinex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
checksum := make(map[string]interface{})
|
||||
checksum["event"] = "conf"
|
||||
checksum["flags"] = bitfinexChecksumFlag + bitfinexWsSequenceFlag
|
||||
@@ -1553,6 +1552,7 @@ func (b *Bitfinex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e
|
||||
return err
|
||||
}
|
||||
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
req := make(map[string]interface{})
|
||||
req["event"] = "subscribe"
|
||||
@@ -1564,20 +1564,17 @@ func (b *Bitfinex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e
|
||||
|
||||
err := b.Websocket.Conn.SendJSONMessage(req)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
req := make(map[string]interface{})
|
||||
req["event"] = "unsubscribe"
|
||||
@@ -1589,15 +1586,12 @@ func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio
|
||||
|
||||
err := b.Websocket.Conn.SendJSONMessage(req)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// WsSendAuth sends a authenticated event payload
|
||||
|
||||
@@ -207,7 +207,7 @@ func (b *Bitstamp) generateDefaultSubscriptions() ([]stream.ChannelSubscription,
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (b *Bitstamp) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
req := websocketEventRequest{
|
||||
Event: "bts:subscribe",
|
||||
@@ -217,20 +217,17 @@ func (b *Bitstamp) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e
|
||||
}
|
||||
err := b.Websocket.Conn.SendJSONMessage(req)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
req := websocketEventRequest{
|
||||
Event: "bts:unsubscribe",
|
||||
@@ -240,15 +237,12 @@ func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio
|
||||
}
|
||||
err := b.Websocket.Conn.SendJSONMessage(req)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair, assetType asset.Item) error {
|
||||
|
||||
@@ -253,21 +253,18 @@ func (b *Bittrex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription,
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (b *Bittrex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var x int
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for x = 0; x+wsMessageRateLimit < len(channelsToSubscribe); x += wsMessageRateLimit {
|
||||
err := b.subscribeSlice(channelsToSubscribe[x : x+wsMessageRateLimit])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
}
|
||||
err := b.subscribeSlice(channelsToSubscribe[x:])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (b *Bittrex) subscribeSlice(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
@@ -301,38 +298,32 @@ func (b *Bittrex) subscribeSlice(channelsToSubscribe []stream.ChannelSubscriptio
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range response.Response {
|
||||
if !response.Response[i].Success {
|
||||
errs = append(errs, errors.New("unable to subscribe to "+channels[i]+" - error code "+response.Response[i].ErrorCode))
|
||||
errs = common.AppendError(errs, errors.New("unable to subscribe to "+channels[i]+" - error code "+response.Response[i].ErrorCode))
|
||||
continue
|
||||
}
|
||||
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to receive data from the channel
|
||||
func (b *Bittrex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var x int
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for x = 0; x+wsMessageRateLimit < len(channelsToUnsubscribe); x += wsMessageRateLimit {
|
||||
err := b.unsubscribeSlice(channelsToUnsubscribe[x : x+wsMessageRateLimit])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
}
|
||||
err := b.unsubscribeSlice(channelsToUnsubscribe[x:])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (b *Bittrex) unsubscribeSlice(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
@@ -366,18 +357,15 @@ func (b *Bittrex) unsubscribeSlice(channelsToUnsubscribe []stream.ChannelSubscri
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range response.Response {
|
||||
if !response.Response[i].Success {
|
||||
errs = append(errs, errors.New("unable to unsubscribe from "+channels[i]+" - error code "+response.Response[i].ErrorCode))
|
||||
errs = common.AppendError(errs, errors.New("unable to unsubscribe from "+channels[i]+" - error code "+response.Response[i].ErrorCode))
|
||||
continue
|
||||
}
|
||||
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// wsReadData gets and passes on websocket messages for processing
|
||||
|
||||
@@ -115,7 +115,7 @@ func (by *Bybit) WsAuth(ctx context.Context) error {
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (by *Bybit) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
var subReq WsReq
|
||||
subReq.Topic = channelsToSubscribe[i].Channel
|
||||
@@ -123,7 +123,7 @@ func (by *Bybit) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
|
||||
formattedPair, err := by.FormatExchangeCurrency(channelsToSubscribe[i].Currency, asset.Spot)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
if channelsToSubscribe[i].Channel == wsKlines {
|
||||
@@ -140,20 +140,17 @@ func (by *Bybit) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
}
|
||||
err = by.Websocket.Conn.SendJSONMessage(subReq)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (by *Bybit) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
|
||||
for i := range channelsToUnsubscribe {
|
||||
var unSub WsReq
|
||||
@@ -162,7 +159,7 @@ func (by *Bybit) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
|
||||
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.Spot)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
unSub.Parameters = WsParams{
|
||||
@@ -170,15 +167,12 @@ func (by *Bybit) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
}
|
||||
err = by.Websocket.Conn.SendJSONMessage(unSub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// wsReadData receives and passes on websocket messages for processing
|
||||
|
||||
@@ -121,7 +121,7 @@ func (by *Bybit) WsCoinAuth(ctx context.Context) error {
|
||||
|
||||
// SubscribeCoin sends a websocket message to receive data from the channel
|
||||
func (by *Bybit) SubscribeCoin(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
var sub WsFuturesReq
|
||||
sub.Topic = subscribe
|
||||
@@ -129,7 +129,7 @@ func (by *Bybit) SubscribeCoin(channelsToSubscribe []stream.ChannelSubscription)
|
||||
sub.Args = append(sub.Args, formatArgs(channelsToSubscribe[i].Channel, channelsToSubscribe[i].Params))
|
||||
err := by.Websocket.Conn.SendJSONMessage(sub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -150,29 +150,25 @@ func formatArgs(channel string, params map[string]interface{}) string {
|
||||
|
||||
// UnsubscribeCoin sends a websocket message to stop receiving data from the channel
|
||||
func (by *Bybit) UnsubscribeCoin(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
var unSub WsFuturesReq
|
||||
unSub.Topic = unsubscribe
|
||||
|
||||
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.CoinMarginedFutures)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
unSub.Args = append(unSub.Args, channelsToUnsubscribe[i].Channel+dot+formattedPair.String())
|
||||
err = by.Websocket.Conn.SendJSONMessage(unSub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// wsCoinReadData gets and passes on websocket messages for processing
|
||||
|
||||
@@ -84,7 +84,7 @@ func (by *Bybit) WsFuturesAuth(ctx context.Context) error {
|
||||
|
||||
// SubscribeFutures sends a websocket message to receive data from the channel
|
||||
func (by *Bybit) SubscribeFutures(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
var sub WsFuturesReq
|
||||
sub.Topic = subscribe
|
||||
@@ -92,42 +92,35 @@ func (by *Bybit) SubscribeFutures(channelsToSubscribe []stream.ChannelSubscripti
|
||||
sub.Args = append(sub.Args, formatArgs(channelsToSubscribe[i].Channel, channelsToSubscribe[i].Params))
|
||||
err := by.Websocket.Conn.SendJSONMessage(sub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// UnsubscribeFutures sends a websocket message to stop receiving data from the channel
|
||||
func (by *Bybit) UnsubscribeFutures(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
var unSub WsFuturesReq
|
||||
unSub.Topic = unsubscribe
|
||||
|
||||
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.Futures)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
unSub.Args = append(unSub.Args, channelsToUnsubscribe[i].Channel+dot+formattedPair.String())
|
||||
err = by.Websocket.Conn.SendJSONMessage(unSub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// wsFuturesReadData gets and passes on websocket messages for processing
|
||||
|
||||
@@ -86,7 +86,7 @@ func (by *Bybit) WsUSDTAuth(ctx context.Context) error {
|
||||
|
||||
// SubscribeUSDT sends a websocket message to receive data from the channel
|
||||
func (by *Bybit) SubscribeUSDT(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
var sub WsFuturesReq
|
||||
sub.Topic = subscribe
|
||||
@@ -94,7 +94,7 @@ func (by *Bybit) SubscribeUSDT(channelsToSubscribe []stream.ChannelSubscription)
|
||||
sub.Args = append(sub.Args, formatArgs(channelsToSubscribe[i].Channel, channelsToSubscribe[i].Params))
|
||||
err := by.Websocket.Conn.SendJSONMessage(sub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -107,21 +107,20 @@ func (by *Bybit) SubscribeUSDT(channelsToSubscribe []stream.ChannelSubscription)
|
||||
|
||||
// UnsubscribeUSDT sends a websocket message to stop receiving data from the channel
|
||||
func (by *Bybit) UnsubscribeUSDT(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
var unSub WsFuturesReq
|
||||
unSub.Topic = unsubscribe
|
||||
|
||||
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.USDTMarginedFutures)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
unSub.Args = append(unSub.Args, channelsToUnsubscribe[i].Channel+dot+formattedPair.String())
|
||||
err = by.Websocket.Conn.SendJSONMessage(unSub)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
|
||||
@@ -616,11 +616,11 @@ func (c *COINUT) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (c *COINUT) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
fpair, err := c.FormatExchangeCurrency(channelsToSubscribe[i].Currency, asset.Spot)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -632,7 +632,7 @@ func (c *COINUT) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
}
|
||||
err = c.Websocket.Conn.SendJSONMessage(subscribe)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
c.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -645,11 +645,11 @@ func (c *COINUT) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (c *COINUT) Unsubscribe(channelToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelToUnsubscribe {
|
||||
fpair, err := c.FormatExchangeCurrency(channelToUnsubscribe[i].Currency, asset.Spot)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -662,32 +662,29 @@ func (c *COINUT) Unsubscribe(channelToUnsubscribe []stream.ChannelSubscription)
|
||||
resp, err := c.Websocket.Conn.SendMessageReturnResponse(subscribe.Nonce,
|
||||
subscribe)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
var response map[string]interface{}
|
||||
err = json.Unmarshal(resp, &response)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
val, ok := response["status"].([]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, errors.New("unable to type assert response status"))
|
||||
errs = common.AppendError(errs, errors.New("unable to type assert response status"))
|
||||
}
|
||||
if val[0] != "OK" {
|
||||
errs = append(errs, fmt.Errorf("%v unsubscribe failed for channel %v",
|
||||
errs = common.AppendError(errs, fmt.Errorf("%v unsubscribe failed for channel %v",
|
||||
c.Name,
|
||||
channelToUnsubscribe[i].Channel))
|
||||
continue
|
||||
}
|
||||
c.Websocket.RemoveSuccessfulUnsubscriptions(channelToUnsubscribe[i])
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (c *COINUT) wsAuthenticate(ctx context.Context) error {
|
||||
|
||||
@@ -541,31 +541,28 @@ func (g *Gateio) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
return err
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for k := range payloads {
|
||||
resp, err := g.Websocket.Conn.SendMessageReturnResponse(payloads[k].ID, payloads[k])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
var response WebsocketAuthenticationResponse
|
||||
err = json.Unmarshal(resp, &response)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
if response.Result.Status != "success" {
|
||||
errs = append(errs, fmt.Errorf("%v could not subscribe to %v",
|
||||
errs = common.AppendError(errs, fmt.Errorf("%v could not subscribe to %v",
|
||||
g.Name,
|
||||
payloads[k].Method))
|
||||
continue
|
||||
}
|
||||
g.Websocket.AddSuccessfulSubscriptions(payloads[k].Channels...)
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
func (g *Gateio) generatePayload(channelsToSubscribe []stream.ChannelSubscription) ([]WebsocketRequest, error) {
|
||||
|
||||
@@ -498,7 +498,7 @@ func (h *HitBTC) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (h *HitBTC) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
subscribe := WsRequest{
|
||||
Method: channelsToSubscribe[i].Channel,
|
||||
@@ -517,7 +517,7 @@ func (h *HitBTC) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
|
||||
err := h.Websocket.Conn.SendJSONMessage(subscribe)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -530,7 +530,7 @@ func (h *HitBTC) Subscribe(channelsToSubscribe []stream.ChannelSubscription) err
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (h *HitBTC) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
unsubscribeChannel := strings.Replace(channelsToUnsubscribe[i].Channel,
|
||||
"subscribe",
|
||||
@@ -552,7 +552,7 @@ func (h *HitBTC) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
|
||||
err := h.Websocket.Conn.SendJSONMessage(unsubscribe)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
|
||||
@@ -553,7 +553,7 @@ func (h *HUOBI) Subscribe(channelsToSubscribe []stream.ChannelSubscription) erro
|
||||
return err
|
||||
}
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") ||
|
||||
strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil {
|
||||
@@ -562,7 +562,7 @@ func (h *HUOBI) Subscribe(channelsToSubscribe []stream.ChannelSubscription) erro
|
||||
wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel,
|
||||
channelsToSubscribe[i].Channel)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -572,7 +572,7 @@ func (h *HUOBI) Subscribe(channelsToSubscribe []stream.ChannelSubscription) erro
|
||||
Subscribe: channelsToSubscribe[i].Channel,
|
||||
})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
@@ -593,7 +593,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
return err
|
||||
}
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToUnsubscribe {
|
||||
if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") ||
|
||||
strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil {
|
||||
@@ -602,7 +602,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel,
|
||||
channelsToUnsubscribe[i].Channel)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
@@ -612,7 +612,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
|
||||
Unsubscribe: channelsToUnsubscribe[i].Channel,
|
||||
})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
|
||||
|
||||
@@ -372,15 +372,15 @@ func (k *Kraken) GetTrades(ctx context.Context, symbol currency.Pair) ([]RecentT
|
||||
}
|
||||
|
||||
if len(data.Error) > 0 {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for x := range data.Error {
|
||||
errString, ok := data.Error[x].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
errs = append(errs, errors.New(errString))
|
||||
errs = common.AppendError(errs, errors.New(errString))
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1264,13 +1264,13 @@ channels:
|
||||
*s = append(*s, outbound)
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for subType, subs := range subscriptions {
|
||||
for i := range *subs {
|
||||
if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) {
|
||||
_, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...)
|
||||
@@ -1285,16 +1285,13 @@ channels:
|
||||
}
|
||||
_, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...)
|
||||
}
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
@@ -1339,12 +1336,12 @@ channels:
|
||||
unsubs = append(unsubs, unsub)
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range unsubs {
|
||||
if common.StringDataContains(authenticatedChannels, unsubs[i].Subscription.Name) {
|
||||
_, err := k.Websocket.AuthConn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
|
||||
@@ -1353,15 +1350,12 @@ channels:
|
||||
|
||||
_, err := k.Websocket.Conn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// wsAddOrder creates an order, returned order ID if success
|
||||
|
||||
@@ -463,9 +463,9 @@ func (ok *Okx) PlaceMultipleOrders(ctx context.Context, args []PlaceOrderRequest
|
||||
if len(resp) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for x := range resp {
|
||||
errs = append(errs, fmt.Errorf("error code:%s message: %v", resp[x].SCode, resp[x].SMessage))
|
||||
errs = common.AppendError(errs, fmt.Errorf("error code:%s message: %v", resp[x].SCode, resp[x].SMessage))
|
||||
}
|
||||
return nil, errs
|
||||
}
|
||||
@@ -513,10 +513,10 @@ func (ok *Okx) CancelMultipleOrders(ctx context.Context, args []CancelOrderReque
|
||||
if len(resp) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
errs := common.Errors{}
|
||||
var errs error
|
||||
for x := range resp {
|
||||
if resp[x].SCode != "0" {
|
||||
errs = append(errs, fmt.Errorf("error code:%s message: %v", resp[x].SCode, resp[x].SMessage))
|
||||
errs = common.AppendError(errs, fmt.Errorf("error code:%s message: %v", resp[x].SCode, resp[x].SMessage))
|
||||
}
|
||||
}
|
||||
return nil, errs
|
||||
|
||||
@@ -1384,10 +1384,10 @@ func (ok *Okx) WsPlaceMultipleOrder(args []PlaceOrderRequestParam) ([]OrderData,
|
||||
if len(data.Data) == 0 {
|
||||
return nil, fmt.Errorf("error code:%s message: %v", data.Code, ErrorCodes[data.Code])
|
||||
}
|
||||
errs := common.Errors{}
|
||||
var errs error
|
||||
for x := range resp.Data {
|
||||
if resp.Data[x].SCode != "0" {
|
||||
errs = append(errs, fmt.Errorf("error code:%s message: %s", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
errs = common.AppendError(errs, fmt.Errorf("error code:%s message: %s", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
}
|
||||
}
|
||||
return nil, errs
|
||||
@@ -1514,10 +1514,10 @@ func (ok *Okx) WsCancelMultipleOrder(args []CancelOrderRequestParam) ([]OrderDat
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
errs := common.Errors{}
|
||||
var errs error
|
||||
for x := range resp.Data {
|
||||
if resp.Data[x].SCode != "0" {
|
||||
errs = append(errs, fmt.Errorf("error code:%s message: %v", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
errs = common.AppendError(errs, fmt.Errorf("error code:%s message: %v", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
}
|
||||
}
|
||||
return nil, errs
|
||||
@@ -1652,10 +1652,10 @@ func (ok *Okx) WsAmendMultipleOrders(args []AmendOrderRequestParams) ([]OrderDat
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
errs := common.Errors{}
|
||||
var errs error
|
||||
for x := range resp.Data {
|
||||
if resp.Data[x].SCode != "0" {
|
||||
errs = append(errs, fmt.Errorf("error code:%s message: %v", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
errs = common.AppendError(errs, fmt.Errorf("error code:%s message: %v", resp.Data[x].SCode, resp.Data[x].SMessage))
|
||||
}
|
||||
}
|
||||
return nil, errs
|
||||
|
||||
@@ -1109,18 +1109,14 @@ func (c *Cancel) Validate(opt ...validate.Checker) error {
|
||||
return ErrCancelOrderIsNil
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for _, o := range opt {
|
||||
err := o.Check()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Validate checks internal struct requirements and returns filter requirement
|
||||
@@ -1142,17 +1138,14 @@ func (g *GetOrdersRequest) Validate(opt ...validate.Checker) error {
|
||||
return errUnrecognisedOrderType
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for _, o := range opt {
|
||||
err := o.Check()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
}
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Filter reduces slice by optional fields
|
||||
@@ -1183,14 +1176,13 @@ func (m *Modify) Validate(opt ...validate.Checker) error {
|
||||
return ErrAssetNotSet
|
||||
}
|
||||
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for _, o := range opt {
|
||||
err := o.Check()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
|
||||
@@ -549,7 +549,7 @@ func (p *Poloniex) Subscribe(sub []stream.ChannelSubscription) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
channels:
|
||||
for i := range sub {
|
||||
subscriptionRequest := WsCommand{
|
||||
@@ -560,7 +560,7 @@ channels:
|
||||
sub[i].Channel) && creds != nil:
|
||||
err := p.wsSendAuthorisedCommand(creds.Secret, creds.Key, "subscribe")
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue channels
|
||||
}
|
||||
p.Websocket.AddSuccessfulSubscriptions(sub[i])
|
||||
@@ -574,7 +574,7 @@ channels:
|
||||
|
||||
err := p.Websocket.Conn.SendJSONMessage(subscriptionRequest)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -596,7 +596,7 @@ func (p *Poloniex) Unsubscribe(unsub []stream.ChannelSubscription) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
channels:
|
||||
for i := range unsub {
|
||||
unsubscriptionRequest := WsCommand{
|
||||
@@ -607,7 +607,7 @@ channels:
|
||||
unsub[i].Channel) && creds != nil:
|
||||
err := p.wsSendAuthorisedCommand(creds.Secret, creds.Key, "unsubscribe")
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue channels
|
||||
}
|
||||
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
|
||||
@@ -620,7 +620,7 @@ channels:
|
||||
}
|
||||
err := p.Websocket.Conn.SendJSONMessage(unsubscriptionRequest)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
|
||||
|
||||
@@ -65,7 +65,6 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
var errs common.Errors
|
||||
if atomic.AddInt32(&processor.started, 0) == 0 {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
@@ -73,13 +72,14 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
|
||||
wg.Wait()
|
||||
}
|
||||
validDatas := make([]Data, 0, len(data))
|
||||
var errs error
|
||||
for i := range data {
|
||||
if data[i].Price == 0 ||
|
||||
data[i].Amount == 0 ||
|
||||
data[i].CurrencyPair.IsEmpty() ||
|
||||
data[i].Exchange == "" ||
|
||||
data[i].Timestamp.IsZero() {
|
||||
errs = append(errs, fmt.Errorf("%v received invalid trade data: %+v", exchangeName, data[i]))
|
||||
errs = common.AppendError(errs, fmt.Errorf("%v received invalid trade data: %+v", exchangeName, data[i]))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
|
||||
}
|
||||
uu, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", exchangeName, data[i]))
|
||||
errs = common.AppendError(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", exchangeName, data[i]))
|
||||
}
|
||||
data[i].ID = uu
|
||||
validDatas = append(validDatas, data[i])
|
||||
@@ -107,10 +107,7 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
|
||||
processor.mutex.Lock()
|
||||
processor.buffer = append(processor.buffer, validDatas...)
|
||||
processor.mutex.Unlock()
|
||||
if len(errs) > 0 {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
return errs
|
||||
}
|
||||
|
||||
// Run will save trade data to the database in batches
|
||||
|
||||
@@ -299,7 +299,7 @@ func (z *ZB) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error
|
||||
|
||||
// Subscribe sends a websocket message to receive data from the channel
|
||||
func (z *ZB) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
var errs common.Errors
|
||||
var errs error
|
||||
for i := range channelsToSubscribe {
|
||||
subscriptionRequest := Subscription{
|
||||
Event: zWebsocketAddChannel,
|
||||
@@ -307,7 +307,7 @@ func (z *ZB) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
|
||||
}
|
||||
err := z.Websocket.Conn.SendJSONMessage(subscriptionRequest)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
errs = common.AppendError(errs, err)
|
||||
continue
|
||||
}
|
||||
z.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
|
||||
|
||||
Reference in New Issue
Block a user