Files
gocryptotrader/exchanges/request/request.go
Ryan O'Hara-Reid cad7586e98 exchange/websocket, gateio: Rename/export package again, add websocket request functions for futures trading (#1603)
* gateio: Add multi asset websocket support WIP.

* meow

* Add tests and shenanigans

* integrate flushing and for enabling/disabling pairs from rpc shenanigans

* some changes

* linter: fixes strikes again.

* Change name ConnectionAssociation -> ConnectionCandidate for better clarity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails.

* Add subscription tests (state functional)

* glorious:nits + proxy handling

* Spelling

* linter: fixerino

* instead of nil, dont do nil.

* clean up nils

* cya nils

* don't need to set URL or check if its running

* stream match update

* update tests

* linter: fix

* glorious: nits + handle context cancellations

* stop ping handler routine leak

* * Fix bug where reader routine on error that is not a disconnection error but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled.
* Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle.
* Dial now uses code from DialContext but just calls context.Background()
* Don't allow reader to return on parse binary response error. Just output error and return a non nil response

* Allow rollback on connect on any error across all connections

* fix shadow jutsu

* glorious/gk: nitters - adds in ws mock server

* linter: fix

* fix deadlock on connection as the previous channel had no reader and would hang connection reader for eternity.

* glorious: whooops

* gk: nits

* Leak issue and edge case

* Websocket: Add SendMessageReturnResponses

* whooooooopsie

* gk: nitssssss

* Update exchanges/stream/stream_match.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* linter: appease the linter gods

* gk: nits

* gk: drain brain

* started

* more changes before merge match pr

* gateio: still building out

* gateio: finish spot

* fix up tests in gateio

* Add tests for stream package

* rm unused field

* glorious: nits

* rn files, specifically set function names to asset and offload routing to websocket type.

* linter: fix

* Add futures websocket request support

* gateio: integrate with IBOTExchange (cherry pick my nose)

* linter: fix

* glorious: nits

* add counter and update gateio

* fix collision issue

* Update exchanges/stream/websocket.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* add tests

* linter: fix

* After merge

* Add error connection info

* upgrade to upstream merge

* Fix edge case where it does not reconnect made by an already closed connection

* stream coverage

* glorious: nits

* glorious: nits removed asset error handling in stream package

* linter: fix

* rm block

* Add basic readme

* fix asset enabled flush cycle for multi connection

* spella: fix

* linter: fix

* Add glorious suggestions, fix some race thing

* reinstate name before any routine gets spawned

* stop on error in mock tests

* glorious: nits

* Set correct price

* glorious: nits found in CI build

* Add test for drain, bumped wait times as there seems to be something happening on macos CI builds, used context.WithTimeout because its instant.

* mutex across shutdown and connect for protection

* lint: fix

* test time withoffset, reinstate stop

* fix whoops

* const trafficCheckInterval; rm testmain

* y

* fix lint

* bump time check window

* stream: fix intermittant test failures while testing routines and remove code that is not needed.

* spells

* cant do what I did

* protect race due to routine.

* update testURL

* use mock websocket connection instead of test URL's

* linter: fix

* remove url because its throwing errors on CI builds

* connections drop all the time, don't need to worry about not being able to echo back ws data as it can be easily reviewed _test file side.

* remove another superfluous url thats not really set up for this

* spawn overwatch routine when there is no errors, inline checker instead of waiting for a time period, add sleep inline with echo handler as this is really quick and wanted to ensure that latency is handing correctly

* linter: fixerino uperino

* fix ID bug, why I do this, I don't know.

* glorious: panix

* linter: things

* whoops

* dont need to make consecutive Unix() calls

* websocket: fix potential panic on error and no responses and adding waitForResponses

* rm json parser and handle in json package instead

* in favour of json package unmarshalling

* linter: fix

* linter: fix again

* * change field name OutboundRequestSignature to WrapperDefinedConnectionSignature for agnostic inbound and outbound connections.
* change method name GetOutboundConnection to GetConnection for agnostic inbound and outbound connections.
* drop outbound field map for improved performance just using a range and field check (less complex as well)
* change field name connections to connectionToWrapper for better clarity

* spells and magic and wands

* glorious: nits

* comparable check for signature

* mv err var

* glorious: nits and stuff

* attempt to fix race

* glorious: nits

* gk: nits; engine log cleanup

* gk: nits; OCD

* gk: nits; move function change file names

* gk: nits; 🚀

* gk: nits; convert variadic function and message inspection to interface and include a specific function for that handling so as to not need nil on every call

* gk: nits; continued

* gk: engine nits; rm loaded exchange

* gk: nits; drop WebsocketLoginResponse

* stream: Add match method EnsureMatchWithData

* gk: nits; rn Inspect to IsFinal

* gk: nits; rn to MessageFilter

* linter: fix

* gateio: update rate limit definitions (cherry-pick)

* Add test and missing

* Shared REST rate limit definitions with Websocket service, set lookup item to nil for systems that do not require rate limiting; add glorious nit

* integrate rate limits for websocket trading spot

* conform to match upstream changes

* standardise names to upstream style

* fix wrapper standards test when sending a auth request through a websocket connection

* whoops

* Update exchanges/gateio/gateio_types.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* linter: fix

* linter: overload

* whoops

* spelling fixes on recent merge

* glorious: nits

* linter: fix?

* glorious: nits

* gk: assert errors touched

* gk: unexport derive functions

* gk: nitssssssss

* fix test

* gk: nitters v1

* gk: http status

* gk/nits: Add getAssetFromFuturesPair

* gk: nits single response when submitting

* gk: new pair with delimiter in tests

* gk: param update slice to slice of pointers

* gk: add asset type in params, includes t.Context() for tests

* linter: fix

* linter: fix

* fix merge whoopsie

* glorious: nits

* gk: nit

* shift over to websocket package error

* internal/exchange/websocket -> exchange/websocket

* PEAK OCD!

* appease the OCD gods

* thrasher: nits

---------

Co-authored-by: shazbert <ryan.oharareid@thrasher.io>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
2025-04-11 16:47:33 +10:00

387 lines
11 KiB
Go

package request
import (
"context"
"errors"
"fmt"
"io"
"maps"
"net/http"
"net/http/httputil"
"net/url"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/timedmutex"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchanges/mock"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
// UnsetRequest is an unset request authentication level
UnsetRequest AuthType = 0
// UnauthenticatedRequest denotes a request with no credentials
UnauthenticatedRequest = iota << 1
// AuthenticatedRequest denotes a request using API credentials
AuthenticatedRequest
contextVerboseFlag verbosity = "verbose"
)
// AuthType helps distinguish the purpose of a HTTP request
type AuthType uint8
var (
// ErrRequestSystemIsNil defines and error if the request system has not
// been set up yet.
ErrRequestSystemIsNil = errors.New("request system is nil")
// ErrAuthRequestFailed is a wrapping error to denote that it's an auth request that failed
ErrAuthRequestFailed = errors.New("authenticated request failed")
// ErrBadStatus is a wrapping error to denote that the HTTP status code was unsuccessful
ErrBadStatus = errors.New("unsuccessful HTTP status code")
errRequestFunctionIsNil = errors.New("request function is nil")
errRequestItemNil = errors.New("request item is nil")
errInvalidPath = errors.New("invalid path")
errHeaderResponseMapIsNil = errors.New("header response map is nil")
errFailedToRetryRequest = errors.New("failed to retry request")
errContextRequired = errors.New("context is required")
errTransportNotSet = errors.New("transport not set, cannot set timeout")
errRequestTypeUnpopulated = errors.New("request type bool is not populated")
)
// New returns a new Requester
func New(name string, httpRequester *http.Client, opts ...RequesterOption) (*Requester, error) {
protectedClient, err := newProtectedClient(httpRequester)
if err != nil {
return nil, fmt.Errorf("cannot set up a new requester for %s: %w", name, err)
}
r := &Requester{
_HTTPClient: protectedClient,
name: name,
backoff: DefaultBackoff(),
retryPolicy: DefaultRetryPolicy,
maxRetries: MaxRetryAttempts,
timedLock: timedmutex.NewTimedMutex(DefaultMutexLockTimeout),
reporter: globalReporter,
}
for _, o := range opts {
o(r)
}
return r, nil
}
// SendPayload handles sending HTTP/HTTPS requests
func (r *Requester) SendPayload(ctx context.Context, ep EndpointLimit, newRequest Generate, requestType AuthType) error {
if r == nil {
return ErrRequestSystemIsNil
}
if ctx == nil {
return errContextRequired
}
if requestType == UnsetRequest {
return errRequestTypeUnpopulated
}
defer r.timedLock.UnlockIfLocked()
if newRequest == nil {
return errRequestFunctionIsNil
}
err := r.doRequest(ctx, ep, newRequest)
if err != nil && requestType == AuthenticatedRequest {
err = common.AppendError(err, ErrAuthRequestFailed)
}
return err
}
// validateRequest validates the requester item fields
func (i *Item) validateRequest(ctx context.Context, r *Requester) (*http.Request, error) {
if i == nil {
return nil, errRequestItemNil
}
if i.Path == "" {
return nil, errInvalidPath
}
if i.HeaderResponse != nil && *i.HeaderResponse == nil {
return nil, errHeaderResponseMapIsNil
}
if !i.NonceEnabled {
r.timedLock.LockForDuration()
}
req, err := http.NewRequestWithContext(ctx, i.Method, i.Path, i.Body)
if err != nil {
return nil, err
}
if i.HTTPDebugging {
// Err not evaluated due to validation check above
dump, _ := httputil.DumpRequestOut(req, true)
log.Debugf(log.RequestSys, "DumpRequest:\n%s", dump)
}
for k, v := range i.Headers {
req.Header.Add(k, v)
}
if r.userAgent != "" && req.Header.Get(userAgent) == "" {
req.Header.Add(userAgent, r.userAgent)
}
return req, nil
}
// DoRequest performs a HTTP/HTTPS request with the supplied params
func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRequest Generate) error {
for attempt := 1; ; attempt++ {
// Check if context has finished before executing new attempt.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if r.limiter != nil {
// Initiate a rate limit reservation and sleep on requested endpoint
err := r.InitiateRateLimit(ctx, endpoint)
if err != nil {
return fmt.Errorf("failed to rate limit HTTP request: %w", err)
}
}
p, err := newRequest()
if err != nil {
return err
}
req, err := p.validateRequest(ctx, r)
if err != nil {
return err
}
verbose := IsVerbose(ctx, p.Verbose)
if verbose {
log.Debugf(log.RequestSys, "%s attempt %d request path: %s", r.name, attempt, p.Path)
for k, d := range req.Header {
log.Debugf(log.RequestSys, "%s request header [%s]: %s", r.name, k, d)
}
log.Debugf(log.RequestSys, "%s request type: %s", r.name, p.Method)
if req.GetBody != nil {
bodyCopy, bodyErr := req.GetBody()
if bodyErr != nil {
return bodyErr
}
payload, bodyErr := io.ReadAll(bodyCopy)
err = bodyCopy.Close()
if err != nil {
log.Errorf(log.RequestSys, "%s failed to close request body %s", r.name, err)
}
if bodyErr != nil {
return bodyErr
}
log.Debugf(log.RequestSys, "%s request body: %s", r.name, payload)
}
}
start := time.Now()
resp, err := r._HTTPClient.do(req)
if r.reporter != nil && err == nil {
r.reporter.Latency(r.name, p.Method, p.Path, time.Since(start))
}
if retry, checkErr := r.retryPolicy(resp, err); checkErr != nil {
return checkErr
} else if retry {
if err == nil {
// If the body isn't fully read, the connection cannot be reused
r.drainBody(resp.Body)
}
if attempt > r.maxRetries {
if err != nil {
return fmt.Errorf("%w, err: %v", errFailedToRetryRequest, err)
}
return fmt.Errorf("%w, status: %s", errFailedToRetryRequest, resp.Status)
}
after := RetryAfter(resp, time.Now())
backoff := r.backoff(attempt)
delay := max(backoff, after)
if dl, ok := req.Context().Deadline(); ok && dl.Before(time.Now().Add(delay)) {
if err != nil {
return fmt.Errorf("deadline would be exceeded by retry, err: %v", err)
}
return fmt.Errorf("deadline would be exceeded by retry, status: %s", resp.Status)
}
if verbose {
log.Errorf(log.RequestSys, "%s request has failed. Retrying request in %s, attempt %d", r.name, delay, attempt)
}
if delay > 0 {
// Allow for context cancellation while delaying the retry.
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
continue
}
contents, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
// Even in the case of an erroneous condition below, yield the parsed
// response to caller.
var unmarshallError error
if p.Result != nil {
unmarshallError = json.Unmarshal(contents, p.Result)
}
if p.HTTPRecording {
// This dumps http responses for future mocking implementations
err = mock.HTTPRecord(resp, r.name, contents)
if err != nil {
return fmt.Errorf("mock recording failure %w, request %v: resp: %v", err, req, resp)
}
}
if p.HeaderResponse != nil {
maps.Copy(*p.HeaderResponse, resp.Header)
}
if resp.StatusCode < http.StatusOK ||
resp.StatusCode > http.StatusNoContent {
return fmt.Errorf("%s %w: %d raw response: %s",
r.name,
ErrBadStatus,
resp.StatusCode,
string(contents))
}
if p.HTTPDebugging {
dump, dumpErr := httputil.DumpResponse(resp, false)
if err != nil {
log.Errorf(log.RequestSys, "DumpResponse invalid response: %v:", dumpErr)
}
log.Debugf(log.RequestSys, "DumpResponse Headers (%v):\n%s", p.Path, dump)
log.Debugf(log.RequestSys, "DumpResponse Body (%v):\n %s", p.Path, string(contents))
}
err = resp.Body.Close()
if err != nil {
log.Errorf(log.RequestSys, "%s failed to close request body %s", r.name, err)
}
if verbose {
log.Debugf(log.RequestSys, "HTTP status: %s, Code: %v", resp.Status, resp.StatusCode)
if !p.HTTPDebugging {
log.Debugf(log.RequestSys, "%s raw response: %s", r.name, string(contents))
}
}
return unmarshallError
}
}
func (r *Requester) drainBody(body io.ReadCloser) {
if _, err := io.Copy(io.Discard, io.LimitReader(body, drainBodyLimit)); err != nil {
log.Errorf(log.RequestSys, "%s failed to drain request body %s", r.name, err)
}
if err := body.Close(); err != nil {
log.Errorf(log.RequestSys, "%s failed to close request body %s", r.name, err)
}
}
// GetNonce returns a nonce for requests. This locks and enforces concurrent
// nonce FIFO on the buffered job channel
func (r *Requester) GetNonce(set nonce.Setter) nonce.Value {
r.timedLock.LockForDuration()
return r.Nonce.GetAndIncrement(set)
}
// SetProxy sets a proxy address for the client transport
func (r *Requester) SetProxy(p *url.URL) error {
if r == nil {
return ErrRequestSystemIsNil
}
return r._HTTPClient.setProxy(p)
}
// SetHTTPClient sets exchanges HTTP client
func (r *Requester) SetHTTPClient(newClient *http.Client) error {
if r == nil {
return ErrRequestSystemIsNil
}
protectedClient, err := newProtectedClient(newClient)
if err != nil {
return err
}
r._HTTPClient = protectedClient
return nil
}
// SetHTTPClientTimeout sets the timeout value for the exchanges HTTP Client and
// also the underlying transports idle connection timeout
func (r *Requester) SetHTTPClientTimeout(timeout time.Duration) error {
if r == nil {
return ErrRequestSystemIsNil
}
return r._HTTPClient.setHTTPClientTimeout(timeout)
}
// SetHTTPClientUserAgent sets the exchanges HTTP user agent
func (r *Requester) SetHTTPClientUserAgent(userAgent string) error {
if r == nil {
return ErrRequestSystemIsNil
}
r.userAgent = userAgent
return nil
}
// GetHTTPClientUserAgent gets the exchanges HTTP user agent
func (r *Requester) GetHTTPClientUserAgent() (string, error) {
if r == nil {
return "", ErrRequestSystemIsNil
}
return r.userAgent, nil
}
// Shutdown releases persistent memory for garbage collection.
func (r *Requester) Shutdown() error {
if r == nil {
return ErrRequestSystemIsNil
}
return r._HTTPClient.release()
}
// WithVerbose adds verbosity to a request context so that specific requests
// can have distinct verbosity without impacting all requests.
func WithVerbose(ctx context.Context) context.Context {
return context.WithValue(ctx, contextVerboseFlag, true)
}
// IsVerbose checks main verbosity first then checks context verbose values
// for specific request verbosity.
func IsVerbose(ctx context.Context, verbose bool) bool {
if !verbose {
verbose, _ = ctx.Value(contextVerboseFlag).(bool)
}
return verbose
}