alert: Add optimizations (#939)

* alert: Add optimizations

* alert: add basic benchmarks

* alert: fix linter issue

* documentation: change to text/template as html/template escapes to protect against code injection. Add readme.md for alert.

* README: Add package name

* alert: link up with engine settings

* request: isVerbose refactor

* Update exchanges/alert/alert_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/alert/alert.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* glorious: fun police

* documentation: regen

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
Ryan O'Hara-Reid
2022-07-01 15:35:28 +10:00
committed by GitHub
parent 1736c1ed8d
commit 68db4155bf
11 changed files with 413 additions and 61 deletions

View File

@@ -144,10 +144,10 @@ Binaries will be published once the codebase reaches a stable condition.
|User|Contribution Amount|
|--|--|
| [thrasher-](https://github.com/thrasher-) | 666 |
| [shazbert](https://github.com/shazbert) | 249 |
| [gloriousCode](https://github.com/gloriousCode) | 195 |
| [shazbert](https://github.com/shazbert) | 256 |
| [gloriousCode](https://github.com/gloriousCode) | 196 |
| [dependabot-preview[bot]](https://github.com/apps/dependabot-preview) | 88 |
| [dependabot[bot]](https://github.com/apps/dependabot) | 73 |
| [dependabot[bot]](https://github.com/apps/dependabot) | 88 |
| [xtda](https://github.com/xtda) | 47 |
| [lrascao](https://github.com/lrascao) | 27 |
| [Rots](https://github.com/Rots) | 15 |

View File

@@ -6,12 +6,12 @@ import (
"errors"
"flag"
"fmt"
"html/template"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
@@ -430,14 +430,16 @@ func GetTemplateFiles() (*template.Template, error) {
return nil
}
var parseError error
tmpl, parseError = tmpl.ParseGlob(filepath.Join(path, "*.tmpl"))
if parseError != nil {
if strings.Contains(parseError.Error(), "pattern matches no files") {
var tmplExt *template.Template
tmplExt, err = tmpl.ParseGlob(filepath.Join(path, "*.tmpl"))
if err != nil {
fmt.Println(err)
if strings.Contains(err.Error(), "pattern matches no files") {
return nil
}
return parseError
return err
}
tmpl = tmplExt
return filepath.SkipDir
}
return nil

View File

@@ -0,0 +1,92 @@
{{define "exchanges alert" -}}
{{template "header" .}}
## Alert
+ This package allows for multiple routines to wait for a state change on any required data.
### Examples:
+ Implementation:
```go
// SomeChangingType defines an example struct with an embedded alert.Notice
// type for easy access to the notice methods.
type SomeChangingType struct {
ValueThatChanges int64
alert.Notice
mu sync.Mutex // Protection for routine shenanigans
}
// Update will update in a separate routine
func (s *SomeChangingType) Update(newValue int64) {
// This simulates a changing variable or state
s.mu.Lock()
s.ValueThatChanges = newValue
// This will alert any routines that are currently waiting for a change
s.Alert()
s.mu.Unlock()
}
// WhatsTheValue will retrieve the value that was changed and should be
// different from the past value. Efficiency++
func (s *SomeChangingType) WhatsTheValue() int64 {
s.mu.Lock()
value := s.ValueThatChanges
s.mu.Unlock()
return value
}
```
+ Routine waiting for change:
```go
// ExampleRoutineThatWaits defines an exchange potential routine that will wait
// for an impending change.
func ExampleRoutineThatWaits(potentialChange *SomeChangingType) {
// Every iteration requires a Wait() call.
for range potentialChange.Wait(nil) {
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
}
}
// AnotherExampleRoutineThatWaits defines an exchange potential routine that
// will wait for an impending change.
func AnotherExampleRoutineThatWaits(potentialChange *SomeChangingType) {
// Every iteration requires a Wait() call.
for {
select {
case <-potentialChange.Wait(nil):
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
case <-shutdownChannel:
fmt.Println("Good-Bye!")
return
}
}
}
// WARNING: PLEASE DON'T DO THIS.
// This will stop alerting for this specific data type due to the shared nature
// of the underlying channels using a sync.Pool.
func ABadExampleRoutineThatWaits(potentialChange *SomeChangingType) {
capturedChannel := potentialChange.Wait(nil)
for {
select {
case <-capturedChannel:
// This will produce incorrect results or no change.
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
case <-shutdownChannel:
fmt.Println("Good-Bye!")
return
}
}
}
```
### Please click GoDocs chevron above to view current GoDoc information for this package
{{template "contributions"}}
{{template "donations" .}}
{{end}}

View File

@@ -36,7 +36,7 @@ func TestCalculatePercentageGainOrLoss(t *testing.T) {
actualResult := CalculatePercentageGainOrLoss(originalInput, secondInput)
if expectedOutput != actualResult {
t.Errorf(
"Expected '%f'. Actual '%f'.", expectedOutput, actualResult)
"Expected '%v'. Actual '%v'.", expectedOutput, actualResult)
}
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/dispatch"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/alert"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
@@ -250,6 +251,15 @@ func validateSettings(b *Engine, s *Settings, flagSet FlagSet) {
err)
}
}
if b.Settings.AlertSystemPreAllocationCommsBuffer != alert.PreAllocCommsDefaultBuffer {
err = alert.SetPreAllocationCommsBuffer(b.Settings.AlertSystemPreAllocationCommsBuffer)
if err != nil {
gctlog.Errorf(gctlog.Global, "Could not set alert pre-allocation comms buffer to %v: %v",
b.Settings.AlertSystemPreAllocationCommsBuffer,
err)
}
}
}
// PrintSettings returns the engine settings
@@ -309,6 +319,7 @@ func PrintSettings(s *Settings) {
gctlog.Debugf(gctlog.Global, "\t Max HTTP request jobs: %v", s.MaxHTTPRequestJobsLimit)
gctlog.Debugf(gctlog.Global, "\t HTTP request max retry attempts: %v", s.RequestMaxRetryAttempts)
gctlog.Debugf(gctlog.Global, "\t Trade buffer processing interval: %v", s.TradeBufferProcessingInterval)
gctlog.Debugf(gctlog.Global, "\t Alert communications channel pre-allocation buffer size: %v", s.AlertSystemPreAllocationCommsBuffer)
gctlog.Debugf(gctlog.Global, "\t HTTP timeout: %v", s.HTTPTimeout)
gctlog.Debugf(gctlog.Global, "\t HTTP user agent: %v", s.HTTPUserAgent)
gctlog.Debugf(gctlog.Global, "- GCTSCRIPT SETTINGS: ")

View File

@@ -60,17 +60,18 @@ type Settings struct {
EnableExchangeRateHost bool
// Exchange tuning settings
EnableExchangeHTTPRateLimiter bool
EnableExchangeHTTPDebugging bool
EnableExchangeVerbose bool
ExchangePurgeCredentials bool
EnableExchangeAutoPairUpdates bool
DisableExchangeAutoPairUpdates bool
EnableExchangeRESTSupport bool
EnableExchangeWebsocketSupport bool
MaxHTTPRequestJobsLimit int
TradeBufferProcessingInterval time.Duration
RequestMaxRetryAttempts int
EnableExchangeHTTPRateLimiter bool
EnableExchangeHTTPDebugging bool
EnableExchangeVerbose bool
ExchangePurgeCredentials bool
EnableExchangeAutoPairUpdates bool
DisableExchangeAutoPairUpdates bool
EnableExchangeRESTSupport bool
EnableExchangeWebsocketSupport bool
MaxHTTPRequestJobsLimit int
TradeBufferProcessingInterval time.Duration
RequestMaxRetryAttempts int
AlertSystemPreAllocationCommsBuffer int // See exchanges/alert.go
// Global HTTP related settings
GlobalHTTPTimeout time.Duration

126
exchanges/alert/README.md Normal file
View File

@@ -0,0 +1,126 @@
# GoCryptoTrader package Alert
<img src="/common/gctlogo.png?raw=true" width="350px" height="350px" hspace="70">
[![Build Status](https://github.com/thrasher-corp/gocryptotrader/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/thrasher-corp/gocryptotrader/actions/workflows/tests.yml)
[![Software License](https://img.shields.io/badge/License-MIT-orange.svg?style=flat-square)](https://github.com/thrasher-corp/gocryptotrader/blob/master/LICENSE)
[![GoDoc](https://godoc.org/github.com/thrasher-corp/gocryptotrader?status.svg)](https://godoc.org/github.com/thrasher-corp/gocryptotrader/exchanges/alert)
[![Coverage Status](http://codecov.io/github/thrasher-corp/gocryptotrader/coverage.svg?branch=master)](http://codecov.io/github/thrasher-corp/gocryptotrader?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/thrasher-corp/gocryptotrader)](https://goreportcard.com/report/github.com/thrasher-corp/gocryptotrader)
This alert package is part of the GoCryptoTrader codebase.
## This is still in active development
You can track ideas, planned features and what's in progress on this Trello board: [https://trello.com/b/ZAhMhpOy/gocryptotrader](https://trello.com/b/ZAhMhpOy/gocryptotrader).
Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader Slack](https://join.slack.com/t/gocryptotrader/shared_invite/enQtNTQ5NDAxMjA2Mjc5LTc5ZDE1ZTNiOGM3ZGMyMmY1NTAxYWZhODE0MWM5N2JlZDk1NDU0YTViYzk4NTk3OTRiMDQzNGQ1YTc4YmRlMTk)
## Alert
+ This package allows for multiple routines to wait for a state change on any required data.
### Examples:
+ Implementation:
```go
// SomeChangingType defines an example struct with an embedded alert.Notice
// type for easy access to the notice methods.
type SomeChangingType struct {
ValueThatChanges int64
alert.Notice
mu sync.Mutex // Protection for routine shenanigans
}
// Update will update in a separate routine
func (s *SomeChangingType) Update(newValue int64) {
// This simulates a changing variable or state
s.mu.Lock()
s.ValueThatChanges = newValue
// This will alert any routines that are currently waiting for a change
s.Alert()
s.mu.Unlock()
}
// WhatsTheValue will retrieve the value that was changed and should be
// different from the past value. Efficiency++
func (s *SomeChangingType) WhatsTheValue() int64 {
s.mu.Lock()
value := s.ValueThatChanges
s.mu.Unlock()
return value
}
```
+ Routine waiting for change:
```go
// ExampleRoutineThatWaits defines an exchange potential routine that will wait
// for an impending change.
func ExampleRoutineThatWaits(potentialChange *SomeChangingType) {
// Every iteration requires a Wait() call.
for range potentialChange.Wait(nil) {
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
}
}
// AnotherExampleRoutineThatWaits defines an exchange potential routine that
// will wait for an impending change.
func AnotherExampleRoutineThatWaits(potentialChange *SomeChangingType) {
// Every iteration requires a Wait() call.
for {
select {
case <-potentialChange.Wait(nil):
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
case <-shutdownChannel:
fmt.Println("Good-Bye!")
return
}
}
}
// WARNING: PLEASE DON'T DO THIS.
// This will stop alerting for this specific data type due to the shared nature
// of the underlying channels using a sync.Pool.
func ABadExampleRoutineThatWaits(potentialChange *SomeChangingType) {
capturedChannel := potentialChange.Wait(nil)
for {
select {
case <-capturedChannel:
// This will produce incorrect results or no change.
val := potentialChange.WhatsTheValue()
fmt.Println("Value:", val)
case <-shutdownChannel:
fmt.Println("Good-Bye!")
return
}
}
}
```
### Please click GoDocs chevron above to view current GoDoc information for this package
## Contribution
Please feel free to submit any pull requests or suggest any desired features to be added.
When submitting a PR, please abide by our coding guidelines:
+ Code must adhere to the official Go [formatting](https://golang.org/doc/effective_go.html#formatting) guidelines (i.e. uses [gofmt](https://golang.org/cmd/gofmt/)).
+ Code must be documented adhering to the official Go [commentary](https://golang.org/doc/effective_go.html#commentary) guidelines.
+ Code must adhere to our [coding style](https://github.com/thrasher-corp/gocryptotrader/blob/master/doc/coding_style.md).
+ Pull requests need to be based on and opened against the `master` branch.
## Donations
<img src="https://github.com/thrasher-corp/gocryptotrader/blob/master/web/src/assets/donate.png?raw=true" hspace="70">
If this framework helped you in any way, or you would like to support the developers working on it, please donate Bitcoin to:
***bc1qk0jareu4jytc0cfrhr5wgshsq8282awpavfahc***

View File

@@ -1,10 +1,51 @@
package alert
import (
"errors"
"fmt"
"sync"
"sync/atomic"
)
const (
inactive = uint32(iota)
active
alerting
dataToActuatorDefaultBuffer = 1
PreAllocCommsDefaultBuffer = 5
)
var (
// pool is a silent shared pool between all notice instances for alerting
// external routines waiting on a state change.
pool = sync.Pool{New: func() interface{} { return make(chan bool) }}
preAllocBufferSize = PreAllocCommsDefaultBuffer
mu sync.RWMutex
errInvalidBufferSize = errors.New("invalid buffer size cannot be equal or less than zero")
)
// SetPreAllocationCommsBuffer sets buffer size of the pre-allocated comms.
func SetPreAllocationCommsBuffer(size int) error {
if size <= 0 {
return fmt.Errorf("%w received %v", errInvalidBufferSize, size)
}
mu.Lock()
preAllocBufferSize = size
mu.Unlock()
return nil
}
// SetDefaultPreAllocationCommsBuffer sets default buffer size of the
// pre-allocated comms.
func SetDefaultPreAllocationCommsBuffer() {
mu.Lock()
preAllocBufferSize = PreAllocCommsDefaultBuffer
mu.Unlock()
}
// Notice defines fields required to alert sub-systems of a change of state so a
// routine can re-check in memory data
type Notice struct {
@@ -18,7 +59,12 @@ type Notice struct {
wg sync.WaitGroup
// Segregated lock only for waiting routines, so as this does not interfere
// with the main calling lock, this acts as a rolling gate.
m sync.Mutex
mu sync.Mutex
// toActuatorRoutine is communication between the alert call and the
// actuator routine
toActuatorRoutine chan struct{}
// alerters are a pre allocated channel of communications pipes
alerters chan chan struct{}
}
// Alert establishes a state change on the required struct.
@@ -26,58 +72,85 @@ func (n *Notice) Alert() {
// CompareAndSwap is used to swap from 1 -> 2 so we don't keep actuating
// the opposing compare and swap in method wait. This function can return
// freely when an alert operation is in process.
if !atomic.CompareAndSwapUint32(&n.sema, 1, 2) {
if !atomic.CompareAndSwapUint32(&n.sema, active, alerting) {
// Return if no waiting routines or currently alerting.
return
}
go n.actuate()
if n.toActuatorRoutine == nil {
// Buffered communications channel in communication with actuate routine,
// so as to not worry about slow receivers that will inhibit alert
// returning.
n.toActuatorRoutine = make(chan struct{}, dataToActuatorDefaultBuffer)
// Spawn persistent routine that blocks only when required instead of
// spawning a routine for every alert.
go n.actuate()
}
// Buffered channel will alert actuate routine without waiting and return.
n.toActuatorRoutine <- struct{}{}
}
// Actuate lock in a different routine, as alerting is a second order priority
// compared to updating and releasing calling routine.
func (n *Notice) actuate() {
n.m.Lock()
// Closing; alerts many waiting routines.
close(n.forAlert)
// Wait for waiting routines to receive alert and return.
n.wg.Wait()
atomic.SwapUint32(&n.sema, 0) // Swap back to neutral state.
n.m.Unlock()
for range n.toActuatorRoutine {
n.mu.Lock()
// Closing; alerts many waiting routines.
close(n.forAlert)
// Wait for waiting routines to receive alert and return.
n.wg.Wait()
atomic.SwapUint32(&n.sema, inactive) // Swap back to neutral state.
n.mu.Unlock()
}
}
// generator routine pre-loads chan struct communicators that will be closed.
func (n *Notice) generator() {
for {
// This will block once filled appropriately.
n.alerters <- make(chan struct{})
}
}
// Wait pauses calling routine until change of state has been established via
// notice method Alert. Kick allows for cancellation of waiting or when the
// caller has been shut down, if this is not needed it can be set to nil. This
// returns a channel so strategies can cleanly wait on a select statement case.
func (n *Notice) Wait(kick <-chan struct{}) <-chan bool {
reply := make(chan bool)
n.m.Lock()
n.wg.Add(1)
if atomic.CompareAndSwapUint32(&n.sema, 0, 1) {
n.forAlert = make(chan struct{})
// NOTE: Please see README.md for implementation example.
func (n *Notice) Wait(kick <-chan struct{}) chan bool {
reply, ok := pool.Get().(chan bool)
if !ok {
reply = make(chan bool)
}
n.mu.Lock()
if atomic.CompareAndSwapUint32(&n.sema, inactive, active) {
if n.alerters == nil {
mu.RLock()
n.alerters = make(chan chan struct{}, preAllocBufferSize)
mu.RUnlock()
go n.generator()
}
n.forAlert = <-n.alerters
}
n.wg.Add(1)
go n.hold(reply, kick)
n.m.Unlock()
n.mu.Unlock()
return reply
}
// hold waits on either channel in the event that the routine has
// finished/cancelled or an alert from an update has occurred.
func (n *Notice) hold(ch chan<- bool, kick <-chan struct{}) {
// finished/cancelled or an alert from an update has occurred. This routine
// has the potential to leak if receivers never read but this ensures sanity
// instead of closing and differentiation between alerting and kicking, also
// ensures chan bool item is clean before being put back into pool.
func (n *Notice) hold(ch chan bool, kick <-chan struct{}) {
select {
// In a select statement, if by chance there is no receiver or its late,
// we can still close and return, limiting dead-lock potential.
case <-n.forAlert: // Main waiting channel from alert
select {
case ch <- false:
default:
}
n.wg.Done()
ch <- false
case <-kick: // This can be nil.
select {
case ch <- true:
default:
}
n.wg.Done()
ch <- true
}
n.wg.Done()
close(ch)
pool.Put(ch)
}

View File

@@ -1,6 +1,7 @@
package alert
import (
"errors"
"log"
"sync"
"testing"
@@ -90,3 +91,55 @@ func isLeaky(t *testing.T, a *Notice, ch chan struct{}) {
default:
}
}
// 120801772 9.334 ns/op 0 B/op 0 allocs/op // PREV
// 146173060 9.154 ns/op 0 B/op 0 allocs/op // CURRENT
func BenchmarkAlert(b *testing.B) {
n := Notice{}
for x := 0; x < b.N; x++ {
n.Alert()
}
}
// 150352 9916 ns/op 681 B/op 4 allocs/op // PREV
// 87436 14724 ns/op 682 B/op 4 allocs/op // CURRENT
func BenchmarkWait(b *testing.B) {
n := Notice{}
for x := 0; x < b.N; x++ {
n.Wait(nil)
}
}
// getSize checks the buffer size for testing purposes
func getSize() int {
mu.RLock()
defer mu.RUnlock()
return preAllocBufferSize
}
func TestSetPreAllocationCommsBuffer(t *testing.T) {
t.Parallel()
err := SetPreAllocationCommsBuffer(-1)
if !errors.Is(err, errInvalidBufferSize) {
t.Fatalf("received: '%v' but expected '%v'", err, errInvalidBufferSize)
}
if getSize() != 5 {
t.Fatal("unexpected amount")
}
err = SetPreAllocationCommsBuffer(7)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected '%v'", err, nil)
}
if getSize() != 7 {
t.Fatal("unexpected amount")
}
SetDefaultPreAllocationCommsBuffer()
if getSize() != PreAllocCommsDefaultBuffer {
t.Fatal("unexpected amount")
}
}

View File

@@ -385,14 +385,6 @@ func isVerbose(ctx context.Context, verbose bool) bool {
return true
}
val := ctx.Value(contextVerboseFlag)
if val == nil {
return false
}
isCtxVerbose, ok := val.(bool)
if !ok {
return false
}
isCtxVerbose, _ := ctx.Value(contextVerboseFlag).(bool)
return isCtxVerbose
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/core"
"github.com/thrasher-corp/gocryptotrader/dispatch"
"github.com/thrasher-corp/gocryptotrader/engine"
"github.com/thrasher-corp/gocryptotrader/exchanges/alert"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/gctscript"
@@ -95,6 +96,7 @@ func main() {
flag.StringVar(&settings.HTTPProxy, "httpproxy", "", "sets the HTTP proxy server")
flag.BoolVar(&settings.EnableExchangeHTTPDebugging, "exchangehttpdebugging", false, "sets the exchanges HTTP debugging")
flag.DurationVar(&settings.TradeBufferProcessingInterval, "tradeprocessinginterval", trade.DefaultProcessorIntervalTime, "sets the interval to save trade buffer data to the database")
flag.IntVar(&settings.AlertSystemPreAllocationCommsBuffer, "alertbuffer", alert.PreAllocCommsDefaultBuffer, "sets the size of the pre-allocation communications buffer")
// Common tuning settings
flag.DurationVar(&settings.GlobalHTTPTimeout, "globalhttptimeout", time.Duration(0), "sets common HTTP timeout value for HTTP requests")