log: Add structured logging (#1171)

* basic implementation

* log: deprecate duplicate function, add tests and refine calls.

* linter: fixes

* linter: update struct

* linter and new type

* log tests: update to not lint issue

* linter: stop complaining please

* glorious: nits

* log: rm comment code

* glorious: nits

* glorious: nits

* glorious: nits

* glorious: nits missed

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2023-05-10 17:52:53 +10:00
committed by GitHub
parent 6e1cbfc31e
commit db8735ec99
44 changed files with 679 additions and 476 deletions

View File

@@ -181,7 +181,7 @@ func (m *apiServerManager) StartRESTServer() error {
if err != nil {
atomic.StoreInt32(&m.restStarted, 0)
if !errors.Is(err, http.ErrServerClosed) {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
}
}()
@@ -296,7 +296,7 @@ func (m *apiServerManager) restGetAllEnabledAccountInfo(w http.ResponseWriter, r
func (m *apiServerManager) getIndex(w http.ResponseWriter, _ *http.Request) {
_, err := fmt.Fprint(w, restIndexResponse)
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
w.WriteHeader(http.StatusOK)
}
@@ -443,7 +443,7 @@ func (m *apiServerManager) StartWebsocketServer() error {
if err != nil {
atomic.StoreInt32(&m.websocketStarted, 0)
if !errors.Is(err, http.ErrServerClosed) {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
}
}()
@@ -502,7 +502,7 @@ func (c *websocketClient) read() {
c.Hub.Unregister <- c
conErr := c.Conn.Close()
if conErr != nil {
log.Error(log.APIServerMgr, conErr)
log.Errorln(log.APIServerMgr, conErr)
}
}()
@@ -547,7 +547,7 @@ func (c *websocketClient) read() {
log.Warnf(log.APIServerMgr, "Websocket: request %s failed due to unauthenticated request on an authenticated API\n", evt.Event)
err = c.SendWebsocketMessage(WebsocketEventResponse{Event: evt.Event, Error: "unauthorised request on authenticated API"})
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
continue
}
@@ -565,7 +565,7 @@ func (c *websocketClient) write() {
defer func() {
err := c.Conn.Close()
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
}()
for {
@@ -573,7 +573,7 @@ func (c *websocketClient) write() {
if !ok {
err := c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
log.Debugln(log.APIServerMgr, "websocket: hub closed the channel")
return
@@ -586,7 +586,7 @@ func (c *websocketClient) write() {
}
_, err = w.Write(message)
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
// Add queued chat messages to the current websocket message
@@ -594,7 +594,7 @@ func (c *websocketClient) write() {
for i := 0; i < n; i++ {
_, err = w.Write(<-c.Send)
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
}
}
@@ -661,7 +661,7 @@ func (m *apiServerManager) WebsocketClientHandler(w http.ResponseWriter, r *http
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(log.APIServerMgr, err)
log.Errorln(log.APIServerMgr, err)
return
}
@@ -703,7 +703,7 @@ func wsAuth(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -726,7 +726,7 @@ func wsAuth(client *websocketClient, data interface{}) error {
client.authFailures++
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
if client.authFailures >= client.maxAuthFailures {
log.Debugf(log.APIServerMgr,
@@ -765,7 +765,7 @@ func wsSaveConfig(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -776,7 +776,7 @@ func wsSaveConfig(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -786,7 +786,7 @@ func wsSaveConfig(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -826,7 +826,7 @@ func wsGetTicker(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -846,7 +846,7 @@ func wsGetTicker(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -855,7 +855,7 @@ func wsGetTicker(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -886,7 +886,7 @@ func wsGetOrderbook(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -906,7 +906,7 @@ func wsGetOrderbook(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}
@@ -915,7 +915,7 @@ func wsGetOrderbook(client *websocketClient, data interface{}) error {
wsResp.Error = err.Error()
sendErr := client.SendWebsocketMessage(wsResp)
if sendErr != nil {
log.Error(log.APIServerMgr, sendErr)
log.Errorln(log.APIServerMgr, sendErr)
}
return err
}

View File

@@ -160,7 +160,7 @@ func (m *DatabaseConnectionManager) run(wg *sync.WaitGroup) {
case <-t.C:
err := m.checkConnection()
if err != nil {
log.Error(log.DatabaseMgr, "Database connection error:", err)
log.Errorln(log.DatabaseMgr, "Database connection error:", err)
}
}
}
@@ -186,7 +186,7 @@ func (m *DatabaseConnectionManager) checkConnection() error {
}
if !m.dbConn.IsConnected() {
log.Info(log.DatabaseMgr, "Database connection reestablished")
log.Infoln(log.DatabaseMgr, "Database connection reestablished")
m.dbConn.SetConnected(true)
}
return nil

View File

@@ -132,7 +132,7 @@ func (m *DataHistoryManager) retrieveJobs() ([]*DataHistoryJob, error) {
}
err = m.validateJob(dbJob)
if err != nil {
log.Error(log.DataHistory, err)
log.Errorln(log.DataHistory, err)
continue
}
response = append(response, dbJob)
@@ -156,7 +156,7 @@ func (m *DataHistoryManager) PrepareJobs() ([]*DataHistoryJob, error) {
defer func() {
err = m.Stop()
if err != nil {
log.Error(log.DataHistory, err)
log.Errorln(log.DataHistory, err)
}
}()
return nil, fmt.Errorf("error retrieving jobs, has everything been setup? Data history manager will shut down. %w", err)
@@ -237,7 +237,7 @@ func (m *DataHistoryManager) run() {
if m.databaseConnectionInstance != nil && m.databaseConnectionInstance.IsConnected() {
go func() {
if err := m.runJobs(); err != nil {
log.Error(log.DataHistory, err)
log.Errorln(log.DataHistory, err)
}
}()
}
@@ -274,7 +274,7 @@ func (m *DataHistoryManager) runJobs() error {
for i := 0; (i < int(m.maxJobsPerCycle) || m.maxJobsPerCycle == -1) && i < len(validJobs); i++ {
err := m.runJob(validJobs[i])
if err != nil {
log.Error(log.DataHistory, err)
log.Errorln(log.DataHistory, err)
}
if m.verbose {
log.Debugf(log.DataHistory, "completed run of data history job %v", validJobs[i].Nickname)

View File

@@ -90,7 +90,7 @@ func NewFromSettings(settings *Settings, flagSet map[string]bool) (*Engine, erro
}
if *b.Config.Logging.Enabled {
err = gctlog.SetupGlobalLogger()
err = gctlog.SetupGlobalLogger(b.Config.Name, b.Config.Logging.AdvancedSettings.StructuredLogging)
if err != nil {
return nil, fmt.Errorf("failed to setup global logger. %w", err)
}
@@ -339,7 +339,7 @@ func (bot *Engine) Start() error {
if err != nil {
return fmt.Errorf("unable to set NTP check: %w", err)
}
gctlog.Info(gctlog.TimeMgr, responseMessage)
gctlog.Infoln(gctlog.TimeMgr, responseMessage)
}
bot.ntpManager, err = setupNTPManager(&bot.Config.NTPClient, *bot.Config.Logging.Enabled)
if err != nil {

View File

@@ -109,7 +109,7 @@ func (m *ntpManager) run() {
case <-t.C:
err := m.processTime()
if err != nil {
log.Error(log.TimeMgr, err)
log.Errorln(log.TimeMgr, err)
}
}
}
@@ -166,7 +166,7 @@ func (m *ntpManager) checkTimeInPools() time.Time {
log.Warnf(log.TimeMgr, "Unable to SetDeadline. Error: %s\n", err)
err = con.Close()
if err != nil {
log.Error(log.TimeMgr, err)
log.Errorln(log.TimeMgr, err)
}
continue
}
@@ -176,7 +176,7 @@ func (m *ntpManager) checkTimeInPools() time.Time {
log.Warnf(log.TimeMgr, "Unable to write. Error: %s\n", err)
err = con.Close()
if err != nil {
log.Error(log.TimeMgr, err)
log.Errorln(log.TimeMgr, err)
}
continue
}
@@ -186,7 +186,7 @@ func (m *ntpManager) checkTimeInPools() time.Time {
log.Warnf(log.TimeMgr, "Unable to read. Error: %s\n", err)
err = con.Close()
if err != nil {
log.Error(log.TimeMgr, err)
log.Errorln(log.TimeMgr, err)
}
continue
}
@@ -196,7 +196,7 @@ func (m *ntpManager) checkTimeInPools() time.Time {
err = con.Close()
if err != nil {
log.Error(log.TimeMgr, err)
log.Errorln(log.TimeMgr, err)
}
return time.Unix(int64(secs), nanos)
}

View File

@@ -147,12 +147,12 @@ func (m *OrderManager) CancelAllOrders(ctx context.Context, exchanges []exchange
log.Debugf(log.OrderMgr, "Cancelling order(s) for exchange %s.", exchanges[i].GetName())
cancel, err := orders[j].DeriveCancel()
if err != nil {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
continue
}
err = m.Cancel(ctx, cancel)
if err != nil {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
}
}
}
@@ -685,7 +685,7 @@ func (m *OrderManager) processOrders() {
var upsertResponse *OrderUpsertResponse
upsertResponse, err = m.UpsertOrder(&result[z])
if err != nil {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
continue
}
for i := range orders {
@@ -708,7 +708,7 @@ func (m *OrderManager) processOrders() {
var sd time.Time
sd, err = m.orderStore.futuresPositionController.LastUpdated()
if err != nil {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
return
}
if sd.IsZero() {
@@ -721,7 +721,7 @@ func (m *OrderManager) processOrders() {
})
if err != nil {
if !errors.Is(err, common.ErrNotYetImplemented) {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
}
return
}
@@ -817,7 +817,7 @@ func (m *OrderManager) processMatchingOrders(exch exchange.IBotExchange, orders
}
err := m.FetchAndUpdateExchangeOrder(exch, &orders[x], orders[x].AssetType)
if err != nil {
log.Error(log.OrderMgr, err)
log.Errorln(log.OrderMgr, err)
}
}
if wg != nil {
@@ -919,10 +919,10 @@ func (m *OrderManager) UpsertOrder(od *order.Detail) (resp *OrderUpsertResponse,
upsertResponse.OrderDetails.Pair, upsertResponse.OrderDetails.Price, upsertResponse.OrderDetails.Amount,
upsertResponse.OrderDetails.Side, upsertResponse.OrderDetails.Type, upsertResponse.OrderDetails.Status)
if upsertResponse.IsNewOrder {
log.Info(log.OrderMgr, msg)
log.Infoln(log.OrderMgr, msg)
return upsertResponse, nil
}
log.Debug(log.OrderMgr, msg)
log.Debugln(log.OrderMgr, msg)
return upsertResponse, nil
}

View File

@@ -709,7 +709,7 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream
defer func() {
pipeErr := pipe.Release()
if pipeErr != nil {
log.Error(log.DispatchMgr, pipeErr)
log.Errorln(log.DispatchMgr, pipeErr)
}
}()
@@ -2183,7 +2183,7 @@ func (s *RPCServer) GetExchangeOrderbookStream(r *gctrpc.GetExchangeOrderbookStr
defer func() {
pipeErr := pipe.Release()
if pipeErr != nil {
log.Error(log.DispatchMgr, pipeErr)
log.Errorln(log.DispatchMgr, pipeErr)
}
}()
@@ -2268,7 +2268,7 @@ func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gct
defer func() {
pipeErr := pipe.Release()
if pipeErr != nil {
log.Error(log.DispatchMgr, pipeErr)
log.Errorln(log.DispatchMgr, pipeErr)
}
}()
@@ -2321,7 +2321,7 @@ func (s *RPCServer) GetExchangeTickerStream(r *gctrpc.GetExchangeTickerStreamReq
defer func() {
pipeErr := pipe.Release()
if pipeErr != nil {
log.Error(log.DispatchMgr, pipeErr)
log.Errorln(log.DispatchMgr, pipeErr)
}
}()
@@ -4111,7 +4111,7 @@ func (s *RPCServer) SetDataHistoryJobStatus(_ context.Context, r *gctrpc.SetData
status := "success"
err := s.dataHistoryManager.SetJobStatus(r.Nickname, r.Id, dataHistoryStatus(r.Status))
if err != nil {
log.Error(log.GRPCSys, err)
log.Errorln(log.GRPCSys, err)
status = "failed"
}

View File

@@ -218,7 +218,7 @@ func (m *syncManager) Start() error {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopping.")
err := m.Stop()
if err != nil {
log.Error(log.SyncMgr, err)
log.Errorln(log.SyncMgr, err)
}
return
}
@@ -547,7 +547,7 @@ func (m *syncManager) worker() {
m.add(c)
} else {
log.Error(log.SyncMgr, err)
log.Errorln(log.SyncMgr, err)
continue
}
}
@@ -595,7 +595,7 @@ func (m *syncManager) worker() {
}
updateErr := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, err)
if updateErr != nil {
log.Error(log.SyncMgr, updateErr)
log.Errorln(log.SyncMgr, updateErr)
}
} else {
time.Sleep(time.Millisecond * 50)
@@ -673,7 +673,7 @@ func (m *syncManager) worker() {
}
updateErr := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, err)
if updateErr != nil {
log.Error(log.SyncMgr, updateErr)
log.Errorln(log.SyncMgr, updateErr)
}
}
} else {
@@ -688,7 +688,7 @@ func (m *syncManager) worker() {
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, true)
err := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, nil)
if err != nil {
log.Error(log.SyncMgr, err)
log.Errorln(log.SyncMgr, err)
}
}
}

View File

@@ -174,7 +174,7 @@ func (m *websocketRoutineManager) websocketDataReceiver(ws *stream.Websocket) er
for x := range m.dataHandlers {
err := m.dataHandlers[x](ws.GetName(), data)
if err != nil {
log.Error(log.WebsocketMgr, err)
log.Errorln(log.WebsocketMgr, err)
}
}
m.mu.RUnlock()
@@ -190,7 +190,7 @@ func (m *websocketRoutineManager) websocketDataReceiver(ws *stream.Websocket) er
func (m *websocketRoutineManager) websocketDataHandler(exchName string, data interface{}) error {
switch d := data.(type) {
case string:
log.Info(log.WebsocketMgr, d)
log.Infoln(log.WebsocketMgr, d)
case error:
return fmt.Errorf("exchange %s websocket error - %s", exchName, data)
case stream.FundingData:
@@ -267,7 +267,7 @@ func (m *websocketRoutineManager) websocketDataHandler(exchName string, data int
case order.ClassificationError:
return fmt.Errorf("%w %s", d.Err, d.Error())
case stream.UnhandledMessageWarning:
log.Warn(log.WebsocketMgr, d.Message)
log.Warnln(log.WebsocketMgr, d.Message)
case account.Change:
if m.verbose {
m.printAccountHoldingsChangeSummary(d)