From db7441c723a5cfefb396c0cc5e0e71594057b72d Mon Sep 17 00:00:00 2001 From: TaltaM <68383301+TaltaM@users.noreply.github.com> Date: Wed, 21 Jun 2023 04:33:40 +0200 Subject: [PATCH] dispatch: channel reuse fix (#1237) * Add test for dispatch channel reuse * Dispatcher - make chans bidirectional * No need to to keep the type assertion separate from the Get() * Unexport Pipe's channel and add getter --- dispatch/dispatch.go | 4 ++-- dispatch/dispatch_test.go | 20 +++++++++++++++----- dispatch/dispatch_types.go | 2 +- dispatch/mux.go | 11 ++++++++--- engine/rpcserver.go | 8 ++++---- exchanges/account/account_test.go | 2 +- 6 files changed, 31 insertions(+), 16 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index d0323a85..a8d3f051 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -218,7 +218,7 @@ func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error { // Subscribe subscribes a system and returns a communication chan, this does not // ensure initial push. -func (d *Dispatcher) subscribe(id uuid.UUID) (<-chan interface{}, error) { +func (d *Dispatcher) subscribe(id uuid.UUID) (chan interface{}, error) { if d == nil { return nil, errDispatcherNotInitialized } @@ -251,7 +251,7 @@ func (d *Dispatcher) subscribe(id uuid.UUID) (<-chan interface{}, error) { } // Unsubscribe unsubs a routine from the dispatcher -func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan <-chan interface{}) error { +func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan chan interface{}) error { if d == nil { return errDispatcherNotInitialized } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 4656c52a..566d9df5 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -183,7 +183,7 @@ func TestUnsubscribe(t *testing.T) { } // will return nil if not running - err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{})) + err = d.unsubscribe(nonEmptyUUID, make(chan interface{})) if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) } @@ -193,7 +193,7 @@ func TestUnsubscribe(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) } - err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{})) + err = d.unsubscribe(nonEmptyUUID, make(chan interface{})) if !errors.Is(err, errDispatcherUUIDNotFoundInRouteList) { t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherUUIDNotFoundInRouteList) } @@ -203,7 +203,7 @@ func TestUnsubscribe(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) } - err = d.unsubscribe(id, make(<-chan interface{})) + err = d.unsubscribe(id, make(chan interface{})) if !errors.Is(err, errChannelNotFoundInUUIDRef) { t.Fatalf("received: '%v' but expected: '%v'", err, errChannelNotFoundInUUIDRef) } @@ -223,6 +223,16 @@ func TestUnsubscribe(t *testing.T) { if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) } + + ch2, err := d.subscribe(id) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + err = d.unsubscribe(id, ch2) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } } func TestPublish(t *testing.T) { @@ -416,7 +426,7 @@ func TestMux(t *testing.T) { return } errChan <- nil - }(pipe.C, errChan, &wg) + }(pipe.c, errChan, &wg) wg.Wait() @@ -496,7 +506,7 @@ func TestMuxPublish(t *testing.T) { } }(mux) - <-pipe.C + <-pipe.Channel() // Shut down dispatch system err = d.stop() diff --git a/dispatch/dispatch_types.go b/dispatch/dispatch_types.go index 0a4b4471..32f64167 100644 --- a/dispatch/dispatch_types.go +++ b/dispatch/dispatch_types.go @@ -72,7 +72,7 @@ type Mux struct { // Pipe defines an outbound object to the desired routine type Pipe struct { // Channel to get all our lovely information - C <-chan interface{} + c chan interface{} // ID to tracked system id uuid.UUID // Reference to multiplexer diff --git a/dispatch/mux.go b/dispatch/mux.go index bcde3e85..516f20f0 100644 --- a/dispatch/mux.go +++ b/dispatch/mux.go @@ -38,11 +38,11 @@ func (m *Mux) Subscribe(id uuid.UUID) (Pipe, error) { return Pipe{}, err } - return Pipe{C: ch, id: id, m: m}, nil + return Pipe{c: ch, id: id, m: m}, nil } // Unsubscribe returns channel to the pool for the full signature set -func (m *Mux) Unsubscribe(id uuid.UUID, ch <-chan interface{}) error { +func (m *Mux) Unsubscribe(id uuid.UUID, ch chan interface{}) error { if m == nil { return errMuxIsNil } @@ -83,5 +83,10 @@ func (m *Mux) GetID() (uuid.UUID, error) { // Release returns the channel to the communications pool to be reused func (p *Pipe) Release() error { - return p.m.Unsubscribe(p.id, p.C) + return p.m.Unsubscribe(p.id, p.c) +} + +// Channel returns the Pipe's channel +func (p *Pipe) Channel() <-chan interface{} { + return p.c } diff --git a/engine/rpcserver.go b/engine/rpcserver.go index 0b51de19..7dc41a3f 100644 --- a/engine/rpcserver.go +++ b/engine/rpcserver.go @@ -714,7 +714,7 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream }() for { - data, ok := <-pipe.C + data, ok := <-pipe.Channel() if !ok { return errDispatchSystem } @@ -2188,7 +2188,7 @@ func (s *RPCServer) GetExchangeOrderbookStream(r *gctrpc.GetExchangeOrderbookStr }() for { - data, ok := <-pipe.C + data, ok := <-pipe.Channel() if !ok { return errDispatchSystem } @@ -2273,7 +2273,7 @@ func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gct }() for { - data, ok := <-pipe.C + data, ok := <-pipe.Channel() if !ok { return errDispatchSystem } @@ -2326,7 +2326,7 @@ func (s *RPCServer) GetExchangeTickerStream(r *gctrpc.GetExchangeTickerStreamReq }() for { - data, ok := <-pipe.C + data, ok := <-pipe.Channel() if !ok { return errDispatchSystem } diff --git a/exchanges/account/account_test.go b/exchanges/account/account_test.go index 90fe5980..b82c6146 100644 --- a/exchanges/account/account_test.go +++ b/exchanges/account/account_test.go @@ -213,7 +213,7 @@ func TestGetHoldings(t *testing.T) { for i := 0; i < 2; i++ { c := time.NewTimer(time.Second) select { - case <-p.C: + case <-p.Channel(): case <-c.C: } }