mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
* orderbook: change Base struct name to Snapshot * linter: fix * Update exchanges/exchange.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/depth.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_types.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Snapshot -> Book * Tranche(s) -> Level(s) * Tranche(s) -> Level(s) * rm tranche ref * linter: fix * linter: rides again * update tests * Update exchange/websocket/buffer/buffer.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update backtester/eventhandlers/exchange/slippage/slippage.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchange/websocket/buffer/buffer.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchange/websocket/buffer/buffer.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchanges/orderbook/orderbook_types.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * Update exchanges/orderbook/orderbook_types.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * fixup tests * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_types.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/orderbook/orderbook_types.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * gk: nits and rm stuff that is not needed * Update exchanges/orderbook/orderbook_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * gk: nits --------- Co-authored-by: shazbert <ryan.oharareid@thrasher.io> Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
923 lines
26 KiB
Go
923 lines
26 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
gws "github.com/gorilla/websocket"
|
|
"github.com/thrasher-corp/gocryptotrader/common"
|
|
"github.com/thrasher-corp/gocryptotrader/config"
|
|
"github.com/thrasher-corp/gocryptotrader/currency"
|
|
"github.com/thrasher-corp/gocryptotrader/encoding/json"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
|
|
"github.com/thrasher-corp/gocryptotrader/log"
|
|
)
|
|
|
|
// setupAPIServerManager checks and creates an api server manager
|
|
func setupAPIServerManager(remoteConfig *config.RemoteControlConfig, pprofConfig *config.Profiler, exchangeManager iExchangeManager, bot iBot, portfolioManager iPortfolioManager, configPath string) (*apiServerManager, error) {
|
|
if remoteConfig == nil {
|
|
return nil, errNilRemoteConfig
|
|
}
|
|
if pprofConfig == nil {
|
|
return nil, errNilPProfConfig
|
|
}
|
|
if exchangeManager == nil {
|
|
return nil, errNilExchangeManager
|
|
}
|
|
if bot == nil {
|
|
return nil, errNilBot
|
|
}
|
|
if configPath == "" {
|
|
return nil, errEmptyConfigPath
|
|
}
|
|
return &apiServerManager{
|
|
remoteConfig: remoteConfig,
|
|
pprofConfig: pprofConfig,
|
|
restListenAddress: remoteConfig.DeprecatedRPC.ListenAddress,
|
|
websocketListenAddress: remoteConfig.WebsocketRPC.ListenAddress,
|
|
exchangeManager: exchangeManager,
|
|
bot: bot,
|
|
gctConfigPath: configPath,
|
|
portfolioManager: portfolioManager,
|
|
}, nil
|
|
}
|
|
|
|
// IsRESTServerRunning safely checks whether the subsystem is running
|
|
func (m *apiServerManager) IsRESTServerRunning() bool {
|
|
if m == nil {
|
|
return false
|
|
}
|
|
return atomic.LoadInt32(&m.restStarted) == 1
|
|
}
|
|
|
|
// IsWebsocketServerRunning safely checks whether the subsystem is running
|
|
func (m *apiServerManager) IsWebsocketServerRunning() bool {
|
|
if m == nil {
|
|
return false
|
|
}
|
|
return atomic.LoadInt32(&m.websocketStarted) == 1
|
|
}
|
|
|
|
// StopRESTServer attempts to shutdown the subsystem
|
|
func (m *apiServerManager) StopRESTServer() error {
|
|
if m == nil {
|
|
return fmt.Errorf("api server %w", ErrNilSubsystem)
|
|
}
|
|
if !atomic.CompareAndSwapInt32(&m.restStarted, 1, 0) {
|
|
return fmt.Errorf("apiserver deprecated server %w", ErrSubSystemNotStarted)
|
|
}
|
|
err := m.restHTTPServer.Shutdown(context.Background())
|
|
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return err
|
|
}
|
|
m.wgRest.Wait()
|
|
m.restRouter = nil
|
|
return nil
|
|
}
|
|
|
|
func (m *apiServerManager) StopWebsocketServer() error {
|
|
if m == nil {
|
|
return fmt.Errorf("api server %w", ErrNilSubsystem)
|
|
}
|
|
if !atomic.CompareAndSwapInt32(&m.websocketStarted, 1, 0) {
|
|
return fmt.Errorf("apiserver websocket server %w", ErrSubSystemNotStarted)
|
|
}
|
|
|
|
err := m.websocketHTTPServer.Shutdown(context.Background())
|
|
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return err
|
|
}
|
|
m.websocketRouter = nil
|
|
m.websocketHub = nil
|
|
m.wgWebsocket.Wait()
|
|
m.websocketHTTPServer = nil
|
|
return nil
|
|
}
|
|
|
|
// newRouter takes in the exchange interfaces and returns a new multiplexer
|
|
// router
|
|
func (m *apiServerManager) newRouter(isREST bool) *mux.Router {
|
|
router := mux.NewRouter().StrictSlash(true)
|
|
var routes []Route
|
|
if common.ExtractPortOrDefault(m.websocketListenAddress) == 80 {
|
|
m.websocketListenAddress = common.ExtractHostOrDefault(m.websocketListenAddress)
|
|
} else {
|
|
m.websocketListenAddress = common.ExtractHostOrDefault(m.websocketListenAddress) + ":" +
|
|
strconv.Itoa(common.ExtractPortOrDefault(m.websocketListenAddress))
|
|
}
|
|
|
|
if isREST {
|
|
routes = []Route{
|
|
{"", http.MethodGet, "/", m.getIndex},
|
|
{"GetAllSettings", http.MethodGet, "/config/all", m.restGetAllSettings},
|
|
{"SaveAllSettings", http.MethodPost, "/config/all/save", m.restSaveAllSettings},
|
|
{"AllEnabledAccountInfo", http.MethodGet, "/exchanges/enabled/accounts/all", m.restGetAllEnabledAccountInfo},
|
|
{"AllActiveExchangesAndCurrencies", http.MethodGet, "/exchanges/enabled/latest/all", m.restGetAllActiveTickers},
|
|
{"GetPortfolio", http.MethodGet, "/portfolio/all", m.restGetPortfolio},
|
|
{"AllActiveExchangesAndOrderbooks", http.MethodGet, "/exchanges/orderbook/latest/all", m.restGetAllActiveOrderbooks},
|
|
}
|
|
|
|
if m.pprofConfig.Enabled {
|
|
if m.pprofConfig.MutexProfileFraction > 0 {
|
|
runtime.SetMutexProfileFraction(m.pprofConfig.MutexProfileFraction)
|
|
}
|
|
log.Debugf(log.RESTSys,
|
|
"HTTP Go performance profiler (pprof) endpoint enabled: http://%s:%d/debug/pprof/\n",
|
|
common.ExtractHostOrDefault(m.websocketListenAddress),
|
|
common.ExtractPortOrDefault(m.websocketListenAddress),
|
|
)
|
|
router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
|
|
}
|
|
} else {
|
|
routes = []Route{
|
|
{"ws", http.MethodGet, "/ws", m.WebsocketClientHandler},
|
|
}
|
|
}
|
|
|
|
for _, route := range routes {
|
|
router.
|
|
Methods(route.Method).
|
|
Path(route.Pattern).
|
|
Name(route.Name).
|
|
Handler(restLogger(route.HandlerFunc, route.Name)).
|
|
Host(m.websocketListenAddress)
|
|
}
|
|
return router
|
|
}
|
|
|
|
// StartRESTServer starts a REST handler
|
|
func (m *apiServerManager) StartRESTServer() error {
|
|
if !atomic.CompareAndSwapInt32(&m.restStarted, 0, 1) {
|
|
return fmt.Errorf("rest server %w", errAlreadyRunning)
|
|
}
|
|
if !m.remoteConfig.DeprecatedRPC.Enabled {
|
|
atomic.StoreInt32(&m.restStarted, 0)
|
|
return fmt.Errorf("rest %w", errServerDisabled)
|
|
}
|
|
log.Debugf(log.RESTSys,
|
|
"Deprecated RPC handler support enabled. Listen URL: http://%s:%d\n",
|
|
common.ExtractHostOrDefault(m.restListenAddress),
|
|
common.ExtractPortOrDefault(m.restListenAddress),
|
|
)
|
|
m.restRouter = m.newRouter(true)
|
|
if m.restHTTPServer == nil {
|
|
m.restHTTPServer = &http.Server{
|
|
Addr: m.restListenAddress,
|
|
Handler: m.restRouter,
|
|
ReadHeaderTimeout: time.Minute,
|
|
}
|
|
}
|
|
m.wgRest.Add(1)
|
|
go func() {
|
|
defer m.wgRest.Done()
|
|
err := m.restHTTPServer.ListenAndServe()
|
|
if err != nil {
|
|
atomic.StoreInt32(&m.restStarted, 0)
|
|
if !errors.Is(err, http.ErrServerClosed) {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// restLogger logs the requests internally
|
|
func restLogger(inner http.Handler, name string) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
start := time.Now()
|
|
inner.ServeHTTP(w, r)
|
|
|
|
log.Debugf(log.RESTSys,
|
|
"%s\t%s\t%s\t%s",
|
|
r.Method,
|
|
r.RequestURI,
|
|
name,
|
|
time.Since(start),
|
|
)
|
|
})
|
|
}
|
|
|
|
// writeResponse outputs a JSON response of the response interface
|
|
func writeResponse(w http.ResponseWriter, response any) error {
|
|
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
|
|
w.WriteHeader(http.StatusOK)
|
|
return json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleError prints the REST method and error
|
|
func handleError(method string, err error) {
|
|
log.Errorf(log.APIServerMgr, "RESTful %s: handler failed to send JSON response. Error %s\n",
|
|
method, err)
|
|
}
|
|
|
|
// restGetAllSettings replies to a request with an encoded JSON response about the
|
|
// trading Bots configuration.
|
|
func (m *apiServerManager) restGetAllSettings(w http.ResponseWriter, r *http.Request) {
|
|
err := writeResponse(w, config.GetConfig())
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// restSaveAllSettings saves all current settings from request body as a JSON
|
|
// document then reloads state and returns the settings
|
|
func (m *apiServerManager) restSaveAllSettings(w http.ResponseWriter, r *http.Request) {
|
|
// Get the data from the request
|
|
decoder := json.NewDecoder(r.Body)
|
|
var responseData config.Post
|
|
err := decoder.Decode(&responseData)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
// Save change the settings
|
|
cfg := config.GetConfig()
|
|
err = cfg.UpdateConfig(m.gctConfigPath, &responseData.Data, false)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
|
|
err = writeResponse(w, cfg)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
err = m.bot.SetupExchanges()
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// restGetAllActiveOrderbooks returns all enabled exchange orderbooks
|
|
func (m *apiServerManager) restGetAllActiveOrderbooks(w http.ResponseWriter, r *http.Request) {
|
|
var response AllEnabledExchangeOrderbooks
|
|
response.Data = getAllActiveOrderbooks(m.exchangeManager)
|
|
err := writeResponse(w, response)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// restGetPortfolio returns the Bot portfolio manager
|
|
func (m *apiServerManager) restGetPortfolio(w http.ResponseWriter, r *http.Request) {
|
|
result := m.portfolioManager.GetPortfolioSummary()
|
|
err := writeResponse(w, result)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// restGetAllActiveTickers returns all active tickers
|
|
func (m *apiServerManager) restGetAllActiveTickers(w http.ResponseWriter, r *http.Request) {
|
|
var response AllEnabledExchangeCurrencies
|
|
response.Data = getAllActiveTickers(m.exchangeManager)
|
|
err := writeResponse(w, response)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// restGetAllEnabledAccountInfo via get request returns JSON response of account
|
|
// info
|
|
func (m *apiServerManager) restGetAllEnabledAccountInfo(w http.ResponseWriter, r *http.Request) {
|
|
response := getAllActiveAccounts(m.exchangeManager)
|
|
err := writeResponse(w, response)
|
|
if err != nil {
|
|
handleError(r.Method, err)
|
|
}
|
|
}
|
|
|
|
// getIndex returns an HTML snippet for when a user requests the index URL
|
|
func (m *apiServerManager) getIndex(w http.ResponseWriter, _ *http.Request) {
|
|
_, err := fmt.Fprint(w, restIndexResponse)
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
// getAllActiveOrderbooks returns all enabled exchanges orderbooks
|
|
func getAllActiveOrderbooks(m iExchangeManager) []EnabledExchangeOrderbooks {
|
|
exchanges, err := m.GetExchanges()
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
|
|
return nil
|
|
}
|
|
|
|
orderbookData := make([]EnabledExchangeOrderbooks, 0, len(exchanges))
|
|
for _, e := range exchanges {
|
|
var orderbooks []orderbook.Book
|
|
for _, a := range e.GetAssetTypes(true) {
|
|
pairs, err := e.GetEnabledPairs(a)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Exchange %s could not retrieve enabled currencies. Err: %s\n", e.GetName(), err)
|
|
continue
|
|
}
|
|
for _, pair := range pairs {
|
|
ob, err := e.GetCachedOrderbook(pair, a)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Exchange %s failed to retrieve %s orderbook. Err: %s\n", e.GetName(), pair, err)
|
|
continue
|
|
}
|
|
orderbooks = append(orderbooks, *ob)
|
|
}
|
|
}
|
|
orderbookData = append(orderbookData, EnabledExchangeOrderbooks{ExchangeName: e.GetName(), ExchangeValues: orderbooks})
|
|
}
|
|
return orderbookData
|
|
}
|
|
|
|
// getAllActiveTickers returns all enabled exchanges tickers
|
|
func getAllActiveTickers(m iExchangeManager) []EnabledExchangeCurrencies {
|
|
exchanges, err := m.GetExchanges()
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
|
|
return nil
|
|
}
|
|
|
|
exchangeTickers := make([]EnabledExchangeCurrencies, 0, len(exchanges))
|
|
for _, e := range exchanges {
|
|
var tickers []*ticker.Price
|
|
for _, a := range e.GetAssetTypes(true) {
|
|
pairs, err := e.GetEnabledPairs(a)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Exchange %s could not retrieve enabled currencies. Err: %s\n", e.GetName(), err)
|
|
continue
|
|
}
|
|
for _, pair := range pairs {
|
|
t, err := e.GetCachedTicker(pair, a)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Exchange %s failed to retrieve %s ticker. Err: %s\n", e.GetName(), pair.String(), err)
|
|
continue
|
|
}
|
|
tickers = append(tickers, t)
|
|
}
|
|
}
|
|
exchangeTickers = append(exchangeTickers, EnabledExchangeCurrencies{ExchangeName: e.GetName(), ExchangeValues: tickers})
|
|
}
|
|
return exchangeTickers
|
|
}
|
|
|
|
// getAllActiveAccounts returns all enabled exchanges accounts
|
|
func getAllActiveAccounts(m iExchangeManager) []AllEnabledExchangeAccounts {
|
|
exchanges, err := m.GetExchanges()
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
|
|
return nil
|
|
}
|
|
|
|
accounts := make([]AllEnabledExchangeAccounts, 0, len(exchanges))
|
|
for x := range exchanges {
|
|
assets := exchanges[x].GetAssetTypes(true)
|
|
exchName := exchanges[x].GetName()
|
|
var exchangeAccounts AllEnabledExchangeAccounts
|
|
for y := range assets {
|
|
a, err := exchanges[x].GetCachedAccountInfo(context.TODO(), assets[y])
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr,
|
|
"Exchange %s failed to retrieve %s ticker. Err: %s\n",
|
|
exchName,
|
|
assets[y],
|
|
err)
|
|
continue
|
|
}
|
|
exchangeAccounts.Data = append(exchangeAccounts.Data, a)
|
|
}
|
|
accounts = append(accounts, exchangeAccounts)
|
|
}
|
|
return accounts
|
|
}
|
|
|
|
// StartWebsocketServer starts a Websocket handler
|
|
func (m *apiServerManager) StartWebsocketServer() error {
|
|
if !atomic.CompareAndSwapInt32(&m.websocketStarted, 0, 1) {
|
|
return fmt.Errorf("websocket server %w", errAlreadyRunning)
|
|
}
|
|
if !m.remoteConfig.WebsocketRPC.Enabled {
|
|
atomic.StoreInt32(&m.websocketStarted, 0)
|
|
return fmt.Errorf("websocket %w", errServerDisabled)
|
|
}
|
|
log.Debugf(log.APIServerMgr,
|
|
"Websocket RPC support enabled. Listen URL: ws://%s:%d/ws\n",
|
|
common.ExtractHostOrDefault(m.websocketListenAddress),
|
|
common.ExtractPortOrDefault(m.websocketListenAddress),
|
|
)
|
|
m.websocketRouter = m.newRouter(false)
|
|
if m.websocketHTTPServer == nil {
|
|
m.websocketHTTPServer = &http.Server{
|
|
Addr: m.websocketListenAddress,
|
|
Handler: m.websocketRouter,
|
|
ReadHeaderTimeout: time.Minute,
|
|
}
|
|
}
|
|
|
|
m.wgWebsocket.Add(1)
|
|
go func() {
|
|
defer m.wgWebsocket.Done()
|
|
err := m.websocketHTTPServer.ListenAndServe()
|
|
if err != nil {
|
|
atomic.StoreInt32(&m.websocketStarted, 0)
|
|
if !errors.Is(err, http.ErrServerClosed) {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// newWebsocketHub Creates a new websocket hub
|
|
func newWebsocketHub() *websocketHub {
|
|
return &websocketHub{
|
|
Broadcast: make(chan []byte),
|
|
Register: make(chan *websocketClient),
|
|
Unregister: make(chan *websocketClient),
|
|
Clients: make(map[*websocketClient]bool),
|
|
}
|
|
}
|
|
|
|
func (h *websocketHub) run() {
|
|
for {
|
|
select {
|
|
case client := <-h.Register:
|
|
h.Clients[client] = true
|
|
case client := <-h.Unregister:
|
|
if _, ok := h.Clients[client]; ok {
|
|
log.Debugln(log.APIServerMgr, "websocket: disconnected client")
|
|
delete(h.Clients, client)
|
|
close(client.Send)
|
|
}
|
|
case message := <-h.Broadcast:
|
|
for client := range h.Clients {
|
|
select {
|
|
case client.Send <- message:
|
|
default:
|
|
log.Debugln(log.APIServerMgr, "websocket: disconnected client")
|
|
close(client.Send)
|
|
delete(h.Clients, client)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendWebsocketMessage sends a websocket event to the client
|
|
func (c *websocketClient) SendWebsocketMessage(evt any) error {
|
|
data, err := json.Marshal(evt)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "websocket: failed to send message: %s\n", err)
|
|
return err
|
|
}
|
|
|
|
c.Send <- data
|
|
return nil
|
|
}
|
|
|
|
func (c *websocketClient) read() {
|
|
defer func() {
|
|
c.Hub.Unregister <- c
|
|
conErr := c.Conn.Close()
|
|
if conErr != nil {
|
|
log.Errorln(log.APIServerMgr, conErr)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
msgType, message, err := c.Conn.ReadMessage()
|
|
if err != nil {
|
|
if gws.IsUnexpectedCloseError(err, gws.CloseGoingAway, gws.CloseAbnormalClosure) {
|
|
log.Errorf(log.APIServerMgr, "websocket: client disconnected, err: %s\n", err)
|
|
}
|
|
break
|
|
}
|
|
|
|
if msgType == gws.TextMessage {
|
|
var evt WebsocketEvent
|
|
err := json.Unmarshal(message, &evt)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "websocket: failed to decode JSON sent from client %s\n", err)
|
|
continue
|
|
}
|
|
|
|
if evt.Event == "" {
|
|
log.Warnln(log.APIServerMgr, "websocket: client sent a blank event, disconnecting")
|
|
continue
|
|
}
|
|
|
|
dataJSON, err := json.Marshal(evt.Data)
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, "websocket: client sent data we couldn't JSON decode")
|
|
break
|
|
}
|
|
|
|
req := strings.ToLower(evt.Event)
|
|
log.Debugf(log.APIServerMgr, "websocket: request received: %s\n", req)
|
|
|
|
result, ok := wsHandlers[req]
|
|
if !ok {
|
|
log.Debugln(log.APIServerMgr, "websocket: unsupported event")
|
|
continue
|
|
}
|
|
|
|
if result.authRequired && !c.Authenticated {
|
|
log.Warnf(log.APIServerMgr, "Websocket: request %s failed due to unauthenticated request on an authenticated API\n", evt.Event)
|
|
err = c.SendWebsocketMessage(WebsocketEventResponse{Event: evt.Event, Error: "unauthorised request on authenticated API"})
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
err = result.handler(c, dataJSON)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "websocket: request %s failed. Error %s\n", evt.Event, err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *websocketClient) write() {
|
|
defer func() {
|
|
err := c.Conn.Close()
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
}()
|
|
for {
|
|
message, ok := <-c.Send
|
|
if !ok {
|
|
err := c.Conn.WriteMessage(gws.CloseMessage, []byte{})
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
log.Debugln(log.APIServerMgr, "websocket: hub closed the channel")
|
|
return
|
|
}
|
|
|
|
w, err := c.Conn.NextWriter(gws.TextMessage)
|
|
if err != nil {
|
|
log.Errorf(log.APIServerMgr, "websocket: failed to create new io.writeCloser: %s\n", err)
|
|
return
|
|
}
|
|
_, err = w.Write(message)
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
|
|
// Add queued chat messages to the current websocket message
|
|
n := len(c.Send)
|
|
for range n {
|
|
_, err = w.Write(<-c.Send)
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
}
|
|
}
|
|
|
|
if err := w.Close(); err != nil {
|
|
log.Errorf(log.APIServerMgr, "websocket: failed to close io.WriteCloser: %s\n", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// StartWebsocketHandler starts the websocket hub and routine which
|
|
// handles clients
|
|
func StartWebsocketHandler() {
|
|
if !wsHubStarted {
|
|
wsHubStarted = true
|
|
wsHub = newWebsocketHub()
|
|
go wsHub.run()
|
|
}
|
|
}
|
|
|
|
// BroadcastWebsocketMessage meow
|
|
func BroadcastWebsocketMessage(evt WebsocketEvent) error {
|
|
if !wsHubStarted {
|
|
return ErrWebsocketServiceNotRunning
|
|
}
|
|
|
|
data, err := json.Marshal(evt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wsHub.Broadcast <- data
|
|
return nil
|
|
}
|
|
|
|
// WebsocketClientHandler upgrades the HTTP connection to a websocket
|
|
// compatible one
|
|
func (m *apiServerManager) WebsocketClientHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !wsHubStarted {
|
|
StartWebsocketHandler()
|
|
}
|
|
|
|
connectionLimit := m.remoteConfig.WebsocketRPC.ConnectionLimit
|
|
numClients := len(wsHub.Clients)
|
|
|
|
if numClients >= connectionLimit {
|
|
log.Warnf(log.APIServerMgr,
|
|
"websocket: client rejected due to websocket client limit reached. Number of clients %d. Limit %d.\n",
|
|
numClients, connectionLimit)
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
upgrader := gws.Upgrader{
|
|
WriteBufferSize: 1024,
|
|
ReadBufferSize: 1024,
|
|
}
|
|
|
|
// Allow insecure origin if the Origin request header is present and not
|
|
// equal to the Host request header. Default to false
|
|
if m.remoteConfig.WebsocketRPC.AllowInsecureOrigin {
|
|
upgrader.CheckOrigin = func(*http.Request) bool { return true }
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Errorln(log.APIServerMgr, err)
|
|
return
|
|
}
|
|
|
|
client := &websocketClient{
|
|
Hub: wsHub,
|
|
Conn: conn,
|
|
Send: make(chan []byte, 1024),
|
|
maxAuthFailures: m.remoteConfig.WebsocketRPC.MaxAuthFailures,
|
|
username: m.remoteConfig.Username,
|
|
password: m.remoteConfig.Password,
|
|
configPath: m.gctConfigPath,
|
|
exchangeManager: m.exchangeManager,
|
|
bot: m.bot,
|
|
portfolioManager: m.portfolioManager,
|
|
}
|
|
|
|
client.Hub.Register <- client
|
|
log.Debugf(log.APIServerMgr,
|
|
"websocket: client connected. Connected clients: %d. Limit %d.\n",
|
|
numClients+1, connectionLimit)
|
|
|
|
go client.read()
|
|
go client.write()
|
|
}
|
|
|
|
func wsAuth(client *websocketClient, data any) error {
|
|
d, ok := data.([]byte)
|
|
if !ok {
|
|
return common.GetTypeAssertError("[]byte", data)
|
|
}
|
|
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "auth",
|
|
}
|
|
|
|
var auth WebsocketAuth
|
|
err := json.Unmarshal(d, &auth)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
shasum := sha256.Sum256([]byte(client.password))
|
|
if auth.Username == client.username && auth.Password == hex.EncodeToString(shasum[:]) {
|
|
client.Authenticated = true
|
|
wsResp.Data = WebsocketResponseSuccess
|
|
log.Debugln(log.APIServerMgr,
|
|
"websocket: client authenticated successfully")
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
wsResp.Error = "invalid username/password"
|
|
client.authFailures++
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
if client.authFailures >= client.maxAuthFailures {
|
|
log.Debugf(log.APIServerMgr,
|
|
"websocket: disconnecting client, maximum auth failures threshold reached (failures: %d limit: %d)\n",
|
|
client.authFailures, client.maxAuthFailures)
|
|
wsHub.Unregister <- client
|
|
return nil
|
|
}
|
|
|
|
log.Debugf(log.APIServerMgr,
|
|
"websocket: client sent wrong username/password (failures: %d limit: %d)\n",
|
|
client.authFailures, client.maxAuthFailures)
|
|
return nil
|
|
}
|
|
|
|
func wsGetConfig(client *websocketClient, _ any) error {
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetConfig",
|
|
Data: config.GetConfig(),
|
|
}
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsSaveConfig(client *websocketClient, data any) error {
|
|
d, ok := data.([]byte)
|
|
if !ok {
|
|
return common.GetTypeAssertError("[]byte", data)
|
|
}
|
|
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "SaveConfig",
|
|
}
|
|
var respCfg config.Config
|
|
err := json.Unmarshal(d, &respCfg)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
cfg := config.GetConfig()
|
|
err = cfg.UpdateConfig(client.configPath, &respCfg, false)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
err = client.bot.SetupExchanges()
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
wsResp.Data = WebsocketResponseSuccess
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetAccountInfo(client *websocketClient, _ any) error {
|
|
accountInfo := getAllActiveAccounts(client.exchangeManager)
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetAccountInfo",
|
|
Data: accountInfo,
|
|
}
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetTickers(client *websocketClient, _ any) error {
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetTickers",
|
|
}
|
|
wsResp.Data = getAllActiveTickers(client.exchangeManager)
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetTicker(client *websocketClient, data any) error {
|
|
d, ok := data.([]byte)
|
|
if !ok {
|
|
return common.GetTypeAssertError("[]byte", data)
|
|
}
|
|
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetTicker",
|
|
}
|
|
var tickerReq WebsocketOrderbookTickerRequest
|
|
err := json.Unmarshal(d, &tickerReq)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
p, err := currency.NewPairFromString(tickerReq.Currency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a, err := asset.New(tickerReq.AssetType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
exch, err := client.exchangeManager.GetExchangeByName(tickerReq.Exchange)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
tick, err := exch.GetCachedTicker(p, a)
|
|
if err != nil {
|
|
wsResp.Error = err.Error()
|
|
sendErr := client.SendWebsocketMessage(wsResp)
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
wsResp.Data = tick
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetOrderbooks(client *websocketClient, _ any) error {
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetOrderbooks",
|
|
}
|
|
wsResp.Data = getAllActiveOrderbooks(client.exchangeManager)
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetOrderbook(client *websocketClient, data any) error {
|
|
d, ok := data.([]byte)
|
|
if !ok {
|
|
return common.GetTypeAssertError("[]byte", data)
|
|
}
|
|
|
|
var orderbookReq WebsocketOrderbookTickerRequest
|
|
err := json.Unmarshal(d, &orderbookReq)
|
|
if err != nil {
|
|
sendErr := client.SendWebsocketMessage(WebsocketEventResponse{Event: "GetOrderbook", Error: err.Error()})
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
p, err := currency.NewPairFromString(orderbookReq.Currency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a, err := asset.New(orderbookReq.AssetType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
exch, err := client.exchangeManager.GetExchangeByName(orderbookReq.Exchange)
|
|
if err != nil {
|
|
sendErr := client.SendWebsocketMessage(WebsocketEventResponse{Event: "GetOrderbook", Error: err.Error()})
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
ob, err := exch.GetCachedOrderbook(p, a)
|
|
if err != nil {
|
|
sendErr := client.SendWebsocketMessage(WebsocketEventResponse{Event: "GetOrderbook", Error: err.Error()})
|
|
if sendErr != nil {
|
|
log.Errorln(log.APIServerMgr, sendErr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return client.SendWebsocketMessage(WebsocketEventResponse{Event: "GetOrderbook", Data: ob})
|
|
}
|
|
|
|
func wsGetExchangeRates(client *websocketClient, _ any) error {
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetExchangeRates",
|
|
}
|
|
|
|
var err error
|
|
wsResp.Data, err = currency.GetExchangeRates()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|
|
|
|
func wsGetPortfolio(client *websocketClient, _ any) error {
|
|
wsResp := WebsocketEventResponse{
|
|
Event: "GetPortfolio",
|
|
}
|
|
|
|
wsResp.Data = client.portfolioManager.GetPortfolioSummary()
|
|
return client.SendWebsocketMessage(wsResp)
|
|
}
|