From ea1ef4d0d0b091bb355869a5b10fc2275f04d1c5 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 3 Apr 2018 09:04:14 +1000 Subject: [PATCH] Update to test handling of async fetching through request handler package. (#114) * Migrated localbitcoins package to request handler. * Added interim update to routines.go for async fetching of ticker and orderbook. --- exchanges/localbitcoins/localbitcoins.go | 21 ++-- routines.go | 121 +++++++++++++++++------ 2 files changed, 103 insertions(+), 39 deletions(-) diff --git a/exchanges/localbitcoins/localbitcoins.go b/exchanges/localbitcoins/localbitcoins.go index ddf19a9c..0e2b7352 100644 --- a/exchanges/localbitcoins/localbitcoins.go +++ b/exchanges/localbitcoins/localbitcoins.go @@ -325,14 +325,14 @@ func (l *LocalBitcoins) GetTradeInfo(contactID string) error { // GetCountryCodes returns a list of valid and recognized countrycodes func (l *LocalBitcoins) GetCountryCodes() error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPICountryCodes, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPICountryCodes, nil) } // GetCurrencies returns a list of valid and recognized fiat currencies. Also // contains human readable name for every currency and boolean that tells if // currency is an altcoin. func (l *LocalBitcoins) GetCurrencies() error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPICurrencies, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPICurrencies, nil) } // GetDashboardInfo returns a list of trades on the data key contact_list. This @@ -435,13 +435,13 @@ func (l *LocalBitcoins) MarkNotifications(notificationID string) error { // and code for payment methods, and possible limitations in currencies and bank // name choices. func (l *LocalBitcoins) GetPaymentMethods() error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPIPaymentMethods, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPIPaymentMethods, nil) } // GetPaymentMethodsByCountry returns a list of valid payment methods filtered // by countrycodes. func (l *LocalBitcoins) GetPaymentMethodsByCountry(countryCode string) error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPIPaymentMethods+countryCode, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPIPaymentMethods+countryCode, nil) } // CheckPincode checks the given PIN code against the token owners currently @@ -475,7 +475,7 @@ func (l *LocalBitcoins) CheckPincode(pin int) (bool, error) { // GetPlaces Looks up places near lat, lon and provides full URLs to buy and // sell listings for each. func (l *LocalBitcoins) GetPlaces(lat, lon int, location, countryCode string) error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPIPlaces, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPIPlaces, nil) } // VerifyUsername returns list of real name verifiers for the user. Returns a @@ -590,20 +590,19 @@ func (l *LocalBitcoins) GetWalletAddress() (string, error) { // GetBitcoinsWithCashAd returns buy or sell as cash local advertisements. func (l *LocalBitcoins) GetBitcoinsWithCashAd(locationID, locationSlug string, BuySide bool) error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPICashBuy, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPICashBuy, nil) } // GetBitcoinsOnlineAd this API returns buy or sell Bitcoin online ads. func (l *LocalBitcoins) GetBitcoinsOnlineAd(countryCode, countryName, paymentMethod string, BuySide bool) error { - return common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPIOnlineBuy, true, l.Verbose, nil) + return l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPIOnlineBuy, nil) } // GetTicker returns list of all completed trades. func (l *LocalBitcoins) GetTicker() (map[string]Ticker, error) { result := make(map[string]Ticker) - return result, - common.SendHTTPGetRequest(localbitcoinsAPIURL+localbitcoinsAPITicker, true, l.Verbose, &result) + return result, l.SendHTTPRequest(localbitcoinsAPIURL+localbitcoinsAPITicker, &result) } // GetTrades returns all closed trades in online buy and online sell categories, @@ -612,7 +611,7 @@ func (l *LocalBitcoins) GetTrades(currency string, values url.Values) ([]Trade, path := common.EncodeURLValues(fmt.Sprintf("%s/%s/trades.json", localbitcoinsAPIURL+localbitcoinsAPIBitcoincharts, currency), values) result := []Trade{} - return result, common.SendHTTPGetRequest(path, true, l.Verbose, &result) + return result, l.SendHTTPRequest(path, &result) } // GetOrderbook returns buy and sell bitcoin online advertisements. Amount is @@ -627,7 +626,7 @@ func (l *LocalBitcoins) GetOrderbook(currency string) (Orderbook, error) { path := fmt.Sprintf("%s/%s/orderbook.json", localbitcoinsAPIURL+localbitcoinsAPIBitcoincharts, currency) resp := response{} - err := common.SendHTTPGetRequest(path, true, l.Verbose, &resp) + err := l.SendHTTPRequest(path, &resp) if err != nil { return Orderbook{}, err diff --git a/routines.go b/routines.go index 65ebe765..6b7a690c 100644 --- a/routines.go +++ b/routines.go @@ -3,6 +3,7 @@ package main import ( "fmt" "log" + "sync" "time" "github.com/thrasher-/gocryptotrader/currency" @@ -177,7 +178,10 @@ func relayWebsocketEvent(result interface{}, event, assetType, exchangeName stri // currency pairs and exchanges func TickerUpdaterRoutine() { log.Println("Starting ticker updater routine") + var waitExchanges sync.WaitGroup + for { + waitExchanges.Add(len(bot.exchanges)) for x := range bot.exchanges { if bot.exchanges[x] == nil { continue @@ -195,28 +199,46 @@ func TickerUpdaterRoutine() { exchangeName, err) } - for y := range enabledCurrencies { - currency := enabledCurrencies[y] + blocker := make(chan int, 1) - if len(assetTypes) > 1 { - for z := range assetTypes { + go func(c chan int, l int, wg *sync.WaitGroup) { + for i := 0; i < l; i++ { + <-c + } + log.Printf("Finished exchange %s ticker fetching for enabled currencies", exchangeName) + wg.Done() + }(blocker, len(enabledCurrencies), &waitExchanges) + + for y := range enabledCurrencies { + + go func(x, y int, c chan int) { + currency := enabledCurrencies[y] + if len(assetTypes) > 1 { + for z := range assetTypes { + result, err = bot.exchanges[x].UpdateTicker(currency, assetTypes[z]) + printSummary(result, currency, assetTypes[z], exchangeName, err) + if err == nil { + relayWebsocketEvent(result, "ticker_update", assetTypes[z], exchangeName) + } + } + } else { result, err = bot.exchanges[x].UpdateTicker(currency, - assetTypes[z]) - printSummary(result, currency, assetTypes[z], exchangeName, err) + assetTypes[0]) + printSummary(result, currency, assetTypes[0], exchangeName, err) if err == nil { - relayWebsocketEvent(result, "ticker_update", assetTypes[z], exchangeName) + relayWebsocketEvent(result, "ticker_update", assetTypes[0], exchangeName) } } - } else { - result, err = bot.exchanges[x].UpdateTicker(currency, - assetTypes[0]) - printSummary(result, currency, assetTypes[0], exchangeName, err) - if err == nil { - relayWebsocketEvent(result, "ticker_update", assetTypes[0], exchangeName) + select { + case c <- 1: + default: + log.Fatal("channel blocked in ticker monitoring routine") } - } + }(x, y, blocker) } } + waitExchanges.Wait() + log.Println("All enabled currency tickers fetched") time.Sleep(time.Second * 10) } } @@ -225,7 +247,10 @@ func TickerUpdaterRoutine() { // currency pairs and exchanges func OrderbookUpdaterRoutine() { log.Println("Starting orderbook updater routine") + var waitExchanges sync.WaitGroup + for { + waitExchanges.Add(len(bot.exchanges)) for x := range bot.exchanges { if bot.exchanges[x] == nil { continue @@ -242,28 +267,68 @@ func OrderbookUpdaterRoutine() { exchangeName, err) } - for y := range enabledCurrencies { - currency := enabledCurrencies[y] + blocker := make(chan int, 1) - if len(assetTypes) > 1 { - for z := range assetTypes { + go func(c chan int, l int, wg *sync.WaitGroup) { + for i := 0; i < l; i++ { + <-c + } + log.Printf("Finished exchange %s orderbook fetching for enabled currencies", exchangeName) + wg.Done() + }(blocker, len(enabledCurrencies), &waitExchanges) + + for y := range enabledCurrencies { + go func(y int, x int, assetTypes []string, c chan int) { + currency := enabledCurrencies[y] + var subWg sync.WaitGroup + + if len(assetTypes) > 1 { + subBlocker := make(chan int, 1) + + subWg.Add(len(assetTypes)) + + go func(c chan int, l int, wg *sync.WaitGroup) { + for i := 0; i < l; i++ { + <-c + } + wg.Done() + }(subBlocker, len(assetTypes), &subWg) + + for z := range assetTypes { + go func(z int, x int, c chan int) { + result, err = bot.exchanges[x].UpdateOrderbook(currency, + assetTypes[z]) + printOrderbookSummary(result, currency, assetTypes[z], exchangeName, err) + if err == nil { + relayWebsocketEvent(result, "orderbook_update", assetTypes[z], exchangeName) + } + select { + case subBlocker <- 1: + default: + log.Fatal("channel blocked in subroutine assetTypes monitoring") + } + }(z, x, subBlocker) + } + + } else { result, err = bot.exchanges[x].UpdateOrderbook(currency, - assetTypes[z]) - printOrderbookSummary(result, currency, assetTypes[z], exchangeName, err) + assetTypes[0]) + printOrderbookSummary(result, currency, assetTypes[0], exchangeName, err) if err == nil { - relayWebsocketEvent(result, "orderbook_update", assetTypes[z], exchangeName) + relayWebsocketEvent(result, "orderbook_update", assetTypes[0], exchangeName) } } - } else { - result, err = bot.exchanges[x].UpdateOrderbook(currency, - assetTypes[0]) - printOrderbookSummary(result, currency, assetTypes[0], exchangeName, err) - if err == nil { - relayWebsocketEvent(result, "orderbook_update", assetTypes[0], exchangeName) + select { + case c <- 1: + default: + log.Fatal("channel blocked in orderbook monitoring routine") } - } + subWg.Wait() + }(y, x, assetTypes, blocker) } } + waitExchanges.Wait() + log.Println("All enabled currency orderbooks fetched") time.Sleep(time.Second * 10) } }