mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 15:09:42 +00:00
stream: Add verbosity to outbound requests that also returns responses (#1640)
* Add verbosity * Update exchanges/stream/websocket_connection.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * glorious: nits * Update exchanges/stream/websocket_test.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * Update exchanges/stream/websocket_connection.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * reduce double log on ultimate verbose * glorious: extra * Update exchanges/stream/websocket_connection.go Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io> * nits for sauron --------- Co-authored-by: shazbert <ryan.oharareid@thrasher.io> Co-authored-by: Scott <gloriousCode@users.noreply.github.com> Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
This commit is contained in:
@@ -167,7 +167,7 @@ func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRe
|
||||
return err
|
||||
}
|
||||
|
||||
verbose := isVerbose(ctx, p.Verbose)
|
||||
verbose := IsVerbose(ctx, p.Verbose)
|
||||
|
||||
if verbose {
|
||||
log.Debugf(log.RequestSys, "%s attempt %d request path: %s", r.name, attempt, p.Path)
|
||||
@@ -380,9 +380,9 @@ func WithVerbose(ctx context.Context) context.Context {
|
||||
return context.WithValue(ctx, contextVerboseFlag, true)
|
||||
}
|
||||
|
||||
// isVerbose checks main verbosity first then checks context verbose values
|
||||
// IsVerbose checks main verbosity first then checks context verbose values
|
||||
// for specific request verbosity.
|
||||
func isVerbose(ctx context.Context, verbose bool) bool {
|
||||
func IsVerbose(ctx context.Context, verbose bool) bool {
|
||||
if verbose {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -700,29 +700,10 @@ func TestGetHTTPClientUserAgent(t *testing.T) {
|
||||
|
||||
func TestContextVerbosity(t *testing.T) {
|
||||
t.Parallel()
|
||||
if isVerbose(context.Background(), false) {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
|
||||
if !isVerbose(context.Background(), true) {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = WithVerbose(ctx)
|
||||
if !isVerbose(ctx, false) {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
ctx = context.WithValue(ctx, contextVerboseFlag, false)
|
||||
if isVerbose(ctx, false) {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
ctx = context.WithValue(ctx, contextVerboseFlag, "bruh")
|
||||
if isVerbose(ctx, false) {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
require.False(t, IsVerbose(context.Background(), false))
|
||||
require.True(t, IsVerbose(context.Background(), true))
|
||||
require.True(t, IsVerbose(WithVerbose(context.Background()), false))
|
||||
require.False(t, IsVerbose(context.WithValue(context.Background(), contextVerboseFlag, false), false))
|
||||
require.False(t, IsVerbose(context.WithValue(context.Background(), contextVerboseFlag, "bruh"), false))
|
||||
require.True(t, IsVerbose(context.WithValue(context.Background(), contextVerboseFlag, true), false))
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -57,9 +58,9 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header
|
||||
// SendJSONMessage sends a JSON encoded message over the connection
|
||||
func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, data interface{}) error {
|
||||
return w.writeToConn(ctx, func() error {
|
||||
if w.Verbose {
|
||||
if request.IsVerbose(ctx, w.Verbose) {
|
||||
if msg, err := json.Marshal(data); err == nil { // WriteJSON will error for us anyway
|
||||
log.Debugf(log.WebsocketMgr, "%s websocket connection: sending message: %s\n", w.ExchangeName, msg)
|
||||
log.Debugf(log.WebsocketMgr, "%v %v: Sending message: %v", w.ExchangeName, removeURLQueryString(w.URL), string(msg))
|
||||
}
|
||||
}
|
||||
return w.Connection.WriteJSON(data)
|
||||
@@ -69,8 +70,8 @@ func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, data interfac
|
||||
// SendRawMessage sends a message over the connection without JSON encoding it
|
||||
func (w *WebsocketConnection) SendRawMessage(ctx context.Context, messageType int, message []byte) error {
|
||||
return w.writeToConn(ctx, func() error {
|
||||
if w.Verbose {
|
||||
log.Debugf(log.WebsocketMgr, "%v websocket connection: sending message [%s]\n", w.ExchangeName, message)
|
||||
if request.IsVerbose(ctx, w.Verbose) {
|
||||
log.Debugf(log.WebsocketMgr, "%v %v: Sending message: %v", w.ExchangeName, removeURLQueryString(w.URL), string(message))
|
||||
}
|
||||
return w.Connection.WriteMessage(messageType, message)
|
||||
})
|
||||
@@ -190,18 +191,12 @@ func (w *WebsocketConnection) ReadMessage() Response {
|
||||
case websocket.BinaryMessage:
|
||||
standardMessage, err = w.parseBinaryResponse(resp)
|
||||
if err != nil {
|
||||
log.Errorf(log.WebsocketMgr,
|
||||
"%v websocket connection: parseBinaryResponse error: %v",
|
||||
w.ExchangeName,
|
||||
err)
|
||||
log.Errorf(log.WebsocketMgr, "%v %v: Parse binary response error: %v", w.ExchangeName, removeURLQueryString(w.URL), err)
|
||||
return Response{}
|
||||
}
|
||||
}
|
||||
if w.Verbose {
|
||||
log.Debugf(log.WebsocketMgr,
|
||||
"%v websocket connection: message received: %v",
|
||||
w.ExchangeName,
|
||||
string(standardMessage))
|
||||
log.Debugf(log.WebsocketMgr, "%v %v: Message received: %v", w.ExchangeName, removeURLQueryString(w.URL), string(standardMessage))
|
||||
}
|
||||
return Response{Raw: standardMessage, Type: mType}
|
||||
}
|
||||
@@ -287,8 +282,8 @@ func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, sig
|
||||
|
||||
// SendMessageReturnResponses will send a WS message to the connection and wait for N responses
|
||||
// An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked
|
||||
func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, signature, request any, expected int) ([][]byte, error) {
|
||||
outbound, err := json.Marshal(request)
|
||||
func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, signature, payload any, expected int) ([][]byte, error) {
|
||||
outbound, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error marshaling json for %s: %w", signature, err)
|
||||
}
|
||||
@@ -326,5 +321,19 @@ func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, si
|
||||
w.Reporter.Latency(w.ExchangeName, outbound, time.Since(start))
|
||||
}
|
||||
|
||||
// Only check context verbosity. If the exchange is verbose, it will log the responses in the ReadMessage() call.
|
||||
if request.IsVerbose(ctx, false) {
|
||||
for i := range resps {
|
||||
log.Debugf(log.WebsocketMgr, "%v %v: Received response [%d/%d]: %v", w.ExchangeName, removeURLQueryString(w.URL), i+1, len(resps), string(resps[i]))
|
||||
}
|
||||
}
|
||||
|
||||
return resps, err
|
||||
}
|
||||
|
||||
func removeURLQueryString(url string) string {
|
||||
if index := strings.Index(url, "?"); index != -1 {
|
||||
return url[:index]
|
||||
}
|
||||
return url
|
||||
}
|
||||
|
||||
@@ -1241,3 +1241,10 @@ func TestCheckSubscriptions(t *testing.T) {
|
||||
err = ws.checkSubscriptions(subscription.List{{}})
|
||||
assert.NoError(t, err, "checkSubscriptions should not error")
|
||||
}
|
||||
|
||||
func TestRemoveURLQueryString(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert.Equal(t, "https://www.google.com", removeURLQueryString("https://www.google.com?test=1"), "removeURLQueryString should remove query string")
|
||||
assert.Equal(t, "https://www.google.com", removeURLQueryString("https://www.google.com"), "removeURLQueryString should not change URL")
|
||||
assert.Equal(t, "", removeURLQueryString(""), "removeURLQueryString should be equal")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user