Files
gocryptotrader/engine/rpcserver.go
Scott 80bc8c7e9e Trade history, recent trades, live trade processing and storage (#558)
* End of day commit moving packages and setting foundation into how trade processing will go

* Conformity

* tdd candle generation based on received trade data, renames orderbookbuffer back to buffer for now...

* Formalises test functions and designs the trade processor

* Theoretical amending old candles to allow any trades that were part of an old processed candle to be more accurate. Saving of candles will only occur on previous cycles, extending memory usage a bit longer

* Figures out sqlboiler for sqlite. Updates websocket entries to process trade data

* One more trade data

* Adds more exchange support

* Adds PSQL stuff

* Begins creating sql implementation

* End of day commit. Helper functions and understanding sql usage in GCT

* Adds delete and cleans up table design

* Finishes trades conceptually. Awaits candle data update in order to translate trades to candles

* Initial handling of trades in coinbene

* Proto

* Fixing of some bugs, attempting to address coinbene asset type ws issues

* Fixes up coinbene websocket implementation for the most part

* finalises coinbene websocket implementation. Adds new ability to parse currencies without a delimiter

* Implements rpc commands and adds testing

* updates the following to be compatible with trade data update: Theoretical amending old candles to allow any trades that were part of an old processed candle to be more accurate. Saving of candles will only occur on previous cycles, extending memory usage a bit longer

* Changes trade to be its own entity rather than attached to a websocket.

* Adds coverage to trades. Changes signature of `AddTradesToBuffer` to return error. Now automatically shuts down without need for channel listening. Will automatically start up again if it gets data

* Implements trade fetching at the wrapper level for a bunch of exchanges. Adds trade id to script updoot. Probably breaking change

* Implements trade fetching for all wrappers hurray hurrah. Updates all the tests

* Adds new interface func to get recent trades. Ensures GetExchangeHistory continues until conditions are met

* Adds new readme, tests all new wrapper endpoints, updates exchange_wrapper_issues to test new endpoints. Updates exchange_wrapper_coverage with new coverage... Fixes lame bug causing wrapper tests to fail from being poorly setup. Adds loopy loop to ensure that all data is captured when requesting exchange history

* Bugfix on psql migrations. Rebases latest changes, updates table design to use base and quote, updates trades to use exchange_name_id

* Adds new config field for saving trades to the database per exchange. Now exits trade processing when trade saving is not enabled. Similarly for wrapper, does not save if not enabled

* Minor bitfinex trade fixes. continues on buffer processing errors, now saves transactionid to the db

* Adds support for generating candles from candlesextended. May extend it further, idk

* Updates trade candles to be able to fill missing data with trades. Adds more tests. Also does a thing where you can forcefully override a candle based on internal trade data instead of API data

* Fixes bug where force deletions did not follow up with insertions. Adds force to candle commands

* Fixes specific exchange based issues. Extends recent trades to 24 hours where possible

* Fixes issue with saved tests. Fixes tests for trades. Adds parallel to tests. Pre-fixes people's nits

* Adds new GRPC functions to find out what data is missing from trades and candles. Fixes some assumptions from missing period code.

* Adds unique constraint. Fixes up niggling issues for wrappers and websockets

* Fixes issues with using unix times in the database trying to retrieve data via the CLI. Reduces save time to 15 seconds

* Updates trades to use timestamps instead of int64 unix

* Adds missing FTX wrapper implementation. Regens docs

* Linting the linters. Updating readme

* Adds new command to set whether an exchange can process trades

* Doc update

* Adds recent trades and historic trade endpoints to grpc

* formats pair_test.go to appease linter gods

* Addresses data race. Removes logging of missing intervals on unrelated function (now that it has its own rpc command). The buffer time isnt customisable, but I don't feel it needs to be at a config level at all really.

* Fixes a few niterinos regarding spacing, type conversion, a weird Bitmex 0 trade value error, unsubscriptions and cli command references

* Reduces map lookups. Adds base func and moves wrappers to use it

* Uses better currency formatter. Adds time based validation to trade history. Reverts configtest.json

* Reverts config and updates test names. Also WAYYYYY LESS SPAMMY

* oopsie doopsie missed a whoopsie

* mint flavoured lint

* Fixes issues caused by rebase

* Fixes issue with timestamps not converting properly from command to RPCServer. Adds new error type. Adds shorthand entries to some commands. Removes os.Exit from tests. Makes Gemini test rolling. Adds enabled exchange check to RPC function. Escapes timestamp on bitstamp. Renames var

* fixes whoopsie oopsie doopsie I forgot to remove code shoopsie

* missed a line

* 🎉 🎉 :tada:Breaks everything in an end of day commit 🎉 🎉 🎉

* Modifies function 'createlocaloffset' to return a string instead. Uses strings for all time based start and end commands. Uses UTC times in RPC server and updates SQLITE to use formatted time based queries

* Adds concurrency-safe way of changing SaveTradeData and checking it. Fixes embarrassing typo

* End of day fix, adds bitfinex update to loop until either the return trades shows no new dates, or meets specifications. Fixes egregious typo

* Improves testing and handling of historical trades function

* Fixes tests after latest changes

* Fix potential fatal err now that db is enabled in test config now

* Fixes up some database settings to use a local engine instead of global var

* DELICIOUS LINT CHOCOLATE FIXES

* Fixes data race by slashing competitor's tyres

* Adds mock test fixes to allow for live and stored data test

* Removes verbosity in engine level tests. Adds new timezone format to highlight the timezone for RPC functions. Removes reference to Preix index fund

* Oopsie doopsie, fixed a whoopsie

* Loggers can no longer do data drag races on my lawn 👴

* Removes bad lock

* Addresses command nits. End of day conceptual commit, trying to calculate spans of time in the context of missing periods. Tests will fail

* Adds new stream response for retrieving trade history as it can take time to do. Unsuccessfully attempts to simplify time range calculation for missing trades response

* Adds new timeperiods package to calculate time periods, time ranges and whether data is in those ranges. Removes kline basic implementation of same concept

* Fixes lint issues. Fixes test. Moves trade cli commands to their own trade subcommands

* Updates lakebtc to no longer have gethistorictrades as it is unsupported. Adds more validation to rpc functions

* Removes requirement to have trades when testing trade wrapper functions. Doesn't really prove it works if there are no trades for a given currency in a time period.

* Addresses nits, runs linting fix and ensures a test is consistent

* Fix merge issues

* Moves sort to timeperiods. Adds test coverage. Fixes typo

* Removes log package in CLI

* Fixes `GetTrades` url

* Reorders all instances of validation occuring after settingup RPC connection

* Fixes test to ensure that it is setup before testing that it is setup

* Fixed issue with bool retrieval. Removes double append

* Fixes Binance times, fixes bitfinex sell sides, fixes huobi times, sorts all responses

* Fixes poloniex trade id consistency. Makes recent trade for poloniex consistent with others (15 minutes). Fixes coinbene. Fixes localbitcoins to use quote currency. Fixes coinut times. Updates huobi trade id, saves okgroup trades. Fixes bid and ask to buy and sell

* Removes websocket trades for lakebtc as it did not meet our requirements for processing. Adds new constraints to the database to ensure we have uniqueness on trades where ID doesn't exist and doesn't trigger errors for trades where the tid does

* Fixes migration for postgres to downscale properly

* Really really fixes the psql index changes

* Fixes broken tests

* Now with working tests and no pocket lint

* Makes the side column nullable with no more constraint for it. adds migrations and runs generation. comments lakebtc

* Lint & Sprüngli

* Updates zb to use more appropriate side

* Fixes oopsie

* Attempts to address a data race from globals

* Fixes build

* Fixes missed regen rpc files

* Updates readme to point to trade readme. Fixes exchange_wrapper_coverage wrapper count and untested panics, tests bitfinex funding pair test for `fUSD`, adds shiny new param `tradeprocessinginterval`

* mint flavoured lint

* Uses the real default to set the default value by default

* Fixes some extra tests surrounding email sending and number incompatibility

* Reverts test config

* re-adds gom2/usdt currency

* Fixes typo, don't look!

* Fixes minor codelingo pickups

* Adds more precision to handling of trade data from Kraken. Expands test

* interface christmas tree

* lint
2020-10-29 13:00:02 +11:00

2880 lines
82 KiB
Go

package engine
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/golang/protobuf/ptypes"
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/common/file"
"github.com/thrasher-corp/gocryptotrader/common/file/archive"
"github.com/thrasher-corp/gocryptotrader/common/timeperiods"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/database"
"github.com/thrasher-corp/gocryptotrader/database/models/postgres"
"github.com/thrasher-corp/gocryptotrader/database/models/sqlite3"
"github.com/thrasher-corp/gocryptotrader/database/repository/audit"
exchangeDB "github.com/thrasher-corp/gocryptotrader/database/repository/exchange"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/gctrpc"
"github.com/thrasher-corp/gocryptotrader/gctrpc/auth"
gctscript "github.com/thrasher-corp/gocryptotrader/gctscript/vm"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/gocryptotrader/portfolio"
"github.com/thrasher-corp/gocryptotrader/portfolio/banking"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
"github.com/thrasher-corp/gocryptotrader/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
const (
errExchangeNameUnset = "exchange name unset"
errCurrencyPairUnset = "currency pair unset"
errStartEndTimesUnset = "invalid start and end times"
errAssetTypeUnset = "asset type unset"
errDispatchSystem = "dispatch system offline"
invalidArguments = "invalid arguments received"
)
var (
errExchangeNotLoaded = errors.New("exchange is not loaded/doesn't exist")
errExchangeBaseNotFound = errors.New("cannot get exchange base")
errInvalidArguments = errors.New(invalidArguments)
)
// RPCServer struct
type RPCServer struct {
*Engine
}
func (bot *Engine) authenticateClient(ctx context.Context) (context.Context, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ctx, fmt.Errorf("unable to extract metadata")
}
authStr, ok := md["authorization"]
if !ok {
return ctx, fmt.Errorf("authorization header missing")
}
if !strings.Contains(authStr[0], "Basic") {
return ctx, fmt.Errorf("basic not found in authorization header")
}
decoded, err := crypto.Base64Decode(strings.Split(authStr[0], " ")[1])
if err != nil {
return ctx, fmt.Errorf("unable to base64 decode authorization header")
}
username := strings.Split(string(decoded), ":")[0]
password := strings.Split(string(decoded), ":")[1]
if username != bot.Config.RemoteControl.Username || password != bot.Config.RemoteControl.Password {
return ctx, fmt.Errorf("username/password mismatch")
}
return ctx, nil
}
// StartRPCServer starts a gRPC server with TLS auth
func StartRPCServer(engine *Engine) {
targetDir := utils.GetTLSDir(engine.Settings.DataDir)
err := checkCerts(targetDir)
if err != nil {
log.Errorf(log.GRPCSys, "gRPC checkCerts failed. err: %s\n", err)
return
}
log.Debugf(log.GRPCSys, "gRPC server support enabled. Starting gRPC server on https://%v.\n", engine.Config.RemoteControl.GRPC.ListenAddress)
lis, err := net.Listen("tcp", engine.Config.RemoteControl.GRPC.ListenAddress)
if err != nil {
log.Errorf(log.GRPCSys, "gRPC server failed to bind to port: %s", err)
return
}
creds, err := credentials.NewServerTLSFromFile(filepath.Join(targetDir, "cert.pem"), filepath.Join(targetDir, "key.pem"))
if err != nil {
log.Errorf(log.GRPCSys, "gRPC server could not load TLS keys: %s\n", err)
return
}
opts := []grpc.ServerOption{
grpc.Creds(creds),
grpc.UnaryInterceptor(grpcauth.UnaryServerInterceptor(engine.authenticateClient)),
}
server := grpc.NewServer(opts...)
s := RPCServer{engine}
gctrpc.RegisterGoCryptoTraderServer(server, &s)
go func() {
if err := server.Serve(lis); err != nil {
log.Errorf(log.GRPCSys, "gRPC server failed to serve: %s\n", err)
return
}
}()
log.Debugln(log.GRPCSys, "gRPC server started!")
if s.Settings.EnableGRPCProxy {
s.StartRPCRESTProxy()
}
}
// StartRPCRESTProxy starts a gRPC proxy
func (s *RPCServer) StartRPCRESTProxy() {
log.Debugf(log.GRPCSys, "gRPC proxy server support enabled. Starting gRPC proxy server on http://%v.\n", s.Config.RemoteControl.GRPC.GRPCProxyListenAddress)
targetDir := utils.GetTLSDir(s.Settings.DataDir)
creds, err := credentials.NewClientTLSFromFile(filepath.Join(targetDir, "cert.pem"), "")
if err != nil {
log.Errorf(log.GRPCSys, "Unabled to start gRPC proxy. Err: %s\n", err)
return
}
mux := grpcruntime.NewServeMux()
opts := []grpc.DialOption{grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(auth.BasicAuth{
Username: s.Config.RemoteControl.Username,
Password: s.Config.RemoteControl.Password,
}),
}
err = gctrpc.RegisterGoCryptoTraderHandlerFromEndpoint(context.Background(),
mux, s.Config.RemoteControl.GRPC.ListenAddress, opts)
if err != nil {
log.Errorf(log.GRPCSys, "Failed to register gRPC proxy. Err: %s\n", err)
return
}
go func() {
if err := http.ListenAndServe(s.Config.RemoteControl.GRPC.GRPCProxyListenAddress, mux); err != nil {
log.Errorf(log.GRPCSys, "gRPC proxy failed to server: %s\n", err)
return
}
}()
log.Debugln(log.GRPCSys, "gRPC proxy server started!")
}
// GetInfo returns info about the current GoCryptoTrader session
func (s *RPCServer) GetInfo(_ context.Context, r *gctrpc.GetInfoRequest) (*gctrpc.GetInfoResponse, error) {
d := time.Since(s.Uptime)
resp := gctrpc.GetInfoResponse{
Uptime: d.String(),
EnabledExchanges: int64(s.Config.CountEnabledExchanges()),
AvailableExchanges: int64(len(s.Config.Exchanges)),
DefaultFiatCurrency: s.Config.Currency.FiatDisplayCurrency.String(),
DefaultForexProvider: s.Config.GetPrimaryForexProvider(),
SubsystemStatus: s.GetSubsystemsStatus(),
}
endpoints := GetRPCEndpoints()
resp.RpcEndpoints = make(map[string]*gctrpc.RPCEndpoint)
for k, v := range endpoints {
resp.RpcEndpoints[k] = &gctrpc.RPCEndpoint{
Started: v.Started,
ListenAddress: v.ListenAddr,
}
}
return &resp, nil
}
// GetSubsystems returns a list of subsystems and their status
func (s *RPCServer) GetSubsystems(_ context.Context, r *gctrpc.GetSubsystemsRequest) (*gctrpc.GetSusbsytemsResponse, error) {
return &gctrpc.GetSusbsytemsResponse{SubsystemsStatus: s.GetSubsystemsStatus()}, nil
}
// EnableSubsystem enables a engine subsytem
func (s *RPCServer) EnableSubsystem(_ context.Context, r *gctrpc.GenericSubsystemRequest) (*gctrpc.GenericResponse, error) {
err := s.SetSubsystem(r.Subsystem, true)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s enabled", r.Subsystem)}, nil
}
// DisableSubsystem disables a engine subsytem
func (s *RPCServer) DisableSubsystem(_ context.Context, r *gctrpc.GenericSubsystemRequest) (*gctrpc.GenericResponse, error) {
err := s.SetSubsystem(r.Subsystem, false)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s disabled", r.Subsystem)}, nil
}
// GetRPCEndpoints returns a list of API endpoints
func (s *RPCServer) GetRPCEndpoints(_ context.Context, r *gctrpc.GetRPCEndpointsRequest) (*gctrpc.GetRPCEndpointsResponse, error) {
endpoints := GetRPCEndpoints()
var resp gctrpc.GetRPCEndpointsResponse
resp.Endpoints = make(map[string]*gctrpc.RPCEndpoint)
for k, v := range endpoints {
resp.Endpoints[k] = &gctrpc.RPCEndpoint{
Started: v.Started,
ListenAddress: v.ListenAddr,
}
}
return &resp, nil
}
// GetCommunicationRelayers returns the status of the engines communication relayers
func (s *RPCServer) GetCommunicationRelayers(_ context.Context, r *gctrpc.GetCommunicationRelayersRequest) (*gctrpc.GetCommunicationRelayersResponse, error) {
relayers, err := s.CommsManager.GetStatus()
if err != nil {
return nil, err
}
var resp gctrpc.GetCommunicationRelayersResponse
resp.CommunicationRelayers = make(map[string]*gctrpc.CommunicationRelayer)
for k, v := range relayers {
resp.CommunicationRelayers[k] = &gctrpc.CommunicationRelayer{
Enabled: v.Enabled,
Connected: v.Connected,
}
}
return &resp, nil
}
// GetExchanges returns a list of exchanges
// Param is whether or not you wish to list enabled exchanges
func (s *RPCServer) GetExchanges(_ context.Context, r *gctrpc.GetExchangesRequest) (*gctrpc.GetExchangesResponse, error) {
exchanges := strings.Join(s.GetExchangeNames(r.Enabled), ",")
return &gctrpc.GetExchangesResponse{Exchanges: exchanges}, nil
}
// DisableExchange disables an exchange
func (s *RPCServer) DisableExchange(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GenericResponse, error) {
err := s.UnloadExchange(r.Exchange)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// EnableExchange enables an exchange
func (s *RPCServer) EnableExchange(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GenericResponse, error) {
err := s.LoadExchange(r.Exchange, false, nil)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// GetExchangeOTPCode retrieves an exchanges OTP code
func (s *RPCServer) GetExchangeOTPCode(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GetExchangeOTPReponse, error) {
result, err := s.GetExchangeoOTPByName(r.Exchange)
return &gctrpc.GetExchangeOTPReponse{OtpCode: result}, err
}
// GetExchangeOTPCodes retrieves OTP codes for all exchanges which have an
// OTP secret installed
func (s *RPCServer) GetExchangeOTPCodes(_ context.Context, r *gctrpc.GetExchangeOTPsRequest) (*gctrpc.GetExchangeOTPsResponse, error) {
result, err := s.GetExchangeOTPs()
return &gctrpc.GetExchangeOTPsResponse{OtpCodes: result}, err
}
// GetExchangeInfo gets info for a specific exchange
func (s *RPCServer) GetExchangeInfo(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GetExchangeInfoResponse, error) {
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
resp := &gctrpc.GetExchangeInfoResponse{
Name: exchCfg.Name,
Enabled: exchCfg.Enabled,
Verbose: exchCfg.Verbose,
UsingSandbox: exchCfg.UseSandbox,
HttpTimeout: exchCfg.HTTPTimeout.String(),
HttpUseragent: exchCfg.HTTPUserAgent,
HttpProxy: exchCfg.ProxyAddress,
BaseCurrencies: strings.Join(exchCfg.BaseCurrencies.Strings(), ","),
}
resp.SupportedAssets = make(map[string]*gctrpc.PairsSupported)
assets := exchCfg.CurrencyPairs.GetAssetTypes()
for i := range assets {
ps, err := exchCfg.CurrencyPairs.Get(assets[i])
if err != nil {
return nil, err
}
resp.SupportedAssets[assets[i].String()] = &gctrpc.PairsSupported{
EnabledPairs: ps.Enabled.Join(),
AvailablePairs: ps.Available.Join(),
}
}
return resp, nil
}
// GetTicker returns the ticker for a specified exchange, currency pair and
// asset type
func (s *RPCServer) GetTicker(_ context.Context, r *gctrpc.GetTickerRequest) (*gctrpc.TickerResponse, error) {
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
t, err := s.GetSpecificTicker(currency.Pair{
Delimiter: r.Pair.Delimiter,
Base: currency.NewCode(r.Pair.Base),
Quote: currency.NewCode(r.Pair.Quote),
},
r.Exchange,
a,
)
if err != nil {
return nil, err
}
resp := &gctrpc.TickerResponse{
Pair: r.Pair,
LastUpdated: t.LastUpdated.Unix(),
Last: t.Last,
High: t.High,
Low: t.Low,
Bid: t.Bid,
Ask: t.Ask,
Volume: t.Volume,
PriceAth: t.PriceATH,
}
return resp, nil
}
// GetTickers returns a list of tickers for all enabled exchanges and all
// enabled currency pairs
func (s *RPCServer) GetTickers(_ context.Context, r *gctrpc.GetTickersRequest) (*gctrpc.GetTickersResponse, error) {
activeTickers := s.GetAllActiveTickers()
var tickers []*gctrpc.Tickers
for x := range activeTickers {
var ticker gctrpc.Tickers
ticker.Exchange = activeTickers[x].ExchangeName
for y := range activeTickers[x].ExchangeValues {
t := activeTickers[x].ExchangeValues[y]
ticker.Tickers = append(ticker.Tickers, &gctrpc.TickerResponse{
Pair: &gctrpc.CurrencyPair{
Delimiter: t.Pair.Delimiter,
Base: t.Pair.Base.String(),
Quote: t.Pair.Quote.String(),
},
LastUpdated: t.LastUpdated.Unix(),
Last: t.Last,
High: t.High,
Low: t.Low,
Bid: t.Bid,
Ask: t.Ask,
Volume: t.Volume,
PriceAth: t.PriceATH,
})
}
tickers = append(tickers, &ticker)
}
return &gctrpc.GetTickersResponse{Tickers: tickers}, nil
}
// GetOrderbook returns an orderbook for a specific exchange, currency pair
// and asset type
func (s *RPCServer) GetOrderbook(_ context.Context, r *gctrpc.GetOrderbookRequest) (*gctrpc.OrderbookResponse, error) {
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
ob, err := s.GetSpecificOrderbook(currency.Pair{
Delimiter: r.Pair.Delimiter,
Base: currency.NewCode(r.Pair.Base),
Quote: currency.NewCode(r.Pair.Quote),
},
r.Exchange,
a,
)
if err != nil {
return nil, err
}
bids := make([]*gctrpc.OrderbookItem, 0, len(ob.Bids))
asks := make([]*gctrpc.OrderbookItem, 0, len(ob.Asks))
ch := make(chan bool)
go func() {
for _, b := range ob.Bids {
bids = append(bids, &gctrpc.OrderbookItem{
Amount: b.Amount,
Price: b.Price,
})
}
ch <- true
}()
for _, a := range ob.Asks {
asks = append(asks, &gctrpc.OrderbookItem{
Amount: a.Amount,
Price: a.Price,
})
}
<-ch
resp := &gctrpc.OrderbookResponse{
Pair: r.Pair,
Bids: bids,
Asks: asks,
LastUpdated: ob.LastUpdated.Unix(),
AssetType: r.AssetType,
}
return resp, nil
}
// GetOrderbooks returns a list of orderbooks for all enabled exchanges and all
// enabled currency pairs
func (s *RPCServer) GetOrderbooks(_ context.Context, r *gctrpc.GetOrderbooksRequest) (*gctrpc.GetOrderbooksResponse, error) {
activeOrderbooks := GetAllActiveOrderbooks()
var orderbooks []*gctrpc.Orderbooks
for x := range activeOrderbooks {
var ob gctrpc.Orderbooks
ob.Exchange = activeOrderbooks[x].ExchangeName
for y := range activeOrderbooks[x].ExchangeValues {
o := activeOrderbooks[x].ExchangeValues[y]
var bids []*gctrpc.OrderbookItem
for z := range o.Bids {
bids = append(bids, &gctrpc.OrderbookItem{
Amount: o.Bids[z].Amount,
Price: o.Bids[z].Price,
})
}
var asks []*gctrpc.OrderbookItem
for z := range o.Asks {
asks = append(asks, &gctrpc.OrderbookItem{
Amount: o.Asks[z].Amount,
Price: o.Asks[z].Price,
})
}
ob.Orderbooks = append(ob.Orderbooks, &gctrpc.OrderbookResponse{
Pair: &gctrpc.CurrencyPair{
Delimiter: o.Pair.Delimiter,
Base: o.Pair.Base.String(),
Quote: o.Pair.Quote.String(),
},
LastUpdated: o.LastUpdated.Unix(),
Bids: bids,
Asks: asks,
})
}
orderbooks = append(orderbooks, &ob)
}
return &gctrpc.GetOrderbooksResponse{Orderbooks: orderbooks}, nil
}
// GetAccountInfo returns an account balance for a specific exchange
func (s *RPCServer) GetAccountInfo(_ context.Context, r *gctrpc.GetAccountInfoRequest) (*gctrpc.GetAccountInfoResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
resp, err := exch.FetchAccountInfo()
if err != nil {
return nil, err
}
var accounts []*gctrpc.Account
for x := range resp.Accounts {
var a gctrpc.Account
a.Id = resp.Accounts[x].ID
for _, y := range resp.Accounts[x].Currencies {
a.Currencies = append(a.Currencies, &gctrpc.AccountCurrencyInfo{
Currency: y.CurrencyName.String(),
Hold: y.Hold,
TotalValue: y.TotalValue,
})
}
accounts = append(accounts, &a)
}
return &gctrpc.GetAccountInfoResponse{Exchange: r.Exchange, Accounts: accounts}, nil
}
// GetAccountInfoStream streams an account balance for a specific exchange
func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream gctrpc.GoCryptoTrader_GetAccountInfoStreamServer) error {
if r.Exchange == "" {
return errors.New(errExchangeNameUnset)
}
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return errExchangeNotLoaded
}
initAcc, err := exch.FetchAccountInfo()
if err != nil {
return err
}
var accounts []*gctrpc.Account
for x := range initAcc.Accounts {
var subAccounts []*gctrpc.AccountCurrencyInfo
for y := range initAcc.Accounts[x].Currencies {
subAccounts = append(subAccounts, &gctrpc.AccountCurrencyInfo{
Currency: initAcc.Accounts[x].Currencies[y].CurrencyName.String(),
TotalValue: initAcc.Accounts[x].Currencies[y].TotalValue,
Hold: initAcc.Accounts[x].Currencies[y].Hold,
})
}
accounts = append(accounts, &gctrpc.Account{
Id: initAcc.Accounts[x].ID,
Currencies: subAccounts,
})
}
err = stream.Send(&gctrpc.GetAccountInfoResponse{
Exchange: initAcc.Exchange,
Accounts: accounts,
})
if err != nil {
return err
}
pipe, err := account.SubscribeToExchangeAccount(r.Exchange)
if err != nil {
return err
}
defer pipe.Release()
for {
data, ok := <-pipe.C
if !ok {
return errors.New(errDispatchSystem)
}
acc := (*data.(*interface{})).(account.Holdings)
var accounts []*gctrpc.Account
for x := range acc.Accounts {
var subAccounts []*gctrpc.AccountCurrencyInfo
for y := range acc.Accounts[x].Currencies {
subAccounts = append(subAccounts, &gctrpc.AccountCurrencyInfo{
Currency: acc.Accounts[x].Currencies[y].CurrencyName.String(),
TotalValue: acc.Accounts[x].Currencies[y].TotalValue,
Hold: acc.Accounts[x].Currencies[y].Hold,
})
}
accounts = append(accounts, &gctrpc.Account{
Id: acc.Accounts[x].ID,
Currencies: subAccounts,
})
}
err := stream.Send(&gctrpc.GetAccountInfoResponse{
Exchange: acc.Exchange,
Accounts: accounts,
})
if err != nil {
return err
}
}
}
// GetConfig returns the bots config
func (s *RPCServer) GetConfig(_ context.Context, r *gctrpc.GetConfigRequest) (*gctrpc.GetConfigResponse, error) {
return &gctrpc.GetConfigResponse{}, common.ErrNotYetImplemented
}
// GetPortfolio returns the portfolio details
func (s *RPCServer) GetPortfolio(_ context.Context, r *gctrpc.GetPortfolioRequest) (*gctrpc.GetPortfolioResponse, error) {
var addrs []*gctrpc.PortfolioAddress
botAddrs := s.Portfolio.Addresses
for x := range botAddrs {
addrs = append(addrs, &gctrpc.PortfolioAddress{
Address: botAddrs[x].Address,
CoinType: botAddrs[x].CoinType.String(),
Description: botAddrs[x].Description,
Balance: botAddrs[x].Balance,
})
}
resp := &gctrpc.GetPortfolioResponse{
Portfolio: addrs,
}
return resp, nil
}
// GetPortfolioSummary returns the portfolio summary
func (s *RPCServer) GetPortfolioSummary(_ context.Context, r *gctrpc.GetPortfolioSummaryRequest) (*gctrpc.GetPortfolioSummaryResponse, error) {
result := s.Portfolio.GetPortfolioSummary()
var resp gctrpc.GetPortfolioSummaryResponse
p := func(coins []portfolio.Coin) []*gctrpc.Coin {
var c []*gctrpc.Coin
for x := range coins {
c = append(c,
&gctrpc.Coin{
Coin: coins[x].Coin.String(),
Balance: coins[x].Balance,
Address: coins[x].Address,
Percentage: coins[x].Percentage,
},
)
}
return c
}
resp.CoinTotals = p(result.Totals)
resp.CoinsOffline = p(result.Offline)
resp.CoinsOfflineSummary = make(map[string]*gctrpc.OfflineCoins)
for k, v := range result.OfflineSummary {
var o []*gctrpc.OfflineCoinSummary
for x := range v {
o = append(o,
&gctrpc.OfflineCoinSummary{
Address: v[x].Address,
Balance: v[x].Balance,
Percentage: v[x].Percentage,
},
)
}
resp.CoinsOfflineSummary[k.String()] = &gctrpc.OfflineCoins{
Addresses: o,
}
}
resp.CoinsOnline = p(result.Online)
resp.CoinsOnlineSummary = make(map[string]*gctrpc.OnlineCoins)
for k, v := range result.OnlineSummary {
o := make(map[string]*gctrpc.OnlineCoinSummary)
for x, y := range v {
o[x.String()] = &gctrpc.OnlineCoinSummary{
Balance: y.Balance,
Percentage: y.Percentage,
}
}
resp.CoinsOnlineSummary[k] = &gctrpc.OnlineCoins{
Coins: o,
}
}
return &resp, nil
}
// AddPortfolioAddress adds an address to the portfolio manager
func (s *RPCServer) AddPortfolioAddress(_ context.Context, r *gctrpc.AddPortfolioAddressRequest) (*gctrpc.GenericResponse, error) {
err := s.Portfolio.AddAddress(r.Address,
r.Description,
currency.NewCode(r.CoinType),
r.Balance)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// RemovePortfolioAddress removes an address from the portfolio manager
func (s *RPCServer) RemovePortfolioAddress(_ context.Context, r *gctrpc.RemovePortfolioAddressRequest) (*gctrpc.GenericResponse, error) {
err := s.Portfolio.RemoveAddress(r.Address,
r.Description,
currency.NewCode(r.CoinType))
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// GetForexProviders returns a list of available forex providers
func (s *RPCServer) GetForexProviders(_ context.Context, r *gctrpc.GetForexProvidersRequest) (*gctrpc.GetForexProvidersResponse, error) {
providers := s.Config.GetForexProviders()
if len(providers) == 0 {
return nil, fmt.Errorf("forex providers is empty")
}
var forexProviders []*gctrpc.ForexProvider
for x := range providers {
forexProviders = append(forexProviders, &gctrpc.ForexProvider{
Name: providers[x].Name,
Enabled: providers[x].Enabled,
Verbose: providers[x].Verbose,
RestPollingDelay: providers[x].RESTPollingDelay.String(),
ApiKey: providers[x].APIKey,
ApiKeyLevel: int64(providers[x].APIKeyLvl),
PrimaryProvider: providers[x].PrimaryProvider,
})
}
return &gctrpc.GetForexProvidersResponse{ForexProviders: forexProviders}, nil
}
// GetForexRates returns a list of forex rates
func (s *RPCServer) GetForexRates(_ context.Context, r *gctrpc.GetForexRatesRequest) (*gctrpc.GetForexRatesResponse, error) {
rates, err := currency.GetExchangeRates()
if err != nil {
return nil, err
}
if len(rates) == 0 {
return nil, fmt.Errorf("forex rates is empty")
}
var forexRates []*gctrpc.ForexRatesConversion
for x := range rates {
rate, err := rates[x].GetRate()
if err != nil {
continue
}
// TODO
// inverseRate, err := rates[x].GetInversionRate()
// if err != nil {
// continue
// }
forexRates = append(forexRates, &gctrpc.ForexRatesConversion{
From: rates[x].From.String(),
To: rates[x].To.String(),
Rate: rate,
InverseRate: 0,
})
}
return &gctrpc.GetForexRatesResponse{ForexRates: forexRates}, nil
}
// GetOrders returns all open orders, filtered by exchange, currency pair or
// asset type
func (s *RPCServer) GetOrders(_ context.Context, r *gctrpc.GetOrdersRequest) (*gctrpc.GetOrdersResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
resp, err := exch.GetActiveOrders(&order.GetOrdersRequest{
Pairs: []currency.Pair{
currency.NewPairWithDelimiter(r.Pair.Base,
r.Pair.Quote, r.Pair.Delimiter),
},
})
if err != nil {
return nil, err
}
var orders []*gctrpc.OrderDetails
for x := range resp {
orders = append(orders, &gctrpc.OrderDetails{
Exchange: r.Exchange,
Id: resp[x].ID,
BaseCurrency: resp[x].Pair.Base.String(),
QuoteCurrency: resp[x].Pair.Quote.String(),
AssetType: asset.Spot.String(),
OrderType: resp[x].Type.String(),
OrderSide: resp[x].Side.String(),
CreationTime: resp[x].Date.Unix(),
Status: resp[x].Status.String(),
Price: resp[x].Price,
Amount: resp[x].Amount,
})
}
return &gctrpc.GetOrdersResponse{Orders: orders}, nil
}
// GetOrder returns order information based on exchange and order ID
func (s *RPCServer) GetOrder(_ context.Context, r *gctrpc.GetOrderRequest) (*gctrpc.OrderDetails, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
pair := currency.Pair{
Delimiter: r.Pair.Delimiter,
Base: currency.NewCode(r.Pair.Base),
Quote: currency.NewCode(r.Pair.Quote),
}
result, err := s.OrderManager.GetOrderInfo(r.Exchange, r.OrderId, pair, "") // assetType will be implemented in the future
if err != nil {
return nil, fmt.Errorf("error whilst trying to retrieve info for order %s: %s", r.OrderId, err)
}
var trades []*gctrpc.TradeHistory
for i := range result.Trades {
trades = append(trades, &gctrpc.TradeHistory{
CreationTime: result.Trades[i].Timestamp.Unix(),
Id: result.Trades[i].TID,
Price: result.Trades[i].Price,
Amount: result.Trades[i].Amount,
Exchange: result.Trades[i].Exchange,
AssetType: result.Trades[i].Type.String(),
OrderSide: result.Trades[i].Side.String(),
Fee: result.Trades[i].Fee,
})
}
return &gctrpc.OrderDetails{
Exchange: result.Exchange,
Id: result.ID,
BaseCurrency: result.Pair.Base.String(),
QuoteCurrency: result.Pair.Quote.String(),
AssetType: result.AssetType.String(),
OrderSide: result.Side.String(),
OrderType: result.Type.String(),
CreationTime: result.Date.Unix(),
Status: result.Status.String(),
Price: result.Price,
Amount: result.Amount,
OpenVolume: result.RemainingAmount,
Fee: result.Fee,
Trades: trades,
Cost: result.Cost,
UpdateTime: result.CloseTime.Unix(),
}, err
}
// SubmitOrder submits an order specified by exchange, currency pair and asset
// type
func (s *RPCServer) SubmitOrder(_ context.Context, r *gctrpc.SubmitOrderRequest) (*gctrpc.SubmitOrderResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
submission := &order.Submit{
Pair: p,
Side: order.Side(r.Side),
Type: order.Type(r.OrderType),
Amount: r.Amount,
Price: r.Price,
ClientID: r.ClientId,
Exchange: r.Exchange,
AssetType: a,
}
resp, err := exch.SubmitOrder(submission)
if err != nil {
return &gctrpc.SubmitOrderResponse{}, err
}
var trades []*gctrpc.Trades
for i := range resp.Trades {
trades = append(trades, &gctrpc.Trades{
Amount: resp.Trades[i].Amount,
Price: resp.Trades[i].Price,
Fee: resp.Trades[i].Fee,
FeeAsset: resp.Trades[i].FeeAsset,
})
}
return &gctrpc.SubmitOrderResponse{
OrderId: resp.OrderID,
OrderPlaced: resp.IsOrderPlaced,
Trades: trades,
}, err
}
// SimulateOrder simulates an order specified by exchange, currency pair and asset
// type
func (s *RPCServer) SimulateOrder(_ context.Context, r *gctrpc.SimulateOrderRequest) (*gctrpc.SimulateOrderResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
o, err := exch.FetchOrderbook(p, asset.Spot)
if err != nil {
return nil, err
}
var buy = true
if !strings.EqualFold(r.Side, order.Buy.String()) &&
!strings.EqualFold(r.Side, order.Bid.String()) {
buy = false
}
result := o.SimulateOrder(r.Amount, buy)
var resp gctrpc.SimulateOrderResponse
for x := range result.Orders {
resp.Orders = append(resp.Orders, &gctrpc.OrderbookItem{
Price: result.Orders[x].Price,
Amount: result.Orders[x].Amount,
})
}
resp.Amount = result.Amount
resp.MaximumPrice = result.MaximumPrice
resp.MinimumPrice = result.MinimumPrice
resp.PercentageGainLoss = result.PercentageGainOrLoss
resp.Status = result.Status
return &resp, nil
}
// WhaleBomb finds the amount required to reach a specific price target for a given exchange, pair
// and asset type
func (s *RPCServer) WhaleBomb(_ context.Context, r *gctrpc.WhaleBombRequest) (*gctrpc.SimulateOrderResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
o, err := exch.FetchOrderbook(p, asset.Spot)
if err != nil {
return nil, err
}
var buy = true
if !strings.EqualFold(r.Side, order.Buy.String()) &&
!strings.EqualFold(r.Side, order.Bid.String()) {
buy = false
}
result, err := o.WhaleBomb(r.PriceTarget, buy)
var resp gctrpc.SimulateOrderResponse
for x := range result.Orders {
resp.Orders = append(resp.Orders, &gctrpc.OrderbookItem{
Price: result.Orders[x].Price,
Amount: result.Orders[x].Amount,
})
}
resp.Amount = result.Amount
resp.MaximumPrice = result.MaximumPrice
resp.MinimumPrice = result.MinimumPrice
resp.PercentageGainLoss = result.PercentageGainOrLoss
resp.Status = result.Status
return &resp, err
}
// CancelOrder cancels an order specified by exchange, currency pair and asset
// type
func (s *RPCServer) CancelOrder(_ context.Context, r *gctrpc.CancelOrderRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
err = exch.CancelOrder(&order.Cancel{
AccountID: r.AccountId,
ID: r.OrderId,
Side: order.Side(r.Side),
WalletAddress: r.WalletAddress,
Pair: p,
AssetType: a,
})
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("order %s cancelled", r.OrderId)}, nil
}
// CancelAllOrders cancels all orders, filterable by exchange
func (s *RPCServer) CancelAllOrders(_ context.Context, r *gctrpc.CancelAllOrdersRequest) (*gctrpc.CancelAllOrdersResponse, error) {
return &gctrpc.CancelAllOrdersResponse{}, common.ErrNotYetImplemented
}
// GetEvents returns the stored events list
func (s *RPCServer) GetEvents(_ context.Context, r *gctrpc.GetEventsRequest) (*gctrpc.GetEventsResponse, error) {
return &gctrpc.GetEventsResponse{}, common.ErrNotYetImplemented
}
// AddEvent adds an event
func (s *RPCServer) AddEvent(_ context.Context, r *gctrpc.AddEventRequest) (*gctrpc.AddEventResponse, error) {
evtCondition := EventConditionParams{
CheckBids: r.ConditionParams.CheckBids,
CheckBidsAndAsks: r.ConditionParams.CheckBidsAndAsks,
Condition: r.ConditionParams.Condition,
OrderbookAmount: r.ConditionParams.OrderbookAmount,
Price: r.ConditionParams.Price,
}
p := currency.NewPairWithDelimiter(r.Pair.Base,
r.Pair.Quote, r.Pair.Delimiter)
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
id, err := Add(r.Exchange, r.Item, evtCondition, p, a, r.Action)
if err != nil {
return nil, err
}
return &gctrpc.AddEventResponse{Id: id}, nil
}
// RemoveEvent removes an event, specified by an event ID
func (s *RPCServer) RemoveEvent(_ context.Context, r *gctrpc.RemoveEventRequest) (*gctrpc.GenericResponse, error) {
if !Remove(r.Id) {
return nil, fmt.Errorf("event %d not removed", r.Id)
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("event %d removed", r.Id)}, nil
}
// GetCryptocurrencyDepositAddresses returns a list of cryptocurrency deposit
// addresses specified by an exchange
func (s *RPCServer) GetCryptocurrencyDepositAddresses(_ context.Context, r *gctrpc.GetCryptocurrencyDepositAddressesRequest) (*gctrpc.GetCryptocurrencyDepositAddressesResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
result, err := s.GetCryptocurrencyDepositAddressesByExchange(r.Exchange)
return &gctrpc.GetCryptocurrencyDepositAddressesResponse{Addresses: result}, err
}
// GetCryptocurrencyDepositAddress returns a cryptocurrency deposit address
// specified by exchange and cryptocurrency
func (s *RPCServer) GetCryptocurrencyDepositAddress(_ context.Context, r *gctrpc.GetCryptocurrencyDepositAddressRequest) (*gctrpc.GetCryptocurrencyDepositAddressResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
addr, err := s.GetExchangeCryptocurrencyDepositAddress(r.Exchange, "", currency.NewCode(r.Cryptocurrency))
return &gctrpc.GetCryptocurrencyDepositAddressResponse{Address: addr}, err
}
// WithdrawCryptocurrencyFunds withdraws cryptocurrency funds specified by
// exchange
func (s *RPCServer) WithdrawCryptocurrencyFunds(_ context.Context, r *gctrpc.WithdrawCryptoRequest) (*gctrpc.WithdrawResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
request := &withdraw.Request{
Exchange: r.Exchange,
Amount: r.Amount,
Currency: currency.NewCode(strings.ToUpper(r.Currency)),
Type: withdraw.Crypto,
Description: r.Description,
Crypto: withdraw.CryptoRequest{
Address: r.Address,
AddressTag: r.AddressTag,
FeeAmount: r.Fee,
},
}
resp, err := SubmitWithdrawal(request)
if err != nil {
return nil, err
}
return &gctrpc.WithdrawResponse{
Id: resp.ID.String(),
Status: resp.Exchange.Status,
}, nil
}
// WithdrawFiatFunds withdraws fiat funds specified by exchange
func (s *RPCServer) WithdrawFiatFunds(_ context.Context, r *gctrpc.WithdrawFiatRequest) (*gctrpc.WithdrawResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
var bankAccount *banking.Account
bankAccount, err := banking.GetBankAccountByID(r.BankAccountId)
if err != nil {
base := exch.GetBase()
if base == nil {
return nil, errExchangeBaseNotFound
}
bankAccount, err = base.GetExchangeBankAccounts(r.BankAccountId, r.Currency)
if err != nil {
return nil, err
}
}
request := &withdraw.Request{
Exchange: r.Exchange,
Amount: r.Amount,
Currency: currency.NewCode(strings.ToUpper(r.Currency)),
Type: withdraw.Fiat,
Description: r.Description,
Fiat: withdraw.FiatRequest{
Bank: *bankAccount,
},
}
resp, err := SubmitWithdrawal(request)
if err != nil {
return nil, err
}
return &gctrpc.WithdrawResponse{
Id: resp.ID.String(),
Status: resp.Exchange.Status,
}, nil
}
// WithdrawalEventByID returns previous withdrawal request details
func (s *RPCServer) WithdrawalEventByID(_ context.Context, r *gctrpc.WithdrawalEventByIDRequest) (*gctrpc.WithdrawalEventByIDResponse, error) {
if !s.Config.Database.Enabled {
return nil, database.ErrDatabaseSupportDisabled
}
v, err := WithdrawalEventByID(r.Id)
if err != nil {
return nil, err
}
resp := &gctrpc.WithdrawalEventByIDResponse{
Event: &gctrpc.WithdrawalEventResponse{
Id: v.ID.String(),
Exchange: &gctrpc.WithdrawlExchangeEvent{
Name: v.Exchange.Name,
Id: v.Exchange.Name,
Status: v.Exchange.Status,
},
Request: &gctrpc.WithdrawalRequestEvent{
Currency: v.RequestDetails.Currency.String(),
Description: v.RequestDetails.Description,
Amount: v.RequestDetails.Amount,
Type: int32(v.RequestDetails.Type),
},
},
}
createdAtPtype, err := ptypes.TimestampProto(v.CreatedAt)
if err != nil {
log.Errorf(log.Global, "failed to convert time: %v", err)
}
resp.Event.CreatedAt = createdAtPtype
updatedAtPtype, err := ptypes.TimestampProto(v.UpdatedAt)
if err != nil {
log.Errorf(log.Global, "failed to convert time: %v", err)
}
resp.Event.UpdatedAt = updatedAtPtype
if v.RequestDetails.Type == withdraw.Crypto {
resp.Event.Request.Crypto = new(gctrpc.CryptoWithdrawalEvent)
resp.Event.Request.Crypto = &gctrpc.CryptoWithdrawalEvent{
Address: v.RequestDetails.Crypto.Address,
AddressTag: v.RequestDetails.Crypto.AddressTag,
Fee: v.RequestDetails.Crypto.FeeAmount,
}
} else if v.RequestDetails.Type == withdraw.Fiat {
if v.RequestDetails.Fiat != (withdraw.FiatRequest{}) {
resp.Event.Request.Fiat = new(gctrpc.FiatWithdrawalEvent)
resp.Event.Request.Fiat = &gctrpc.FiatWithdrawalEvent{
BankName: v.RequestDetails.Fiat.Bank.BankName,
AccountName: v.RequestDetails.Fiat.Bank.AccountName,
AccountNumber: v.RequestDetails.Fiat.Bank.AccountNumber,
Bsb: v.RequestDetails.Fiat.Bank.BSBNumber,
Swift: v.RequestDetails.Fiat.Bank.SWIFTCode,
Iban: v.RequestDetails.Fiat.Bank.IBAN,
}
}
}
return resp, nil
}
// WithdrawalEventsByExchange returns previous withdrawal request details by exchange
func (s *RPCServer) WithdrawalEventsByExchange(_ context.Context, r *gctrpc.WithdrawalEventsByExchangeRequest) (*gctrpc.WithdrawalEventsByExchangeResponse, error) {
if !s.Config.Database.Enabled {
return nil, database.ErrDatabaseSupportDisabled
}
if r.Id == "" {
ret, err := WithdrawalEventByExchange(r.Exchange, int(r.Limit))
if err != nil {
return nil, err
}
return parseMultipleEvents(ret), nil
}
ret, err := WithdrawalEventByExchangeID(r.Exchange, r.Id)
if err != nil {
return nil, err
}
return parseSingleEvents(ret), nil
}
// WithdrawalEventsByDate returns previous withdrawal request details by exchange
func (s *RPCServer) WithdrawalEventsByDate(_ context.Context, r *gctrpc.WithdrawalEventsByDateRequest) (*gctrpc.WithdrawalEventsByExchangeResponse, error) {
UTCStartTime, err := time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
var UTCEndTime time.Time
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
var ret []*withdraw.Response
ret, err = WithdrawEventByDate(r.Exchange, UTCStartTime, UTCEndTime, int(r.Limit))
if err != nil {
return nil, err
}
return parseMultipleEvents(ret), nil
}
// GetLoggerDetails returns a loggers details
func (s *RPCServer) GetLoggerDetails(_ context.Context, r *gctrpc.GetLoggerDetailsRequest) (*gctrpc.GetLoggerDetailsResponse, error) {
levels, err := log.Level(r.Logger)
if err != nil {
return nil, err
}
return &gctrpc.GetLoggerDetailsResponse{
Info: levels.Info,
Debug: levels.Debug,
Warn: levels.Warn,
Error: levels.Error,
}, nil
}
// SetLoggerDetails sets a loggers details
func (s *RPCServer) SetLoggerDetails(_ context.Context, r *gctrpc.SetLoggerDetailsRequest) (*gctrpc.GetLoggerDetailsResponse, error) {
levels, err := log.SetLevel(r.Logger, r.Level)
if err != nil {
return nil, err
}
return &gctrpc.GetLoggerDetailsResponse{
Info: levels.Info,
Debug: levels.Debug,
Warn: levels.Warn,
Error: levels.Error,
}, nil
}
// GetExchangePairs returns a list of exchange supported assets and related pairs
func (s *RPCServer) GetExchangePairs(_ context.Context, r *gctrpc.GetExchangePairsRequest) (*gctrpc.GetExchangePairsResponse, error) {
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
assetTypes := exchCfg.CurrencyPairs.GetAssetTypes()
var a asset.Item
if r.Asset != "" {
a, err = asset.New(r.Asset)
if err != nil {
return nil, err
}
if !assetTypes.Contains(a) {
return nil, fmt.Errorf("specified asset %s is not supported by exchange", a)
}
}
var resp gctrpc.GetExchangePairsResponse
resp.SupportedAssets = make(map[string]*gctrpc.PairsSupported)
for x := range assetTypes {
if r.Asset != "" && !strings.EqualFold(assetTypes[x].String(), r.Asset) {
continue
}
ps, err := exchCfg.CurrencyPairs.Get(assetTypes[x])
if err != nil {
return nil, err
}
resp.SupportedAssets[assetTypes[x].String()] = &gctrpc.PairsSupported{
AvailablePairs: ps.Available.Join(),
EnabledPairs: ps.Enabled.Join(),
}
}
return &resp, nil
}
// SetExchangePair enables/disabled the specified pair(s) on an exchange
func (s *RPCServer) SetExchangePair(_ context.Context, r *gctrpc.SetExchangePairRequest) (*gctrpc.GenericResponse, error) {
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
if r.AssetType == "" {
return nil, errors.New("asset type must be specified")
}
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
if !exchCfg.CurrencyPairs.GetAssetTypes().Contains(a) {
return nil, fmt.Errorf("specified asset %s is not supported by exchange", a)
}
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
base := exch.GetBase()
if base == nil {
return nil, errExchangeBaseNotFound
}
pairFmt, err := s.Config.GetPairFormat(r.Exchange, a)
if err != nil {
return nil, err
}
var pass bool
var newErrors common.Errors
for i := range r.Pairs {
var p currency.Pair
p, err = currency.NewPairFromStrings(r.Pairs[i].Base, r.Pairs[i].Quote)
if err != nil {
return nil, err
}
if r.Enable {
err = exchCfg.CurrencyPairs.EnablePair(a,
p.Format(pairFmt.Delimiter, pairFmt.Uppercase))
if err != nil {
newErrors = append(newErrors, err)
continue
}
err = base.CurrencyPairs.EnablePair(a, p)
if err != nil {
newErrors = append(newErrors, err)
continue
}
pass = true
continue
}
err = exchCfg.CurrencyPairs.DisablePair(a,
p.Format(pairFmt.Delimiter, pairFmt.Uppercase))
if err != nil {
newErrors = append(newErrors, err)
continue
}
err = base.CurrencyPairs.DisablePair(a, p)
if err != nil {
newErrors = append(newErrors, err)
continue
}
pass = true
}
if exch.IsWebsocketEnabled() && pass && base.Websocket.IsConnected() {
err = exch.FlushWebsocketChannels()
if err != nil {
newErrors = append(newErrors, err)
}
}
if newErrors != nil {
return nil, newErrors
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// GetOrderbookStream streams the requested updated orderbook
func (s *RPCServer) GetOrderbookStream(r *gctrpc.GetOrderbookStreamRequest, stream gctrpc.GoCryptoTrader_GetOrderbookStreamServer) error {
if r.Exchange == "" {
return errors.New(errExchangeNameUnset)
}
if r.Pair.String() == "" {
return errors.New(errCurrencyPairUnset)
}
if r.AssetType == "" {
return errors.New(errAssetTypeUnset)
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return err
}
a, err := asset.New(r.AssetType)
if err != nil {
return err
}
pipe, err := orderbook.SubscribeOrderbook(r.Exchange, p, a)
if err != nil {
return err
}
defer pipe.Release()
for {
data, ok := <-pipe.C
if !ok {
return errors.New(errDispatchSystem)
}
ob := (*data.(*interface{})).(orderbook.Base)
var bids, asks []*gctrpc.OrderbookItem
for i := range ob.Bids {
bids = append(bids, &gctrpc.OrderbookItem{
Amount: ob.Bids[i].Amount,
Price: ob.Bids[i].Price,
Id: ob.Bids[i].ID,
})
}
for i := range ob.Asks {
asks = append(asks, &gctrpc.OrderbookItem{
Amount: ob.Asks[i].Amount,
Price: ob.Asks[i].Price,
Id: ob.Asks[i].ID,
})
}
err := stream.Send(&gctrpc.OrderbookResponse{
Pair: &gctrpc.CurrencyPair{Base: ob.Pair.Base.String(),
Quote: ob.Pair.Quote.String()},
Bids: bids,
Asks: asks,
AssetType: ob.AssetType.String(),
})
if err != nil {
return err
}
}
}
// GetExchangeOrderbookStream streams all orderbooks associated with an exchange
func (s *RPCServer) GetExchangeOrderbookStream(r *gctrpc.GetExchangeOrderbookStreamRequest, stream gctrpc.GoCryptoTrader_GetExchangeOrderbookStreamServer) error {
if r.Exchange == "" {
return errors.New(errExchangeNameUnset)
}
pipe, err := orderbook.SubscribeToExchangeOrderbooks(r.Exchange)
if err != nil {
return err
}
defer pipe.Release()
for {
data, ok := <-pipe.C
if !ok {
return errors.New(errDispatchSystem)
}
ob := (*data.(*interface{})).(orderbook.Base)
var bids, asks []*gctrpc.OrderbookItem
for i := range ob.Bids {
bids = append(bids, &gctrpc.OrderbookItem{
Amount: ob.Bids[i].Amount,
Price: ob.Bids[i].Price,
Id: ob.Bids[i].ID,
})
}
for i := range ob.Asks {
asks = append(asks, &gctrpc.OrderbookItem{
Amount: ob.Asks[i].Amount,
Price: ob.Asks[i].Price,
Id: ob.Asks[i].ID,
})
}
err := stream.Send(&gctrpc.OrderbookResponse{
Pair: &gctrpc.CurrencyPair{Base: ob.Pair.Base.String(),
Quote: ob.Pair.Quote.String()},
Bids: bids,
Asks: asks,
AssetType: ob.AssetType.String(),
})
if err != nil {
return err
}
}
}
// GetTickerStream streams the requested updated ticker
func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gctrpc.GoCryptoTrader_GetTickerStreamServer) error {
if r.Exchange == "" {
return errors.New(errExchangeNameUnset)
}
if r.Pair.String() == "" {
return errors.New(errCurrencyPairUnset)
}
if r.AssetType == "" {
return errors.New(errAssetTypeUnset)
}
p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return err
}
a, err := asset.New(r.AssetType)
if err != nil {
return err
}
pipe, err := ticker.SubscribeTicker(r.Exchange, p, a)
if err != nil {
return err
}
defer pipe.Release()
for {
data, ok := <-pipe.C
if !ok {
return errors.New(errDispatchSystem)
}
t := (*data.(*interface{})).(ticker.Price)
err := stream.Send(&gctrpc.TickerResponse{
Pair: &gctrpc.CurrencyPair{
Base: t.Pair.Base.String(),
Quote: t.Pair.Quote.String(),
Delimiter: t.Pair.Delimiter},
LastUpdated: t.LastUpdated.Unix(),
Last: t.Last,
High: t.High,
Low: t.Low,
Bid: t.Bid,
Ask: t.Ask,
Volume: t.Volume,
PriceAth: t.PriceATH,
})
if err != nil {
return err
}
}
}
// GetExchangeTickerStream streams all tickers associated with an exchange
func (s *RPCServer) GetExchangeTickerStream(r *gctrpc.GetExchangeTickerStreamRequest, stream gctrpc.GoCryptoTrader_GetExchangeTickerStreamServer) error {
if r.Exchange == "" {
return errors.New(errExchangeNameUnset)
}
pipe, err := ticker.SubscribeToExchangeTickers(r.Exchange)
if err != nil {
return err
}
defer pipe.Release()
for {
data, ok := <-pipe.C
if !ok {
return errors.New(errDispatchSystem)
}
t := (*data.(*interface{})).(ticker.Price)
err := stream.Send(&gctrpc.TickerResponse{
Pair: &gctrpc.CurrencyPair{
Base: t.Pair.Base.String(),
Quote: t.Pair.Quote.String(),
Delimiter: t.Pair.Delimiter},
LastUpdated: t.LastUpdated.Unix(),
Last: t.Last,
High: t.High,
Low: t.Low,
Bid: t.Bid,
Ask: t.Ask,
Volume: t.Volume,
PriceAth: t.PriceATH,
})
if err != nil {
return err
}
}
}
// GetAuditEvent returns matching audit events from database
func (s *RPCServer) GetAuditEvent(_ context.Context, r *gctrpc.GetAuditEventRequest) (*gctrpc.GetAuditEventResponse, error) {
UTCStartTime, err := time.Parse(common.SimpleTimeFormat, r.StartDate)
if err != nil {
return nil, err
}
UTCEndTime, err := time.Parse(common.SimpleTimeFormat, r.EndDate)
if err != nil {
return nil, err
}
events, err := audit.GetEvent(UTCStartTime, UTCEndTime, r.OrderBy, int(r.Limit))
if err != nil {
return nil, err
}
resp := gctrpc.GetAuditEventResponse{}
switch v := events.(type) {
case postgres.AuditEventSlice:
for x := range v {
tempEvent := &gctrpc.AuditEvent{
Type: v[x].Type,
Identifier: v[x].Identifier,
Message: v[x].Message,
Timestamp: v[x].CreatedAt.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
}
resp.Events = append(resp.Events, tempEvent)
}
case sqlite3.AuditEventSlice:
for x := range v {
tempEvent := &gctrpc.AuditEvent{
Type: v[x].Type,
Identifier: v[x].Identifier,
Message: v[x].Message,
Timestamp: v[x].CreatedAt,
}
resp.Events = append(resp.Events, tempEvent)
}
}
return &resp, nil
}
// GetHistoricCandles returns historical candles for a given exchange
func (s *RPCServer) GetHistoricCandles(_ context.Context, r *gctrpc.GetHistoricCandlesRequest) (*gctrpc.GetHistoricCandlesResponse, error) {
if r.Exchange == "" {
return nil, errors.New(errExchangeNameUnset)
}
if r.Pair.String() == "" {
return nil, errors.New(errCurrencyPairUnset)
}
if r.Start == r.End {
return nil, errors.New(errStartEndTimesUnset)
}
var klineItem kline.Item
UTCStartTime, err := time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
var UTCEndTime time.Time
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
interval := kline.Interval(r.TimeInterval)
resp := gctrpc.GetHistoricCandlesResponse{
Interval: interval.Short(),
Pair: r.Pair,
Start: r.Start,
End: r.End,
}
a, err := asset.New(r.AssetType)
if err != nil {
return nil, err
}
pair := currency.Pair{
Delimiter: r.Pair.Delimiter,
Base: currency.NewCode(r.Pair.Base),
Quote: currency.NewCode(r.Pair.Quote),
}
if r.UseDb {
klineItem, err = kline.LoadFromDatabase(r.Exchange,
pair,
a,
interval,
UTCStartTime,
UTCEndTime)
if err != nil {
return nil, err
}
} else {
exchangeEngine := s.GetExchangeByName(r.Exchange)
if exchangeEngine == nil {
return nil, errors.New("Exchange " + r.Exchange + " not found")
}
if r.ExRequest {
klineItem, err = exchangeEngine.GetHistoricCandlesExtended(pair,
a,
UTCStartTime,
UTCEndTime,
interval)
} else {
klineItem, err = exchangeEngine.GetHistoricCandles(pair,
a,
UTCStartTime,
UTCEndTime,
interval)
}
}
if err != nil {
return nil, err
}
if r.FillMissingWithTrades {
var tradeDataKline *kline.Item
tradeDataKline, err = fillMissingCandlesWithStoredTrades(UTCStartTime, UTCEndTime, &klineItem)
if err != nil {
return nil, err
}
klineItem.Candles = append(klineItem.Candles, tradeDataKline.Candles...)
}
resp.Exchange = klineItem.Exchange
for i := range klineItem.Candles {
resp.Candle = append(resp.Candle, &gctrpc.Candle{
Time: klineItem.Candles[i].Time.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
Low: klineItem.Candles[i].Low,
High: klineItem.Candles[i].High,
Open: klineItem.Candles[i].Open,
Close: klineItem.Candles[i].Close,
Volume: klineItem.Candles[i].Volume,
})
}
if r.Sync && !r.UseDb {
_, err = kline.StoreInDatabase(&klineItem, r.Force)
if err != nil {
if errors.Is(err, exchangeDB.ErrNoExchangeFound) {
return nil, errors.New("exchange was not found in database, you can seed existing data or insert a new exchange via the dbseed")
}
return nil, err
}
}
return &resp, nil
}
func fillMissingCandlesWithStoredTrades(startTime, endTime time.Time, klineItem *kline.Item) (*kline.Item, error) {
var response kline.Item
var candleTimes []time.Time
for i := range klineItem.Candles {
candleTimes = append(candleTimes, klineItem.Candles[i].Time)
}
ranges, err := timeperiods.FindTimeRangesContainingData(startTime, endTime, klineItem.Interval.Duration(), candleTimes)
if err != nil {
return nil, err
}
for i := range ranges {
if ranges[i].HasDataInRange {
continue
}
var tradeCandles kline.Item
trades, err := trade.GetTradesInRange(
klineItem.Exchange,
klineItem.Asset.String(),
klineItem.Pair.Base.String(),
klineItem.Pair.Quote.String(),
ranges[i].StartOfRange,
ranges[i].EndOfRange,
)
if err != nil {
return klineItem, err
}
if len(trades) == 0 {
continue
}
tradeCandles, err = trade.ConvertTradesToCandles(klineItem.Interval, trades...)
if err != nil {
return klineItem, err
}
if len(tradeCandles.Candles) == 0 {
continue
}
for i := range tradeCandles.Candles {
response.Candles = append(response.Candles, tradeCandles.Candles[i])
}
for i := range response.Candles {
log.Infof(log.GRPCSys,
"Filled requested OHLCV data for %v %v %v interval at %v with trade data",
klineItem.Exchange,
klineItem.Pair.String(),
klineItem.Asset,
response.Candles[i].Time.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
)
}
}
return &response, nil
}
// GCTScriptStatus returns a slice of current running scripts that includes next run time and uuid
func (s *RPCServer) GCTScriptStatus(_ context.Context, r *gctrpc.GCTScriptStatusRequest) (*gctrpc.GCTScriptStatusResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GCTScriptStatusResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
if gctscript.VMSCount.Len() < 1 {
return &gctrpc.GCTScriptStatusResponse{Status: "no scripts running"}, nil
}
resp := &gctrpc.GCTScriptStatusResponse{
Status: fmt.Sprintf("%v of %v virtual machines running", gctscript.VMSCount.Len(), gctscript.GCTScriptConfig.MaxVirtualMachines),
}
gctscript.AllVMSync.Range(func(k, v interface{}) bool {
vm := v.(*gctscript.VM)
resp.Scripts = append(resp.Scripts, &gctrpc.GCTScript{
UUID: vm.ID.String(),
Name: vm.ShortName(),
NextRun: vm.NextRun.String(),
})
return true
})
return resp, nil
}
// GCTScriptQuery queries a running script and returns script running information
func (s *RPCServer) GCTScriptQuery(_ context.Context, r *gctrpc.GCTScriptQueryRequest) (*gctrpc.GCTScriptQueryResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GCTScriptQueryResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
UUID, err := uuid.FromString(r.Script.UUID)
if err != nil {
return &gctrpc.GCTScriptQueryResponse{Status: MsgStatusError, Data: err.Error()}, nil
}
if v, f := gctscript.AllVMSync.Load(UUID); f {
resp := &gctrpc.GCTScriptQueryResponse{
Status: MsgStatusOK,
Script: &gctrpc.GCTScript{
Name: v.(*gctscript.VM).ShortName(),
UUID: v.(*gctscript.VM).ID.String(),
Path: v.(*gctscript.VM).Path,
NextRun: v.(*gctscript.VM).NextRun.String(),
},
}
data, err := v.(*gctscript.VM).Read()
if err != nil {
return nil, err
}
resp.Data = string(data)
return resp, nil
}
return &gctrpc.GCTScriptQueryResponse{Status: MsgStatusError, Data: "UUID not found"}, nil
}
// GCTScriptExecute execute a script
func (s *RPCServer) GCTScriptExecute(_ context.Context, r *gctrpc.GCTScriptExecuteRequest) (*gctrpc.GenericResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
if r.Script.Path == "" {
r.Script.Path = gctscript.ScriptPath
}
gctVM := gctscript.New()
if gctVM == nil {
return &gctrpc.GenericResponse{Status: MsgStatusError, Data: "unable to create VM instance"}, nil
}
script := filepath.Join(r.Script.Path, r.Script.Name)
err := gctVM.Load(script)
if err != nil {
return &gctrpc.GenericResponse{
Status: MsgStatusError,
Data: err.Error(),
}, nil
}
go gctVM.CompileAndRun()
return &gctrpc.GenericResponse{
Status: MsgStatusOK,
Data: gctVM.ShortName() + " (" + gctVM.ID.String() + ") executed",
}, nil
}
// GCTScriptStop terminate a running script
func (s *RPCServer) GCTScriptStop(_ context.Context, r *gctrpc.GCTScriptStopRequest) (*gctrpc.GenericResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
UUID, err := uuid.FromString(r.Script.UUID)
if err != nil {
return &gctrpc.GenericResponse{Status: MsgStatusError, Data: err.Error()}, nil
}
if v, f := gctscript.AllVMSync.Load(UUID); f {
err = v.(*gctscript.VM).Shutdown()
status := " terminated"
if err != nil {
status = " " + err.Error()
}
return &gctrpc.GenericResponse{Status: MsgStatusOK, Data: v.(*gctscript.VM).ID.String() + status}, nil
}
return &gctrpc.GenericResponse{Status: MsgStatusError, Data: "no running script found"}, nil
}
// GCTScriptUpload upload a new script to ScriptPath
func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUploadRequest) (*gctrpc.GenericResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
fPath := filepath.Join(gctscript.ScriptPath, r.ScriptName)
var fPathExits = fPath
if filepath.Ext(fPath) == ".zip" {
fPathExits = fPathExits[0 : len(fPathExits)-4]
}
if s, err := os.Stat(fPathExits); !os.IsNotExist(err) {
if !r.Overwrite {
return nil, fmt.Errorf("%s script found and overwrite set to false", r.ScriptName)
}
f := filepath.Join(gctscript.ScriptPath, "version_history")
err = os.MkdirAll(f, 0770)
if err != nil {
return nil, err
}
timeString := strconv.FormatInt(time.Now().UnixNano(), 10)
renamedFile := filepath.Join(f, timeString+"-"+filepath.Base(fPathExits))
if s.IsDir() {
err = archive.Zip(fPathExits, renamedFile+".zip")
if err != nil {
return nil, err
}
} else {
err = file.Move(fPathExits, renamedFile)
if err != nil {
return nil, err
}
}
}
newFile, err := os.Create(fPath)
if err != nil {
return nil, err
}
_, err = newFile.Write(r.Data)
if err != nil {
return nil, err
}
err = newFile.Close()
if err != nil {
log.Errorln(log.Global, "Failed to close file handle, archive removal may fail")
}
if r.Archived {
files, errExtract := archive.UnZip(fPath, filepath.Join(gctscript.ScriptPath, r.ScriptName[:len(r.ScriptName)-4]))
if errExtract != nil {
log.Errorf(log.Global, "Failed to archive zip file %v", errExtract)
return &gctrpc.GenericResponse{Status: MsgStatusError, Data: errExtract.Error()}, nil
}
var failedFiles []string
for x := range files {
err = gctscript.Validate(files[x])
if err != nil {
failedFiles = append(failedFiles, files[x])
}
}
err = os.Remove(fPath)
if err != nil {
return nil, err
}
if len(failedFiles) > 0 {
err = os.RemoveAll(filepath.Join(gctscript.ScriptPath, r.ScriptName[:len(r.ScriptName)-4]))
if err != nil {
log.Errorf(log.GCTScriptMgr, "Failed to remove file %v (%v), manual deletion required", filepath.Base(fPath), err)
}
return &gctrpc.GenericResponse{Status: ErrScriptFailedValidation, Data: strings.Join(failedFiles, ", ")}, nil
}
} else {
err = gctscript.Validate(fPath)
if err != nil {
errRemove := os.Remove(fPath)
if errRemove != nil {
log.Errorf(log.GCTScriptMgr, "Failed to remove file %v, manual deletion required: %v", filepath.Base(fPath), errRemove)
}
return &gctrpc.GenericResponse{Status: ErrScriptFailedValidation, Data: err.Error()}, nil
}
}
return &gctrpc.GenericResponse{
Status: MsgStatusOK,
Data: fmt.Sprintf("script %s written", newFile.Name()),
}, nil
}
// GCTScriptReadScript read a script and return contents
func (s *RPCServer) GCTScriptReadScript(_ context.Context, r *gctrpc.GCTScriptReadScriptRequest) (*gctrpc.GCTScriptQueryResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GCTScriptQueryResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
filename := filepath.Join(gctscript.ScriptPath, r.Script.Name)
if !strings.HasPrefix(filename, filepath.Clean(gctscript.ScriptPath)+string(os.PathSeparator)) {
return nil, fmt.Errorf("%s: invalid file path", filename)
}
data, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
return &gctrpc.GCTScriptQueryResponse{
Status: MsgStatusOK,
Script: &gctrpc.GCTScript{
Name: filepath.Base(filename),
Path: filepath.Dir(filename),
},
Data: string(data),
}, nil
}
// GCTScriptListAll lists all scripts inside the default script path
func (s *RPCServer) GCTScriptListAll(context.Context, *gctrpc.GCTScriptListAllRequest) (*gctrpc.GCTScriptStatusResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GCTScriptStatusResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
resp := &gctrpc.GCTScriptStatusResponse{}
err := filepath.Walk(gctscript.ScriptPath,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if filepath.Ext(path) == ".gct" {
resp.Scripts = append(resp.Scripts, &gctrpc.GCTScript{
Name: path,
})
}
return nil
})
if err != nil {
return nil, err
}
return resp, nil
}
// GCTScriptStopAll stops all running scripts
func (s *RPCServer) GCTScriptStopAll(context.Context, *gctrpc.GCTScriptStopAllRequest) (*gctrpc.GenericResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
err := gctscript.ShutdownAll()
if err != nil {
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
}
return &gctrpc.GenericResponse{
Status: MsgStatusOK,
Data: "all running scripts have been stopped",
}, nil
}
// GCTScriptAutoLoadToggle adds or removes an entry to the autoload list
func (s *RPCServer) GCTScriptAutoLoadToggle(_ context.Context, r *gctrpc.GCTScriptAutoLoadRequest) (*gctrpc.GenericResponse, error) {
if !gctscript.GCTScriptConfig.Enabled {
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
}
if r.Status {
err := gctscript.Autoload(r.Script, true)
if err != nil {
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
}
return &gctrpc.GenericResponse{Status: "success", Data: "script " + r.Script + " removed from autoload list"}, nil
}
err := gctscript.Autoload(r.Script, false)
if err != nil {
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
}
return &gctrpc.GenericResponse{Status: "success", Data: "script " + r.Script + " added to autoload list"}, nil
}
// SetExchangeAsset enables or disables an exchanges asset type
func (s *RPCServer) SetExchangeAsset(_ context.Context, r *gctrpc.SetExchangeAssetRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
base := exch.GetBase()
if base == nil {
return nil, errExchangeBaseNotFound
}
if r.Asset == "" {
return nil, errors.New("asset type must be specified")
}
a, err := asset.New(r.Asset)
if err != nil {
return nil, err
}
err = base.CurrencyPairs.SetAssetEnabled(a, r.Enable)
if err != nil {
return nil, err
}
err = exchCfg.CurrencyPairs.SetAssetEnabled(a, r.Enable)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// SetAllExchangePairs enables or disables an exchanges pairs
func (s *RPCServer) SetAllExchangePairs(_ context.Context, r *gctrpc.SetExchangeAllPairsRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
base := exch.GetBase()
if base == nil {
return nil, errExchangeBaseNotFound
}
assets := base.CurrencyPairs.GetAssetTypes()
if r.Enable {
for i := range assets {
var pairs currency.Pairs
pairs, err = base.CurrencyPairs.GetPairs(assets[i], false)
if err != nil {
return nil, err
}
exchCfg.CurrencyPairs.StorePairs(assets[i], pairs, true)
base.CurrencyPairs.StorePairs(assets[i], pairs, true)
}
} else {
for i := range assets {
exchCfg.CurrencyPairs.StorePairs(assets[i], nil, true)
base.CurrencyPairs.StorePairs(assets[i], nil, true)
}
}
if exch.IsWebsocketEnabled() && base.Websocket.IsConnected() {
err = exch.FlushWebsocketChannels()
if err != nil {
return nil, err
}
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// UpdateExchangeSupportedPairs forces an update of the supported pairs which
// will update the available pairs list and remove any assets that are disabled
// by the exchange
func (s *RPCServer) UpdateExchangeSupportedPairs(_ context.Context, r *gctrpc.UpdateExchangeSupportedPairsRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
base := exch.GetBase()
if base == nil {
return nil, errExchangeBaseNotFound
}
if !base.GetEnabledFeatures().AutoPairUpdates {
return nil,
errors.New("cannot auto pair update for exchange, a manual update is needed")
}
err := exch.UpdateTradablePairs(false)
if err != nil {
return nil, err
}
if exch.IsWebsocketEnabled() {
err = exch.FlushWebsocketChannels()
if err != nil {
return nil, err
}
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}
// GetExchangeAssets returns the supported asset types
func (s *RPCServer) GetExchangeAssets(_ context.Context, r *gctrpc.GetExchangeAssetsRequest) (*gctrpc.GetExchangeAssetsResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
return &gctrpc.GetExchangeAssetsResponse{
Assets: exch.GetAssetTypes().JoinToString(","),
}, nil
}
// WebsocketGetInfo returns websocket connection information
func (s *RPCServer) WebsocketGetInfo(_ context.Context, r *gctrpc.WebsocketGetInfoRequest) (*gctrpc.WebsocketGetInfoResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
w, err := exch.GetWebsocket()
if err != nil {
return nil, err
}
return &gctrpc.WebsocketGetInfoResponse{
Exchange: exch.GetName(),
Supported: exch.SupportsWebsocket(),
Enabled: exch.IsWebsocketEnabled(),
Authenticated: w.CanUseAuthenticatedEndpoints(),
RunningUrl: w.GetWebsocketURL(),
ProxyAddress: w.GetProxyAddress(),
}, nil
}
// WebsocketSetEnabled enables or disables the websocket client
func (s *RPCServer) WebsocketSetEnabled(_ context.Context, r *gctrpc.WebsocketSetEnabledRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
w, err := exch.GetWebsocket()
if err != nil {
return nil, fmt.Errorf("websocket not supported for exchange %s", r.Exchange)
}
exchCfg, err := s.Config.GetExchangeConfig(r.Exchange)
if err != nil {
return nil, err
}
if r.Enable {
err = w.Enable()
if err != nil {
return nil, err
}
exchCfg.Features.Enabled.Websocket = true
return &gctrpc.GenericResponse{Status: MsgStatusSuccess, Data: "websocket enabled"}, nil
}
err = w.Disable()
if err != nil {
return nil, err
}
exchCfg.Features.Enabled.Websocket = false
return &gctrpc.GenericResponse{Status: MsgStatusSuccess, Data: "websocket disabled"}, nil
}
// WebsocketGetSubscriptions returns websocket subscription analysis
func (s *RPCServer) WebsocketGetSubscriptions(_ context.Context, r *gctrpc.WebsocketGetSubscriptionsRequest) (*gctrpc.WebsocketGetSubscriptionsResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
w, err := exch.GetWebsocket()
if err != nil {
return nil, fmt.Errorf("websocket not supported for exchange %s", r.Exchange)
}
payload := new(gctrpc.WebsocketGetSubscriptionsResponse)
payload.Exchange = exch.GetName()
subs := w.GetSubscriptions()
for i := range subs {
params, err := json.Marshal(subs[i].Params)
if err != nil {
return nil, err
}
payload.Subscriptions = append(payload.Subscriptions,
&gctrpc.WebsocketSubscription{
Channel: subs[i].Channel,
Currency: subs[i].Currency.String(),
Asset: subs[i].Asset.String(),
Params: string(params),
})
}
return payload, nil
}
// WebsocketSetProxy sets client websocket connection proxy
func (s *RPCServer) WebsocketSetProxy(_ context.Context, r *gctrpc.WebsocketSetProxyRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
w, err := exch.GetWebsocket()
if err != nil {
return nil, fmt.Errorf("websocket not supported for exchange %s", r.Exchange)
}
err = w.SetProxyAddress(r.Proxy)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("new proxy has been set [%s] for %s websocket connection",
r.Exchange,
r.Proxy)}, nil
}
// WebsocketSetURL sets exchange websocket client connection URL
func (s *RPCServer) WebsocketSetURL(_ context.Context, r *gctrpc.WebsocketSetURLRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
w, err := exch.GetWebsocket()
if err != nil {
return nil, fmt.Errorf("websocket not supported for exchange %s", r.Exchange)
}
err = w.SetWebsocketURL(r.Url, false, true)
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("new URL has been set [%s] for %s websocket connection",
r.Exchange,
r.Url)}, nil
}
// GetSavedTrades returns trades from the database
func (s *RPCServer) GetSavedTrades(_ context.Context, r *gctrpc.GetSavedTradesRequest) (*gctrpc.SavedTradesResponse, error) {
if r.End == "" || r.Start == "" || r.Exchange == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" {
return nil, errInvalidArguments
}
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
cp, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return nil, errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return nil, err
}
if !pairs.Contains(cp, false) {
return nil, errors.New("currency not enabled")
}
var UTCStartTime, UTCEndTime time.Time
UTCStartTime, err = time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
var trades []trade.Data
trades, err = trade.GetTradesInRange(r.Exchange, r.AssetType, r.Pair.Base, r.Pair.Quote, UTCStartTime, UTCEndTime)
if err != nil {
return nil, err
}
resp := &gctrpc.SavedTradesResponse{
ExchangeName: r.Exchange,
Asset: r.AssetType,
Pair: r.Pair,
}
for i := range trades {
resp.Trades = append(resp.Trades, &gctrpc.SavedTrades{
Price: trades[i].Price,
Amount: trades[i].Amount,
Side: trades[i].Side.String(),
Timestamp: trades[i].Timestamp.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
TradeId: trades[i].TID,
})
}
if len(resp.Trades) == 0 {
return nil, fmt.Errorf("request for %v %v trade data between %v and %v and returned no results", r.Exchange, r.AssetType, r.Start, r.End)
}
return resp, nil
}
// ConvertTradesToCandles converts trades to candles using the interval requested
// returns the data too for extra fun scrutiny
func (s *RPCServer) ConvertTradesToCandles(_ context.Context, r *gctrpc.ConvertTradesToCandlesRequest) (*gctrpc.GetHistoricCandlesResponse, error) {
if r.End == "" || r.Start == "" || r.Exchange == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" || r.TimeInterval == 0 {
return nil, errInvalidArguments
}
exch := s.GetExchangeByName(r.Exchange)
UTCStartTime, err := time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
var UTCEndTime time.Time
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
if exch == nil {
return nil, errExchangeNotLoaded
}
var cp currency.Pair
cp, err = currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return nil, errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return nil, err
}
if !pairs.Contains(cp, false) {
return nil, errors.New("currency not enabled")
}
var trades []trade.Data
trades, err = trade.GetTradesInRange(r.Exchange, r.AssetType, r.Pair.Base, r.Pair.Quote, UTCStartTime, UTCEndTime)
if err != nil {
return nil, err
}
if len(trades) == 0 {
return nil, fmt.Errorf("no trades returned from supplied params")
}
interval := kline.Interval(r.TimeInterval)
var klineItem kline.Item
klineItem, err = trade.ConvertTradesToCandles(interval, trades...)
if err != nil {
return nil, err
}
if len(klineItem.Candles) == 0 {
return nil, fmt.Errorf("no candles generated from trades")
}
resp := &gctrpc.GetHistoricCandlesResponse{
Exchange: r.Exchange,
Pair: r.Pair,
Start: r.Start,
End: r.End,
Interval: interval.String(),
}
for i := range klineItem.Candles {
resp.Candle = append(resp.Candle, &gctrpc.Candle{
Time: klineItem.Candles[i].Time.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
Low: klineItem.Candles[i].Low,
High: klineItem.Candles[i].High,
Open: klineItem.Candles[i].Open,
Close: klineItem.Candles[i].Close,
Volume: klineItem.Candles[i].Volume,
})
}
if r.Sync {
_, err = kline.StoreInDatabase(&klineItem, r.Force)
if err != nil {
return nil, err
}
}
return resp, nil
}
// FindMissingSavedCandleIntervals is used to help determine what candle data is missing
func (s *RPCServer) FindMissingSavedCandleIntervals(_ context.Context, r *gctrpc.FindMissingCandlePeriodsRequest) (*gctrpc.FindMissingIntervalsResponse, error) {
if r.End == "" || r.Start == "" || r.ExchangeName == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" || r.Interval <= 0 {
return nil, errInvalidArguments
}
exch := s.GetExchangeByName(r.ExchangeName)
if exch == nil {
return nil, errExchangeNotLoaded
}
cp, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return nil, errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return nil, err
}
if !pairs.Contains(cp, false) {
return nil, errors.New("currency not enabled")
}
var UTCStartTime, UTCEndTime time.Time
UTCStartTime, err = time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
klineItem, err := kline.LoadFromDatabase(
r.ExchangeName,
currency.Pair{
Delimiter: r.Pair.Delimiter,
Base: currency.NewCode(r.Pair.Base),
Quote: currency.NewCode(r.Pair.Quote),
},
asset.Item(strings.ToLower(r.AssetType)),
kline.Interval(r.Interval),
UTCStartTime,
UTCEndTime,
)
if err != nil {
return nil, err
}
resp := &gctrpc.FindMissingIntervalsResponse{
ExchangeName: r.ExchangeName,
AssetType: r.AssetType,
Pair: r.Pair,
MissingPeriods: []string{},
}
var candleTimes []time.Time
for i := range klineItem.Candles {
candleTimes = append(candleTimes, klineItem.Candles[i].Time)
}
var ranges []timeperiods.TimeRange
ranges, err = timeperiods.FindTimeRangesContainingData(UTCStartTime, UTCEndTime, klineItem.Interval.Duration(), candleTimes)
if err != nil {
return nil, err
}
foundCount := 0
for i := range ranges {
if !ranges[i].HasDataInRange {
resp.MissingPeriods = append(resp.MissingPeriods,
ranges[i].StartOfRange.UTC().Format(common.SimpleTimeFormatWithTimezone)+
" - "+
ranges[i].EndOfRange.UTC().Format(common.SimpleTimeFormatWithTimezone))
} else {
foundCount++
}
}
if len(resp.MissingPeriods) == 0 {
resp.Status = fmt.Sprintf("no missing candles found between %v and %v",
r.Start,
r.End,
)
} else {
resp.Status = fmt.Sprintf("Found %v candles. Missing %v candles in requested timeframe starting %v ending %v",
foundCount,
len(resp.MissingPeriods),
UTCStartTime.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
UTCEndTime.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone))
}
return resp, nil
}
// FindMissingSavedTradeIntervals is used to help determine what trade data is missing
func (s *RPCServer) FindMissingSavedTradeIntervals(_ context.Context, r *gctrpc.FindMissingTradePeriodsRequest) (*gctrpc.FindMissingIntervalsResponse, error) {
if r.End == "" || r.Start == "" || r.ExchangeName == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" {
return nil, errInvalidArguments
}
exch := s.GetExchangeByName(r.ExchangeName)
if exch == nil {
return nil, errExchangeNotLoaded
}
var err error
var UTCStartTime, UTCEndTime time.Time
UTCStartTime, err = time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return nil, err
}
UTCStartTime = UTCStartTime.Truncate(time.Hour)
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return nil, err
}
UTCEndTime = UTCEndTime.Truncate(time.Hour)
intervalMap := make(map[time.Time]bool)
iterationTime := UTCStartTime
for iterationTime.Before(UTCEndTime) {
intervalMap[iterationTime] = false
iterationTime = iterationTime.Add(time.Hour)
}
var cp currency.Pair
cp, err = currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return nil, errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return nil, err
}
if !pairs.Contains(cp, false) {
return nil, errors.New("currency not enabled")
}
var trades []trade.Data
trades, err = trade.GetTradesInRange(
r.ExchangeName,
r.AssetType,
r.Pair.Base,
r.Pair.Quote,
UTCStartTime,
UTCEndTime,
)
if err != nil {
return nil, err
}
resp := &gctrpc.FindMissingIntervalsResponse{
ExchangeName: r.ExchangeName,
AssetType: r.AssetType,
Pair: r.Pair,
MissingPeriods: []string{},
}
var tradeTimes []time.Time
for i := range trades {
tradeTimes = append(tradeTimes, trades[i].Timestamp)
}
var ranges []timeperiods.TimeRange
ranges, err = timeperiods.FindTimeRangesContainingData(UTCStartTime, UTCEndTime, time.Hour, tradeTimes)
if err != nil {
return nil, err
}
foundCount := 0
for i := range ranges {
if !ranges[i].HasDataInRange {
resp.MissingPeriods = append(resp.MissingPeriods,
ranges[i].StartOfRange.UTC().Format(common.SimpleTimeFormatWithTimezone)+
" - "+
ranges[i].EndOfRange.UTC().Format(common.SimpleTimeFormatWithTimezone))
} else {
foundCount++
}
}
if len(resp.MissingPeriods) == 0 {
resp.Status = fmt.Sprintf("no missing periods found between %v and %v",
r.Start,
r.End,
)
} else {
resp.Status = fmt.Sprintf("Found %v periods. Missing %v periods between %v and %v",
foundCount,
len(resp.MissingPeriods),
UTCStartTime.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
UTCEndTime.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone))
}
return resp, nil
}
// SetExchangeTradeProcessing allows the setting of exchange trade processing
func (s *RPCServer) SetExchangeTradeProcessing(_ context.Context, r *gctrpc.SetExchangeTradeProcessingRequest) (*gctrpc.GenericResponse, error) {
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
b := exch.GetBase()
b.SetSaveTradeDataStatus(r.Status)
return &gctrpc.GenericResponse{
Status: "success",
}, nil
}
// GetHistoricTrades returns trades between a set of dates
func (s *RPCServer) GetHistoricTrades(r *gctrpc.GetSavedTradesRequest, stream gctrpc.GoCryptoTrader_GetHistoricTradesServer) error {
if r.Exchange == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" {
return errInvalidArguments
}
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return errExchangeNotLoaded
}
cp, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return err
}
if !pairs.Contains(cp, false) {
return errors.New("currency not enabled")
}
var trades []trade.Data
var UTCStartTime, UTCEndTime time.Time
UTCStartTime, err = time.Parse(common.SimpleTimeFormat, r.Start)
if err != nil {
return err
}
UTCEndTime, err = time.Parse(common.SimpleTimeFormat, r.End)
if err != nil {
return err
}
resp := &gctrpc.SavedTradesResponse{
ExchangeName: r.Exchange,
Asset: r.AssetType,
Pair: r.Pair,
}
iterateStartTime := UTCStartTime
iterateEndTime := iterateStartTime.Add(time.Hour)
for iterateStartTime.Before(UTCEndTime) {
trades, err = exch.GetHistoricTrades(cp, asset.Item(r.AssetType), iterateStartTime, iterateEndTime)
if err != nil {
return err
}
grpcTrades := &gctrpc.SavedTradesResponse{
ExchangeName: r.Exchange,
Asset: r.AssetType,
Pair: r.Pair,
}
for i := range trades {
tradeTS := trades[i].Timestamp.In(time.UTC)
if tradeTS.After(UTCEndTime) {
break
}
grpcTrades.Trades = append(grpcTrades.Trades, &gctrpc.SavedTrades{
Price: trades[i].Price,
Amount: trades[i].Amount,
Side: trades[i].Side.String(),
Timestamp: tradeTS.Format(common.SimpleTimeFormatWithTimezone),
TradeId: trades[i].TID,
})
}
stream.Send(grpcTrades)
iterateStartTime = iterateStartTime.Add(time.Hour)
iterateEndTime = iterateEndTime.Add(time.Hour)
}
stream.Send(resp)
return nil
}
// GetRecentTrades returns trades
func (s *RPCServer) GetRecentTrades(_ context.Context, r *gctrpc.GetSavedTradesRequest) (*gctrpc.SavedTradesResponse, error) {
if r.Exchange == "" || r.Pair == nil || r.AssetType == "" || r.Pair.String() == "" {
return nil, errInvalidArguments
}
exch := s.GetExchangeByName(r.Exchange)
if exch == nil {
return nil, errExchangeNotLoaded
}
cp, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
if err != nil {
return nil, err
}
a := asset.Item(r.AssetType)
if !a.IsValid() {
return nil, errors.New("invalid asset")
}
var pairs currency.Pairs
pairs, err = exch.GetEnabledPairs(a)
if err != nil {
return nil, err
}
if !pairs.Contains(cp, false) {
return nil, errors.New("currency not enabled")
}
var trades []trade.Data
trades, err = exch.GetRecentTrades(cp, asset.Item(r.AssetType))
if err != nil {
return nil, err
}
resp := &gctrpc.SavedTradesResponse{
ExchangeName: r.Exchange,
Asset: r.AssetType,
Pair: r.Pair,
}
for i := range trades {
resp.Trades = append(resp.Trades, &gctrpc.SavedTrades{
Price: trades[i].Price,
Amount: trades[i].Amount,
Side: trades[i].Side.String(),
Timestamp: trades[i].Timestamp.In(time.UTC).Format(common.SimpleTimeFormatWithTimezone),
TradeId: trades[i].TID,
})
}
if len(resp.Trades) == 0 {
return nil, fmt.Errorf("request for %v %v trade data and returned no results", r.Exchange, r.AssetType)
}
return resp, nil
}