Files
gocryptotrader/engine/orders.go
Andrew f6fd94ea69 Engine: Scripting support (#383)
* WIP

* updated appveyor and increased deadline 5 seconds due to increased linters being added

* revert files to upstream/engine

* WIP

* WIP

* mod file changes

* added script manager

* Added manager/and cli interfaces to scripting

* Added script task handler

* WIP - Added timer/repeat support and fleshed out wrapper further

* autoload support added + WIP

* WIP commit

* added account balance info

* btc markets temp work around

* WIP - merged with upstream for new order package BTC Markets responses broken

* Cancel order wrapper WIP

*  order wrapper update

* Added test coverage for VM

* moved to map for VM List shutdown of all VM now handled added gctcli commands for list and stop of running scripts

* added override to load/execute for path

* fixed incorrect channel shutdown added further test coverage and restructured gctcli commands into sub commands

* increased test coverage for packages

* Added docs cleaned up tests and example scripts

* Test coverage increased for module/gct/exchange package

* windows fixes

* merged upstream/engine

* WIP

* logger fixes - removed pointer to bool check removed duplicate test check for logger

* remove unused mutex

* added initial upload support

* fix linter issues for go-fmt

* added zip support for uploading and added base for fund withdrawing

* changed error return types and also log errors, fix zip path issue

* improved error outputs and code flow

* pairs response fix added protobuf defs for stop all and list all

* added stop all running scripts general clean up and moved across to OrderManager

* linter fixes (gofmt)

* added list all command

* rewrote zip handler to be cleaner also fixed file overwrite on upload

* added query command reworked tests

* added further error checking to compileandrun corrected use of pointers for accountinfo

* bumped tengo version

* Removed named returns reworded log messages removed unused falseptr

* WIP

* Added virtual machine limit improved config options

* added model for script event added upload validation

* script_event table has been completed, tests for wrapper functions implemented

* README updates

* reverted changes opened new PR to move withdraw struct outs

* initial work on adding withdraw support after merger of withdraw package

* started work on examples

* Added crypto withdraw support

* fix switch case assignment and gofmt project

* Reworking Fiat withdraw request pending #402

* removed double pointer call

* added withdraw support for fiat  currencies

* added tests for withdraw methods increased readme

* removed local tengo require and also fix linter issues

* Added default log size const added basic test for invalid script execution

* First pass at moving wrapper to validator package to allow proper validation of uploaded scripts

* Added script details to README added config test added test for no file extension

* moved tests to const and fixed incorrect pathing

* added test coverage to withdraw package

* corrected file close handling

* point to included configtest.json

* extended validator support when a script is uploaded

* Bug fix on bool logic

* Added mutex

* Don't create audit events on test execution

* reverted common to master

* moved file rename to unix timestamp format

* converted logger enabled back to pointer as i need nilness check also moved scriptid to text over blob

* started work on autoload add/remove support

* First round of PR fixes (mostly commented exports)

* Moved GCTScript load to last, removed unneeded error from cleanup()

* Comment clarity for AuitEventID

* added autoload add/remove command to cli

* added tests for autoload

* Test updates for Exchanges

* linter fixes (gofmt)

* Removed double check of engine pointer

* remove possible nil pointer on GetSpecificTicker

* Fixed not closing file handler on write that causes archive removal to fail

* file handler Close clean ups

* corrected spelling on error return and return invalid name n autoload

* moved strings to const moved bool pointer creation to convert package

* new zip extractor added

* Validation has been added to archive uploads

* removed shadow var on err

* added ok check to conversion

* converted condition check

* basic test for zip extract added

* new zip handler

* reverted back to old atomic loading system

* removed shadow err

* lets add a new line

* added space to error return

* command line toggle for script now works properly

* readme updated

* set configLoaded to true

* check for configLoaded condition

* added mutex to allow for multiple access on virtual machine increased test coverage disable script manager if scripting is disabled

* linked up to enable/disablesubsystem commands

* added start/stop example to readme

* reworked logic on test as check should be done on Load()

* updated to tengo v2

* linters

* lower time on ntp client to stop slippage

* remove all fails if any fail validation from an archive

* remove vm from list if timer is invalid

* removed shadow on err

* remove config creation from NTPCheck test

* WIP testing DB changes

* add unique constraint

* WIP: created has many model

* linters run

* basic sqlite3 support added for new database format

* linters run

* Added test coverage for script repo

* removed unused print

* updated env vars for CI instances

* updated env vars for CI instances

* Updated test packages

* Test updates for postgresql

* removed invalid tests from postgres

* remove duplication of struct and improved code flow

* general cleanup

* wording changes on log output

* use databasemgr logger and add support for autoload without file extension

* corrected test naming

* return correct error

* return correct error again version 82

* store scriptdata on creation

* Hello

* Errorln -> Errorf

* Removed unused vars

* Read me updates

* testing without parallel

* comment on exported type

* added nil check against VM for test

* add debugging information

* gofmt

* remove verbose and data sent to channel

* Added debug information

* linter fixes (gofmt)

* remove unused CompileAndRun() call

* test sleep to see if issue is timing related

* semi-concurrent map fixes

* one day i will run gofmt or setup precommit hooks

* new line :D

* increased test coverage

* added correct sleep time

* Moved over to sync map

* linter fixes (gofmt)

* goimports

* moved VM related methods to vm.go

* new line at end of file

* trying increased timeout on golangci-lint for appveyor

* add debugging information

* removed timeout

* reworked timeout logic

* linter fixes (gofmt)

* increased test coverage

* increased test coverage

* one day i will run gofmt or setup precommit hooks

* removed unused exchange test

* increased golangci-lint timeout

* Added nil check on shutdown and test coverage for it lowered timeout back to 1:30

* reworked ID system

* removed script hash as it was unused

* added comments on exported methods and read me update

* reorder code

* removed to atomic.value for test execution flag

* increased test coverage

* move add further up execution

* point to correct script file
2020-01-23 13:54:04 +11:00

283 lines
6.7 KiB
Go

package engine
import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/communications/base"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
log "github.com/thrasher-corp/gocryptotrader/logger"
)
// Package-level settings and sentinel errors used by the order manager.
var (
// OrderManagerDelay is the interval of the ticker that triggers order
// processing in the order manager's run loop.
OrderManagerDelay = time.Second * 10
// ErrOrdersAlreadyExists is returned by orderStore.Add when an order with the
// same exchange and ID is already held in the store.
ErrOrdersAlreadyExists = errors.New("order already exists")
)
// Get returns a snapshot of every stored order, keyed by exchange name.
//
// The map and each per-exchange slice are copied while the store lock is
// held, so the caller can iterate over the result without racing against
// concurrent Add calls. Detail elements are copied by value; any
// reference-typed fields inside a Detail are still shared with the store.
// A nil map is returned when the store's map has not been initialised.
func (o *orderStore) Get() map[string][]order.Detail {
	o.m.Lock()
	defer o.m.Unlock()

	if o.Orders == nil {
		return nil
	}

	snapshot := make(map[string][]order.Detail, len(o.Orders))
	for exchName, details := range o.Orders {
		// Detach the slice from the store's backing array.
		snapshot[exchName] = append([]order.Detail(nil), details...)
	}
	return snapshot
}
// exists reports whether an order with the same exchange and ID as the
// supplied order is already held in the store.
//
// It does not take the store lock itself; Add calls it with o.m held.
func (o *orderStore) exists(order *order.Detail) bool {
	// Looking up a missing exchange yields a nil slice, which the loop
	// below simply skips.
	stored := o.Orders[order.Exchange]
	for i := range stored {
		if stored[i].ID == order.ID {
			return true
		}
	}
	return false
}
// Add stores a copy of the supplied order under its exchange name.
//
// It returns an error when detail is nil, and ErrOrdersAlreadyExists when an
// order with the same exchange and ID is already stored. The parameter is
// deliberately not named "order" so that it does not shadow the imported
// order package inside the function body.
func (o *orderStore) Add(detail *order.Detail) error {
	if detail == nil {
		return errors.New("order store: order is nil")
	}

	o.m.Lock()
	defer o.m.Unlock()

	if o.exists(detail) {
		return ErrOrdersAlreadyExists
	}

	// Writing to a nil map panics; initialise it lazily so Add is safe to
	// call even if the map has not been created yet.
	if o.Orders == nil {
		o.Orders = make(map[string][]order.Detail)
	}
	o.Orders[detail.Exchange] = append(o.Orders[detail.Exchange], *detail)
	return nil
}
// Started reports whether the manager's started flag is currently set to 1.
// The flag is read atomically.
func (o *orderManager) Started() bool {
	state := atomic.LoadInt32(&o.started)
	return state == 1
}
// Start marks the order manager as started, creates its shutdown channel and
// order map, and launches the run loop in a new goroutine.
//
// It returns an error if the manager is already started. The started flag is
// flipped with a compare-and-swap so that a rejected call leaves the flag at
// 1 instead of incrementing it past the value Started and Stop expect.
func (o *orderManager) Start() error {
	if !atomic.CompareAndSwapInt32(&o.started, 0, 1) {
		return errors.New("order manager already started")
	}

	log.Debugln(log.OrderMgr, "Order manager starting...")

	o.shutdown = make(chan struct{})
	o.orderStore.Orders = make(map[string][]order.Detail)
	go o.run()
	return nil
}
// Stop signals the run loop to shut down by closing the shutdown channel.
//
// It returns an error if the manager has not been started, or if another
// Stop call is already in progress. On return both the started and stopped
// flags are reset to 0.
func (o *orderManager) Stop() error {
	if atomic.LoadInt32(&o.started) == 0 {
		return errors.New("order manager not started")
	}

	// Compare-and-swap keeps the flag at 0 or 1; a plain increment would
	// leave it stuck above 1 after a rejected call.
	if !atomic.CompareAndSwapInt32(&o.stopped, 0, 1) {
		return errors.New("order manager is already stopped")
	}

	defer func() {
		// Clear started before stopped so that a concurrent Stop observes
		// "not started" rather than slipping through and closing the
		// channel a second time. Store (rather than compare-and-swap from
		// 1) resets the flag whatever value it currently holds.
		atomic.StoreInt32(&o.started, 0)
		atomic.StoreInt32(&o.stopped, 0)
	}()

	log.Debugln(log.OrderMgr, "Order manager shutting down...")
	close(o.shutdown)
	return nil
}
// gracefulShutdown cancels every order held in the order store when the
// CancelOrdersOnShutdown config option is set; otherwise it does nothing.
//
// Each cancellation result is logged on the order manager sublogger and
// pushed to the communications manager as an "order" event. A failed
// cancellation does not stop the remaining orders from being attempted.
func (o *orderManager) gracefulShutdown() {
	if !o.cfg.CancelOrdersOnShutdown {
		return
	}

	log.Debugln(log.OrderMgr, "Order manager: Cancelling any open orders...")

	// Ranging over a nil map is a no-op, so no explicit nil check is needed.
	for exchName, orders := range o.orderStore.Get() {
		log.Debugf(log.OrderMgr, "Order manager: Cancelling order(s) for exchange %s.\n", exchName)
		for i := range orders {
			log.Debugf(log.OrderMgr, "order manager: Cancelling order ID %v [%v]",
				orders[i].ID, orders[i])

			var msg string
			err := o.Cancel(exchName, &order.Cancel{
				OrderID: orders[i].ID,
			})
			if err != nil {
				msg = fmt.Sprintf("Order manager: Exchange %s unable to cancel order ID=%v. Err: %s",
					exchName, orders[i].ID, err)
				// A failed cancel is surfaced as a warning rather than
				// being hidden at debug level.
				log.Warnf(log.OrderMgr, "%s\n", msg)
			} else {
				msg = fmt.Sprintf("Order manager: Exchange %s order ID=%v cancelled.",
					exchName, orders[i].ID)
				log.Debugln(log.OrderMgr, msg)
			}

			Bot.CommsManager.PushEvent(base.Event{
				Type:    "order",
				Message: msg,
			})
		}
	}
}
// run is the order manager's main loop. It calls processOrders every
// OrderManagerDelay until the shutdown channel is closed, at which point it
// runs gracefulShutdown and returns.
//
// The loop registers itself with Bot.ServicesWG for its lifetime and stops
// its ticker on exit.
func (o *orderManager) run() {
	log.Debugln(log.OrderMgr, "Order manager started.")

	tick := time.NewTicker(OrderManagerDelay)
	// NOTE(review): Add is called from inside the goroutine started by
	// Start, so a Wait that runs before this line would not see it —
	// consider moving the Add ahead of the `go` statement in Start.
	Bot.ServicesWG.Add(1)
	defer func() {
		log.Debugln(log.OrderMgr, "Order manager shutdown.")
		tick.Stop()
		Bot.ServicesWG.Done()
	}()

	for {
		select {
		case <-o.shutdown:
			o.gracefulShutdown()
			return
		case <-tick.C:
			o.processOrders()
		}
	}
}
// CancelAllOrders is currently an empty stub: its body contains no
// statements, so calling it has no effect.
func (o *orderManager) CancelAllOrders() {}
// Cancel validates the request and forwards it to the named exchange's
// CancelOrder method.
//
// An error is returned when exchName is empty, cancel is nil, the order ID
// is empty, the exchange cannot be found by name, or a non-empty asset type
// is not among the exchange's asset types. Otherwise the exchange's own
// CancelOrder result is returned.
func (o *orderManager) Cancel(exchName string, cancel *order.Cancel) error {
	// Cases are evaluated in order, so cancel is only dereferenced once it
	// is known to be non-nil.
	switch {
	case exchName == "":
		return errors.New("order exchange name is empty")
	case cancel == nil:
		return errors.New("order cancel param is nil")
	case cancel.OrderID == "":
		return errors.New("order id is empty")
	}

	exch := GetExchangeByName(exchName)
	if exch == nil {
		return errors.New("unable to get exchange by name")
	}

	// An empty asset type is treated as "unspecified" and is not checked.
	if assetType := cancel.AssetType; assetType.String() != "" &&
		!exch.GetAssetTypes().Contains(assetType) {
		return errors.New("order asset type not supported by exchange")
	}

	return exch.CancelOrder(cancel)
}
// Submit validates newOrder, applies the configured order limits, and
// submits the order to the named exchange.
//
// On success it logs the submission, pushes an "order" event to the
// communications manager, and returns the exchange's order ID together with
// a locally generated UUID. If UUID generation fails, a warning is logged
// and the submission still goes ahead.
//
// An error is returned when exchName is empty, validation fails, a
// configured limit is violated, the exchange cannot be found, the exchange
// rejects the submission, or the exchange reports the order as not placed.
func (o *orderManager) Submit(exchName string, newOrder *order.Submit) (*orderSubmitResponse, error) {
	if exchName == "" {
		return nil, errors.New("order exchange name must be specified")
	}

	validationErr := newOrder.Validate()
	if validationErr != nil {
		return nil, validationErr
	}

	// Limits are only checked when enforcement is switched on; the cases
	// are evaluated top to bottom and the first violation wins.
	if o.cfg.EnforceLimitConfig {
		switch {
		case !o.cfg.AllowMarketOrders && newOrder.OrderType == order.Market:
			return nil, errors.New("order market type is not allowed")
		case o.cfg.LimitAmount > 0 && newOrder.Amount > o.cfg.LimitAmount:
			return nil, errors.New("order limit exceeds allowed limit")
		case len(o.cfg.AllowedExchanges) > 0 &&
			!common.StringDataCompareInsensitive(o.cfg.AllowedExchanges, exchName):
			return nil, errors.New("order exchange not found in allowed list")
		case len(o.cfg.AllowedPairs) > 0 && !o.cfg.AllowedPairs.Contains(newOrder.Pair, true):
			return nil, errors.New("order pair not found in allowed list")
		}
	}

	exch := GetExchangeByName(exchName)
	if exch == nil {
		return nil, errors.New("unable to get exchange by name")
	}

	ourID, uuidErr := uuid.NewV4()
	if uuidErr != nil {
		log.Warnf(log.OrderMgr,
			"Order manager: Unable to generate UUID. Err: %s\n",
			uuidErr)
	}

	result, err := exch.SubmitOrder(newOrder)
	if err != nil {
		return nil, err
	}
	if !result.IsOrderPlaced {
		return nil, errors.New("order unable to be placed")
	}

	ourOrderID := ourID.String()
	msg := fmt.Sprintf("Order manager: Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v side=%v type=%v.",
		exchName,
		result.OrderID,
		ourOrderID,
		newOrder.Pair,
		newOrder.Price,
		newOrder.Amount,
		newOrder.OrderSide,
		newOrder.OrderType)
	log.Debugln(log.OrderMgr, msg)
	Bot.CommsManager.PushEvent(base.Event{
		Type:    "order",
		Message: msg,
	})

	response := &orderSubmitResponse{
		SubmitResponse: order.SubmitResponse{
			OrderID: result.OrderID,
		},
		OurOrderID: ourOrderID,
	}
	return response, nil
}
// processOrders fetches the active orders of every exchange returned by
// GetAuthAPISupportedExchanges and adds any order not yet tracked to the
// order store.
//
// Each newly added order is logged and pushed to the communications manager
// as an "order" event. An exchange that cannot be resolved by name, or whose
// active-order request fails, is logged and skipped.
func (o *orderManager) processOrders() {
	authExchanges := GetAuthAPISupportedExchanges()
	for x := range authExchanges {
		exchName := authExchanges[x]
		log.Debugf(log.OrderMgr, "Order manager: Processing orders for exchange %v.\n", exchName)

		exch := GetExchangeByName(exchName)
		if exch == nil {
			// Guard against calling a method on a nil exchange.
			log.Warnf(log.OrderMgr, "Order manager: Unable to get exchange %v by name.\n", exchName)
			continue
		}

		req := order.GetOrdersRequest{
			OrderSide: order.AnySide,
			OrderType: order.AnyType,
		}
		activeOrders, err := exch.GetActiveOrders(&req)
		if err != nil {
			log.Warnf(log.OrderMgr, "Order manager: Unable to get active orders: %s\n", err)
			continue
		}

		for i := range activeOrders {
			ord := &activeOrders[i]
			addErr := o.orderStore.Add(ord)
			if addErr != nil {
				// An already-tracked order is the expected steady state;
				// anything else is worth reporting.
				if addErr != ErrOrdersAlreadyExists {
					log.Warnf(log.OrderMgr, "Order manager: Unable to add order ID=%v: %s\n", ord.ID, addErr)
				}
				continue
			}

			msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
				ord.Exchange, ord.ID, ord.CurrencyPair, ord.Price, ord.Amount, ord.OrderSide, ord.OrderType)
			log.Debugf(log.OrderMgr, "%v\n", msg)
			Bot.CommsManager.PushEvent(base.Event{
				Type:    "order",
				Message: msg,
			})
		}
	}
}