websocket: add SendMessageReturnResponse latency reporter (#1031)

* add websocket sync request latency reporter

* add globalReporter similar to request package

* gofmt

* connection level reporter and test

* fix SetupNewConnection
This commit is contained in:
E Sequeira
2022-09-19 03:39:29 +01:00
committed by GitHub
parent e78287b26e
commit c615a79421
5 changed files with 105 additions and 6 deletions

View File

@@ -41,11 +41,12 @@ type ChannelSubscription struct {
// ConnectionSetup defines variables for an individual stream connection
type ConnectionSetup struct {
ResponseCheckTimeout time.Duration
ResponseMaxLimit time.Duration
RateLimit int64
URL string
Authenticated bool
ResponseCheckTimeout time.Duration
ResponseMaxLimit time.Duration
RateLimit int64
URL string
Authenticated bool
ConnectionLevelReporter Reporter
}
// PingHandler container for ping handler settings
@@ -97,3 +98,9 @@ type WebsocketPositionUpdated struct {
type UnhandledMessageWarning struct {
Message string
}
// Reporter interface groups observability functionality over
// Websocket request latency.
type Reporter interface {
Latency(name string, message []byte, t time.Duration)
}

View File

@@ -46,6 +46,14 @@ var (
errClosedConnection = errors.New("use of closed network connection")
)
var globalReporter Reporter
// SetupGlobalReporter sets a reporter interface to be used
// for all exchange requests
func SetupGlobalReporter(r Reporter) {
globalReporter = r
}
// New initialises the websocket struct
func New() *Websocket {
return &Websocket{
@@ -183,6 +191,14 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error {
connectionURL = c.URL
}
if c.ConnectionLevelReporter == nil {
c.ConnectionLevelReporter = w.ExchangeLevelReporter
}
if c.ConnectionLevelReporter == nil {
c.ConnectionLevelReporter = globalReporter
}
newConn := &WebsocketConnection{
ExchangeName: w.exchangeName,
URL: connectionURL,
@@ -195,6 +211,7 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error {
Wg: w.Wg,
Match: w.Match,
RateLimit: c.RateLimit,
Reporter: c.ConnectionLevelReporter,
}
if c.Authenticated {

View File

@@ -5,6 +5,7 @@ import (
"compress/flate"
"compress/gzip"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"math/big"
@@ -27,7 +28,13 @@ func (w *WebsocketConnection) SendMessageReturnResponse(signature, request inter
}
defer m.Cleanup()
err = w.SendJSONMessage(request)
b, err := json.Marshal(request)
if err != nil {
return nil, fmt.Errorf("error marshaling json for %s: %w", signature, err)
}
start := time.Now()
err = w.SendRawMessage(websocket.TextMessage, b)
if err != nil {
return nil, err
}
@@ -36,6 +43,10 @@ func (w *WebsocketConnection) SendMessageReturnResponse(signature, request inter
select {
case payload := <-m.C:
if w.Reporter != nil {
w.Reporter.Latency(w.ExchangeName, b, time.Since(start))
}
return payload, nil
case <-timer.C:
timer.Stop()

View File

@@ -778,6 +778,18 @@ func TestSendMessageWithResponse(t *testing.T) {
}
}
type reporter struct {
name string
msg []byte
t time.Duration
}
func (r *reporter) Latency(name string, message []byte, t time.Duration) {
r.name = name
r.msg = message
r.t = t
}
// readMessages helper func
func readMessages(t *testing.T, wc *WebsocketConnection) {
t.Helper()
@@ -1324,3 +1336,50 @@ func TestWebsocketConnectionShutdown(t *testing.T) {
t.Fatal(err)
}
}
// TestLatency logic test
func TestLatency(t *testing.T) {
t.Parallel()
r := &reporter{}
exch := "Kraken"
wc := &WebsocketConnection{
ExchangeName: exch,
Verbose: true,
URL: "wss://ws.kraken.com",
ResponseMaxLimit: time.Second * 5,
Match: NewMatch(),
Reporter: r,
}
if wc.ProxyURL != "" && !useProxyTests {
t.Skip("Proxy testing not enabled, skipping")
}
err := wc.Dial(&dialer, http.Header{})
if err != nil {
t.Fatal(err)
}
go readMessages(t, wc)
request := testRequest{
Event: "subscribe",
Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()},
Subscription: testRequestData{
Name: "ticker",
},
RequestID: wc.GenerateMessageID(false),
}
_, err = wc.SendMessageReturnResponse(request.RequestID, request)
if err != nil {
t.Error(err)
}
if r.t == 0 {
t.Error("expected a nonzero duration, got zero")
}
if r.name != exch {
t.Errorf("expected %v, got %v", exch, r.name)
}
}

View File

@@ -92,6 +92,9 @@ type Websocket struct {
Conn Connection
// Authenticated stream connection
AuthConn Connection
// Latency reporter
ExchangeLevelReporter Reporter
}
// WebsocketSetup defines variables for setting up a websocket connection
@@ -138,4 +141,6 @@ type WebsocketConnection struct {
ResponseMaxLimit time.Duration
Traffic chan struct{}
readMessageErrors chan error
Reporter Reporter
}