From c615a79421416802389faeb0dd2a77f2c39e764e Mon Sep 17 00:00:00 2001 From: E Sequeira Date: Mon, 19 Sep 2022 03:39:29 +0100 Subject: [PATCH] websocket: add SendMessageReturnResponse latency reporter (#1031) * add websocket sync request latency reporter * add globalReporter similar to request package * gofmt * connection level reporter and test * fix SetupNewConnection --- exchanges/stream/stream_types.go | 17 +++++-- exchanges/stream/websocket.go | 17 +++++++ exchanges/stream/websocket_connection.go | 13 +++++- exchanges/stream/websocket_test.go | 59 ++++++++++++++++++++++++ exchanges/stream/websocket_types.go | 5 ++ 5 files changed, 105 insertions(+), 6 deletions(-) diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index 76006234..053e5688 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -41,11 +41,12 @@ type ChannelSubscription struct { // ConnectionSetup defines variables for an individual stream connection type ConnectionSetup struct { - ResponseCheckTimeout time.Duration - ResponseMaxLimit time.Duration - RateLimit int64 - URL string - Authenticated bool + ResponseCheckTimeout time.Duration + ResponseMaxLimit time.Duration + RateLimit int64 + URL string + Authenticated bool + ConnectionLevelReporter Reporter } // PingHandler container for ping handler settings @@ -97,3 +98,9 @@ type WebsocketPositionUpdated struct { type UnhandledMessageWarning struct { Message string } + +// Reporter interface groups observability functionality over +// Websocket request latency. +type Reporter interface { + Latency(name string, message []byte, t time.Duration) +} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 0d413f72..d4e6281f 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -46,6 +46,14 @@ var ( errClosedConnection = errors.New("use of closed network connection") ) +var globalReporter Reporter + +// SetupGlobalReporter sets a reporter interface to be used +// for all exchange requests +func SetupGlobalReporter(r Reporter) { + globalReporter = r +} + // New initialises the websocket struct func New() *Websocket { return &Websocket{ @@ -183,6 +191,14 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error { connectionURL = c.URL } + if c.ConnectionLevelReporter == nil { + c.ConnectionLevelReporter = w.ExchangeLevelReporter + } + + if c.ConnectionLevelReporter == nil { + c.ConnectionLevelReporter = globalReporter + } + newConn := &WebsocketConnection{ ExchangeName: w.exchangeName, URL: connectionURL, @@ -195,6 +211,7 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error { Wg: w.Wg, Match: w.Match, RateLimit: c.RateLimit, + Reporter: c.ConnectionLevelReporter, } if c.Authenticated { diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 65ae597a..e1ea5af7 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -5,6 +5,7 @@ import ( "compress/flate" "compress/gzip" "crypto/rand" + "encoding/json" "fmt" "io" "math/big" @@ -27,7 +28,13 @@ func (w *WebsocketConnection) SendMessageReturnResponse(signature, request inter } defer m.Cleanup() - err = w.SendJSONMessage(request) + b, err := json.Marshal(request) + if err != nil { + return nil, fmt.Errorf("error marshaling json for %s: %w", signature, err) + } + + start := time.Now() + err = w.SendRawMessage(websocket.TextMessage, b) if err != nil { return nil, err } @@ -36,6 +43,10 @@ func (w *WebsocketConnection) SendMessageReturnResponse(signature, request inter select { case payload := <-m.C: + if w.Reporter != nil { + w.Reporter.Latency(w.ExchangeName, b, time.Since(start)) + } + return payload, nil case <-timer.C: timer.Stop() diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 89cb96ab..7342d743 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -778,6 +778,18 @@ func TestSendMessageWithResponse(t *testing.T) { } } +type reporter struct { + name string + msg []byte + t time.Duration +} + +func (r *reporter) Latency(name string, message []byte, t time.Duration) { + r.name = name + r.msg = message + r.t = t +} + // readMessages helper func func readMessages(t *testing.T, wc *WebsocketConnection) { t.Helper() @@ -1324,3 +1336,50 @@ func TestWebsocketConnectionShutdown(t *testing.T) { t.Fatal(err) } } + +// TestLatency logic test +func TestLatency(t *testing.T) { + t.Parallel() + r := &reporter{} + exch := "Kraken" + wc := &WebsocketConnection{ + ExchangeName: exch, + Verbose: true, + URL: "wss://ws.kraken.com", + ResponseMaxLimit: time.Second * 5, + Match: NewMatch(), + Reporter: r, + } + if wc.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + + err := wc.Dial(&dialer, http.Header{}) + if err != nil { + t.Fatal(err) + } + + go readMessages(t, wc) + + request := testRequest{ + Event: "subscribe", + Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, + Subscription: testRequestData{ + Name: "ticker", + }, + RequestID: wc.GenerateMessageID(false), + } + + _, err = wc.SendMessageReturnResponse(request.RequestID, request) + if err != nil { + t.Error(err) + } + + if r.t == 0 { + t.Error("expected a nonzero duration, got zero") + } + + if r.name != exch { + t.Errorf("expected %v, got %v", exch, r.name) + } +} diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 08ad2d2c..41444f1a 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -92,6 +92,9 @@ type Websocket struct { Conn Connection // Authenticated stream connection AuthConn Connection + + // Latency reporter + ExchangeLevelReporter Reporter } // WebsocketSetup defines variables for setting up a websocket connection @@ -138,4 +141,6 @@ type WebsocketConnection struct { ResponseMaxLimit time.Duration Traffic chan struct{} readMessageErrors chan error + + Reporter Reporter }