diff --git a/bots/controllers/generic/arbitrage_controller.py b/bots/controllers/generic/arbitrage_controller.py new file mode 100644 index 0000000..825a866 --- /dev/null +++ b/bots/controllers/generic/arbitrage_controller.py @@ -0,0 +1,148 @@ +from decimal import Decimal +from typing import List + +import pandas as pd + +from hummingbot.client.ui.interface_utils import format_df_for_printout +from hummingbot.core.data_type.common import MarketDict +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers.controller_base import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.arbitrage_executor.data_types import ArbitrageExecutorConfig +from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.models.base import RunnableStatus +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction + + +class ArbitrageControllerConfig(ControllerConfigBase): + controller_name: str = "arbitrage_controller" + candles_config: List[CandlesConfig] = [] + exchange_pair_1: ConnectorPair = ConnectorPair(connector_name="binance", trading_pair="PENGU-USDT") + exchange_pair_2: ConnectorPair = ConnectorPair(connector_name="solana_jupiter_mainnet-beta", trading_pair="PENGU-USDC") + min_profitability: Decimal = Decimal("0.01") + delay_between_executors: int = 10 # in seconds + max_executors_imbalance: int = 1 + rate_connector: str = "binance" + quote_conversion_asset: str = "USDT" + + def update_markets(self, markets: MarketDict) -> MarketDict: + return [markets.add_or_update(cp.connector_name, cp.trading_pair) for cp in [self.exchange_pair_1, self.exchange_pair_2]][-1] + + +class ArbitrageController(ControllerBase): + gas_token_by_network = { + "ethereum": "ETH", + "solana": "SOL", + "binance-smart-chain": "BNB", + "polygon": "POL", + "avalanche": "AVAX", + "dexalot": "AVAX" + } + + def __init__(self, config: ArbitrageControllerConfig, *args, **kwargs): + self.config = config + super().__init__(config, *args, **kwargs) + self._imbalance = 0 + self._last_buy_closed_timestamp = 0 + self._last_sell_closed_timestamp = 0 + self._len_active_buy_arbitrages = 0 + self._len_active_sell_arbitrages = 0 + self.base_asset = self.config.exchange_pair_1.trading_pair.split("-")[0] + self.initialize_rate_sources() + + def initialize_rate_sources(self): + rates_required = [] + for connector_pair in [self.config.exchange_pair_1, self.config.exchange_pair_2]: + base, quote = connector_pair.trading_pair.split("-") + # Add rate source for gas token + if connector_pair.is_amm_connector(): + gas_token = self.get_gas_token(connector_pair.connector_name) + if gas_token != quote: + rates_required.append(ConnectorPair(connector_name=self.config.rate_connector, + trading_pair=f"{gas_token}-{quote}")) + + # Add rate source for quote conversion asset + if quote != self.config.quote_conversion_asset: + rates_required.append(ConnectorPair(connector_name=self.config.rate_connector, + trading_pair=f"{quote}-{self.config.quote_conversion_asset}")) + + # Add rate source for trading pairs + rates_required.append(ConnectorPair(connector_name=connector_pair.connector_name, + trading_pair=connector_pair.trading_pair)) + if len(rates_required) > 0: + self.market_data_provider.initialize_rate_sources(rates_required) + + def get_gas_token(self, connector_name: str) -> str: + _, chain, _ = connector_name.split("_") + return self.gas_token_by_network[chain] + + async def update_processed_data(self): + pass + + def determine_executor_actions(self) -> List[ExecutorAction]: + self.update_arbitrage_stats() + executor_actions = [] + current_time = self.market_data_provider.time() + if (abs(self._imbalance) >= self.config.max_executors_imbalance or + self._last_buy_closed_timestamp + self.config.delay_between_executors > current_time or + self._last_sell_closed_timestamp + self.config.delay_between_executors > current_time): + return executor_actions + if self._len_active_buy_arbitrages == 0: + executor_actions.append(self.create_arbitrage_executor_action(self.config.exchange_pair_1, + self.config.exchange_pair_2)) + if self._len_active_sell_arbitrages == 0: + executor_actions.append(self.create_arbitrage_executor_action(self.config.exchange_pair_2, + self.config.exchange_pair_1)) + return executor_actions + + def create_arbitrage_executor_action(self, buying_exchange_pair: ConnectorPair, + selling_exchange_pair: ConnectorPair): + try: + if buying_exchange_pair.is_amm_connector(): + gas_token = self.get_gas_token(buying_exchange_pair.connector_name) + pair = buying_exchange_pair.trading_pair.split("-")[0] + "-" + gas_token + gas_conversion_price = self.market_data_provider.get_rate(pair) + elif selling_exchange_pair.is_amm_connector(): + gas_token = self.get_gas_token(selling_exchange_pair.connector_name) + pair = selling_exchange_pair.trading_pair.split("-")[0] + "-" + gas_token + gas_conversion_price = self.market_data_provider.get_rate(pair) + else: + gas_conversion_price = None + rate = self.market_data_provider.get_rate(self.base_asset + "-" + self.config.quote_conversion_asset) + amount_quantized = self.market_data_provider.quantize_order_amount( + buying_exchange_pair.connector_name, buying_exchange_pair.trading_pair, + self.config.total_amount_quote / rate) + arbitrage_config = ArbitrageExecutorConfig( + timestamp=self.market_data_provider.time(), + buying_market=buying_exchange_pair, + selling_market=selling_exchange_pair, + order_amount=amount_quantized, + min_profitability=self.config.min_profitability, + gas_conversion_price=gas_conversion_price, + ) + return CreateExecutorAction( + executor_config=arbitrage_config, + controller_id=self.config.id) + except Exception as e: + self.logger().error( + f"Error creating executor to buy on {buying_exchange_pair.connector_name} and sell on {selling_exchange_pair.connector_name}, {e}") + + def update_arbitrage_stats(self): + closed_executors = [e for e in self.executors_info if e.status == RunnableStatus.TERMINATED] + active_executors = [e for e in self.executors_info if e.status != RunnableStatus.TERMINATED] + buy_arbitrages = [arbitrage for arbitrage in closed_executors if + arbitrage.config.buying_market == self.config.exchange_pair_1] + sell_arbitrages = [arbitrage for arbitrage in closed_executors if + arbitrage.config.buying_market == self.config.exchange_pair_2] + self._imbalance = len(buy_arbitrages) - len(sell_arbitrages) + self._last_buy_closed_timestamp = max([arbitrage.close_timestamp for arbitrage in buy_arbitrages]) if len( + buy_arbitrages) > 0 else 0 + self._last_sell_closed_timestamp = max([arbitrage.close_timestamp for arbitrage in sell_arbitrages]) if len( + sell_arbitrages) > 0 else 0 + self._len_active_buy_arbitrages = len([arbitrage for arbitrage in active_executors if + arbitrage.config.buying_market == self.config.exchange_pair_1]) + self._len_active_sell_arbitrages = len([arbitrage for arbitrage in active_executors if + arbitrage.config.buying_market == self.config.exchange_pair_2]) + + def to_format_status(self) -> List[str]: + all_executors_custom_info = pd.DataFrame(e.custom_info for e in self.executors_info) + return [format_df_for_printout(all_executors_custom_info, table_format="psql", )] diff --git a/bots/controllers/generic/grid_strike.py b/bots/controllers/generic/grid_strike.py index 20daf9e..825082c 100644 --- a/bots/controllers/generic/grid_strike.py +++ b/bots/controllers/generic/grid_strike.py @@ -1,9 +1,9 @@ from decimal import Decimal -from typing import Dict, List, Optional, Set +from typing import List, Optional from pydantic import Field -from hummingbot.core.data_type.common import OrderType, PositionMode, PriceType, TradeType +from hummingbot.core.data_type.common import MarketDict, OrderType, PositionMode, PriceType, TradeType from hummingbot.data_feed.candles_feed.data_types import CandlesConfig from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase from hummingbot.strategy_v2.executors.data_types import ConnectorPair @@ -52,11 +52,8 @@ class GridStrikeConfig(ControllerConfigBase): take_profit_order_type=OrderType.LIMIT_MAKER, ) - def update_markets(self, markets: Dict[str, Set[str]]) -> Dict[str, Set[str]]: - if self.connector_name not in markets: - markets[self.connector_name] = set() - markets[self.connector_name].add(self.trading_pair) - return markets + def update_markets(self, markets: MarketDict) -> MarketDict: + return markets.add_or_update(self.connector_name, self.trading_pair) class GridStrike(ControllerBase): diff --git a/bots/controllers/generic/pmm.py b/bots/controllers/generic/pmm.py new file mode 100644 index 0000000..97e5513 --- /dev/null +++ b/bots/controllers/generic/pmm.py @@ -0,0 +1,647 @@ +from decimal import Decimal +from typing import List, Optional, Tuple, Union + +from pydantic import Field, field_validator +from pydantic_core.core_schema import ValidationInfo + +from hummingbot.core.data_type.common import MarketDict, OrderType, PositionMode, PriceType, TradeType +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers.controller_base import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig +from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig, TripleBarrierConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.models.executors import CloseType + + +class PMMConfig(ControllerConfigBase): + """ + This class represents the base configuration for a market making controller. + """ + controller_type: str = "generic" + controller_name: str = "pmm" + candles_config: List[CandlesConfig] = [] + connector_name: str = Field( + default="binance", + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the name of the connector to use (e.g., binance):", + } + ) + trading_pair: str = Field( + default="BTC-FDUSD", + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the trading pair to trade on (e.g., BTC-FDUSD):", + } + ) + portfolio_allocation: Decimal = Field( + default=Decimal("0.05"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the maximum quote exposure percentage around mid price (e.g., 0.05 for 5% of total quote allocation):", + } + ) + target_base_pct: Decimal = Field( + default=Decimal("0.2"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the target base percentage (e.g., 0.2 for 20%):", + } + ) + min_base_pct: Decimal = Field( + default=Decimal("0.1"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the minimum base percentage (e.g., 0.1 for 10%):", + } + ) + max_base_pct: Decimal = Field( + default=Decimal("0.4"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the maximum base percentage (e.g., 0.4 for 40%):", + } + ) + buy_spreads: List[float] = Field( + default="0.01,0.02", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of buy spreads (e.g., '0.01, 0.02'):", + } + ) + sell_spreads: List[float] = Field( + default="0.01,0.02", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of sell spreads (e.g., '0.01, 0.02'):", + } + ) + buy_amounts_pct: Union[List[Decimal], None] = Field( + default=None, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of buy amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", + } + ) + sell_amounts_pct: Union[List[Decimal], None] = Field( + default=None, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of sell amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", + } + ) + executor_refresh_time: int = Field( + default=60 * 5, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the refresh time in seconds for executors (e.g., 300 for 5 minutes):", + } + ) + cooldown_time: int = Field( + default=15, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the cooldown time in seconds between after replacing an executor that traded (e.g., 15):", + } + ) + leverage: int = Field( + default=20, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the leverage to use for trading (e.g., 20 for 20x leverage). Set it to 1 for spot trading:", + } + ) + position_mode: PositionMode = Field(default="HEDGE") + take_profit: Optional[Decimal] = Field( + default=Decimal("0.02"), gt=0, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the take profit as a decimal (e.g., 0.02 for 2%):", + } + ) + take_profit_order_type: Optional[OrderType] = Field( + default="LIMIT_MAKER", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the order type for take profit (e.g., LIMIT_MAKER):", + } + ) + max_skew: Decimal = Field( + default=Decimal("1.0"), + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the maximum skew factor (e.g., 1.0):", + } + ) + global_take_profit: Decimal = Decimal("0.02") + global_stop_loss: Decimal = Decimal("0.05") + + @field_validator("take_profit", mode="before") + @classmethod + def validate_target(cls, v): + if isinstance(v, str): + if v == "": + return None + return Decimal(v) + return v + + @field_validator('take_profit_order_type', mode="before") + @classmethod + def validate_order_type(cls, v) -> OrderType: + if isinstance(v, OrderType): + return v + elif v is None: + return OrderType.MARKET + elif isinstance(v, str): + if v.upper() in OrderType.__members__: + return OrderType[v.upper()] + elif isinstance(v, int): + try: + return OrderType(v) + except ValueError: + pass + raise ValueError(f"Invalid order type: {v}. Valid options are: {', '.join(OrderType.__members__)}") + + @field_validator('buy_spreads', 'sell_spreads', mode="before") + @classmethod + def parse_spreads(cls, v): + if v is None: + return [] + if isinstance(v, str): + if v == "": + return [] + return [float(x.strip()) for x in v.split(',')] + return v + + @field_validator('buy_amounts_pct', 'sell_amounts_pct', mode="before") + @classmethod + def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): + field_name = validation_info.field_name + if v is None or v == "": + spread_field = field_name.replace('amounts_pct', 'spreads') + return [1 for _ in validation_info.data[spread_field]] + if isinstance(v, str): + return [float(x.strip()) for x in v.split(',')] + elif isinstance(v, list) and len(v) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): + raise ValueError( + f"The number of {field_name} must match the number of {field_name.replace('amounts_pct', 'spreads')}.") + return v + + @field_validator('position_mode', mode="before") + @classmethod + def validate_position_mode(cls, v) -> PositionMode: + if isinstance(v, str): + if v.upper() in PositionMode.__members__: + return PositionMode[v.upper()] + raise ValueError(f"Invalid position mode: {v}. Valid options are: {', '.join(PositionMode.__members__)}") + return v + + @property + def triple_barrier_config(self) -> TripleBarrierConfig: + return TripleBarrierConfig( + take_profit=self.take_profit, + trailing_stop=None, + open_order_type=OrderType.LIMIT_MAKER, # Defaulting to LIMIT as is a Maker Controller + take_profit_order_type=self.take_profit_order_type, + stop_loss_order_type=OrderType.MARKET, # Defaulting to MARKET as per requirement + time_limit_order_type=OrderType.MARKET # Defaulting to MARKET as per requirement + ) + + def update_parameters(self, trade_type: TradeType, new_spreads: Union[List[float], str], new_amounts_pct: Optional[Union[List[int], str]] = None): + spreads_field = 'buy_spreads' if trade_type == TradeType.BUY else 'sell_spreads' + amounts_pct_field = 'buy_amounts_pct' if trade_type == TradeType.BUY else 'sell_amounts_pct' + + setattr(self, spreads_field, self.parse_spreads(new_spreads)) + if new_amounts_pct is not None: + setattr(self, amounts_pct_field, self.parse_and_validate_amounts(new_amounts_pct, self.__dict__, self.__fields__[amounts_pct_field])) + else: + setattr(self, amounts_pct_field, [1 for _ in getattr(self, spreads_field)]) + + def get_spreads_and_amounts_in_quote(self, trade_type: TradeType) -> Tuple[List[float], List[float]]: + buy_amounts_pct = getattr(self, 'buy_amounts_pct') + sell_amounts_pct = getattr(self, 'sell_amounts_pct') + + # Calculate total percentages across buys and sells + total_pct = sum(buy_amounts_pct) + sum(sell_amounts_pct) + + # Normalize amounts_pct based on total percentages + if trade_type == TradeType.BUY: + normalized_amounts_pct = [amt_pct / total_pct for amt_pct in buy_amounts_pct] + else: # TradeType.SELL + normalized_amounts_pct = [amt_pct / total_pct for amt_pct in sell_amounts_pct] + + spreads = getattr(self, f'{trade_type.name.lower()}_spreads') + return spreads, [amt_pct * self.total_amount_quote * self.portfolio_allocation for amt_pct in normalized_amounts_pct] + + def update_markets(self, markets: MarketDict) -> MarketDict: + return markets.add_or_update(self.connector_name, self.trading_pair) + + +class PMM(ControllerBase): + """ + This class represents the base class for a market making controller. + """ + + def __init__(self, config: PMMConfig, *args, **kwargs): + super().__init__(config, *args, **kwargs) + self.config = config + self.market_data_provider.initialize_rate_sources([ConnectorPair( + connector_name=config.connector_name, trading_pair=config.trading_pair)]) + + def determine_executor_actions(self) -> List[ExecutorAction]: + """ + Determine actions based on the provided executor handler report. + """ + actions = [] + actions.extend(self.create_actions_proposal()) + actions.extend(self.stop_actions_proposal()) + return actions + + def create_actions_proposal(self) -> List[ExecutorAction]: + """ + Create actions proposal based on the current state of the controller. + """ + create_actions = [] + + # Check if a position reduction executor for TP/SL is already sent + reduction_executor_exists = any( + executor.is_active and + executor.custom_info.get("level_id") == "global_tp_sl" + for executor in self.executors_info + ) + + if (not reduction_executor_exists and + self.processed_data["current_base_pct"] > self.config.target_base_pct and + (self.processed_data["unrealized_pnl_pct"] > self.config.global_take_profit or + self.processed_data["unrealized_pnl_pct"] < -self.config.global_stop_loss)): + + # Create a global take profit or stop loss executor + create_actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.SELL, + amount=self.processed_data["position_amount"], + execution_strategy=ExecutionStrategy.MARKET, + price=self.processed_data["reference_price"], + level_id="global_tp_sl" # Use a specific level_id to identify this as a TP/SL executor + ) + )) + return create_actions + levels_to_execute = self.get_levels_to_execute() + # Pre-calculate all spreads and amounts for buy and sell sides + buy_spreads, buy_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.BUY) + sell_spreads, sell_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.SELL) + reference_price = Decimal(self.processed_data["reference_price"]) + # Get current position info for skew calculation + current_pct = self.processed_data["current_base_pct"] + min_pct = self.config.min_base_pct + max_pct = self.config.max_base_pct + # Calculate skew factors (0 to 1) - how much to scale orders + if max_pct > min_pct: # Prevent division by zero + # For buys: full size at min_pct, decreasing as we approach max_pct + buy_skew = (max_pct - current_pct) / (max_pct - min_pct) + # For sells: full size at max_pct, decreasing as we approach min_pct + sell_skew = (current_pct - min_pct) / (max_pct - min_pct) + # Ensure values stay between 0.2 and 1.0 (never go below 20% of original size) + buy_skew = max(min(buy_skew, Decimal("1.0")), self.config.max_skew) + sell_skew = max(min(sell_skew, Decimal("1.0")), self.config.max_skew) + else: + buy_skew = sell_skew = Decimal("1.0") + # Create executors for each level + for level_id in levels_to_execute: + trade_type = self.get_trade_type_from_level_id(level_id) + level = self.get_level_from_level_id(level_id) + if trade_type == TradeType.BUY: + spread_in_pct = Decimal(buy_spreads[level]) * Decimal(self.processed_data["spread_multiplier"]) + amount_quote = Decimal(buy_amounts_quote[level]) + skew = buy_skew + else: # TradeType.SELL + spread_in_pct = Decimal(sell_spreads[level]) * Decimal(self.processed_data["spread_multiplier"]) + amount_quote = Decimal(sell_amounts_quote[level]) + skew = sell_skew + # Calculate price + side_multiplier = Decimal("-1") if trade_type == TradeType.BUY else Decimal("1") + price = reference_price * (Decimal("1") + side_multiplier * spread_in_pct) + # Calculate amount with skew applied + amount = self.market_data_provider.quantize_order_amount(self.config.connector_name, + self.config.trading_pair, + (amount_quote / price) * skew) + if amount == Decimal("0"): + self.logger().warning(f"The amount of the level {level_id} is 0. Skipping.") + executor_config = self.get_executor_config(level_id, price, amount) + if executor_config is not None: + create_actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=executor_config + )) + return create_actions + + def get_levels_to_execute(self) -> List[str]: + working_levels = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: x.is_active or (x.close_type == CloseType.STOP_LOSS and self.market_data_provider.time() - x.close_timestamp < self.config.cooldown_time) + ) + working_levels_ids = [executor.custom_info["level_id"] for executor in working_levels] + return self.get_not_active_levels_ids(working_levels_ids) + + def stop_actions_proposal(self) -> List[ExecutorAction]: + """ + Create a list of actions to stop the executors based on order refresh and early stop conditions. + """ + stop_actions = [] + stop_actions.extend(self.executors_to_refresh()) + stop_actions.extend(self.executors_to_early_stop()) + return stop_actions + + def executors_to_refresh(self) -> List[ExecutorAction]: + executors_to_refresh = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: not x.is_trading and x.is_active and self.market_data_provider.time() - x.timestamp > self.config.executor_refresh_time) + return [StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id) for executor in executors_to_refresh] + + def executors_to_early_stop(self) -> List[ExecutorAction]: + """ + Get the executors to early stop based on the current state of market data. This method can be overridden to + implement custom behavior. + """ + executors_to_early_stop = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: x.is_active and x.is_trading and self.market_data_provider.time() - x.custom_info["open_order_last_update"] > self.config.cooldown_time) + return [StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id) for executor in executors_to_early_stop] + + async def update_processed_data(self): + """ + Update the processed data for the controller. This method should be reimplemented to modify the reference price + and spread multiplier based on the market data. By default, it will update the reference price as mid price and + the spread multiplier as 1. + """ + reference_price = self.market_data_provider.get_price_by_type(self.config.connector_name, + self.config.trading_pair, PriceType.MidPrice) + position_held = next((position for position in self.positions_held if + (position.trading_pair == self.config.trading_pair) & + (position.connector_name == self.config.connector_name)), None) + target_position = self.config.total_amount_quote * self.config.target_base_pct + if position_held is not None: + position_amount = position_held.amount + current_base_pct = position_held.amount_quote / self.config.total_amount_quote + deviation = (target_position - position_held.amount_quote) / target_position + unrealized_pnl_pct = position_held.unrealized_pnl_quote / position_held.amount_quote if position_held.amount_quote != 0 else Decimal("0") + else: + position_amount = 0 + current_base_pct = 0 + deviation = 1 + unrealized_pnl_pct = 0 + + self.processed_data = {"reference_price": Decimal(reference_price), "spread_multiplier": Decimal("1"), + "deviation": deviation, "current_base_pct": current_base_pct, + "unrealized_pnl_pct": unrealized_pnl_pct, "position_amount": position_amount} + + def get_executor_config(self, level_id: str, price: Decimal, amount: Decimal): + """ + Get the executor config for a given level id. + """ + trade_type = self.get_trade_type_from_level_id(level_id) + level_multiplier = self.get_level_from_level_id(level_id) + 1 + return PositionExecutorConfig( + timestamp=self.market_data_provider.time(), + level_id=level_id, + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + entry_price=price, + amount=amount, + triple_barrier_config=self.config.triple_barrier_config.new_instance_with_adjusted_volatility(level_multiplier), + leverage=self.config.leverage, + side=trade_type, + ) + + def get_level_id_from_side(self, trade_type: TradeType, level: int) -> str: + """ + Get the level id based on the trade type and the level. + """ + return f"{trade_type.name.lower()}_{level}" + + def get_trade_type_from_level_id(self, level_id: str) -> TradeType: + return TradeType.BUY if level_id.startswith("buy") else TradeType.SELL + + def get_level_from_level_id(self, level_id: str) -> int: + return int(level_id.split('_')[1]) + + def get_not_active_levels_ids(self, active_levels_ids: List[str]) -> List[str]: + """ + Get the levels to execute based on the current state of the controller. + """ + buy_ids_missing = [self.get_level_id_from_side(TradeType.BUY, level) for level in range(len(self.config.buy_spreads)) + if self.get_level_id_from_side(TradeType.BUY, level) not in active_levels_ids] + sell_ids_missing = [self.get_level_id_from_side(TradeType.SELL, level) for level in range(len(self.config.sell_spreads)) + if self.get_level_id_from_side(TradeType.SELL, level) not in active_levels_ids] + if self.processed_data["current_base_pct"] < self.config.min_base_pct: + return buy_ids_missing + elif self.processed_data["current_base_pct"] > self.config.max_base_pct: + return sell_ids_missing + return buy_ids_missing + sell_ids_missing + + def to_format_status(self) -> List[str]: + """ + Get the status of the controller in a formatted way with ASCII visualizations. + """ + from decimal import Decimal + from itertools import zip_longest + + status = [] + + # Get all required data + base_pct = self.processed_data['current_base_pct'] + min_pct = self.config.min_base_pct + max_pct = self.config.max_base_pct + target_pct = self.config.target_base_pct + skew = base_pct - target_pct + skew_pct = skew / target_pct if target_pct != 0 else Decimal('0') + max_skew = getattr(self.config, 'max_skew', Decimal('0.0')) + + # Fixed widths - adjusted based on screenshot analysis + outer_width = 92 # Total width including outer borders + inner_width = outer_width - 4 # Inner content width + half_width = (inner_width) // 2 - 1 # Width of each column in split sections + bar_width = inner_width - 15 # Width of visualization bars (accounting for label) + + # Header - omit ID since it's shown above in controller header + status.append("╒" + "═" * (inner_width) + "╕") + + header_line = ( + f"{self.config.connector_name}:{self.config.trading_pair} " + f"Price: {self.processed_data['reference_price']} " + f"Alloc: {self.config.portfolio_allocation:.1%} " + f"Spread Mult: {self.processed_data['spread_multiplier']} |" + ) + + status.append(f"│ {header_line:<{inner_width}} │") + + # Position and PnL sections with precise widths + status.append(f"├{'─' * half_width}┬{'─' * half_width}┤") + status.append(f"│ {'POSITION STATUS':<{half_width - 2}} │ {'PROFIT & LOSS':<{half_width - 2}} │") + status.append(f"├{'─' * half_width}┼{'─' * half_width}┤") + + # Position data for left column + position_info = [ + f"Current: {base_pct:.2%}", + f"Target: {target_pct:.2%}", + f"Min/Max: {min_pct:.2%}/{max_pct:.2%}", + f"Skew: {skew_pct:+.2%} (max {max_skew:.2%})" + ] + + # PnL data for right column + pnl_info = [] + if 'unrealized_pnl_pct' in self.processed_data: + pnl = self.processed_data['unrealized_pnl_pct'] + pnl_sign = "+" if pnl >= 0 else "" + pnl_info = [ + f"Unrealized: {pnl_sign}{pnl:.2%}", + f"Take Profit: {self.config.global_take_profit:.2%}", + f"Stop Loss: {-self.config.global_stop_loss:.2%}", + f"Leverage: {self.config.leverage}x" + ] + + # Display position and PnL info side by side with exact spacing + for pos_line, pnl_line in zip_longest(position_info, pnl_info, fillvalue=""): + status.append(f"│ {pos_line:<{half_width - 2}} │ {pnl_line:<{half_width - 2}} │") + + # Adjust visualization section - ensure consistent spacing + status.append(f"├{'─' * (inner_width)}┤") + status.append(f"│ {'VISUALIZATIONS':<{inner_width}} │") + status.append(f"├{'─' * (inner_width)}┤") + + # Position bar with exact spacing and characters + filled_width = int(base_pct * bar_width) + min_pos = int(min_pct * bar_width) + max_pos = int(max_pct * bar_width) + target_pos = int(target_pct * bar_width) + + # Build position bar character by character + position_bar = "" + for i in range(bar_width): + if i == filled_width: + position_bar += "◆" # Current position + elif i == min_pos: + position_bar += "┃" # Min threshold + elif i == max_pos: + position_bar += "┃" # Max threshold + elif i == target_pos: + position_bar += "┇" # Target threshold + elif i < filled_width: + position_bar += "█" # Filled area + else: + position_bar += "░" # Empty area + + # Ensure consistent label spacing as seen in screenshot + status.append(f"│ Position: [{position_bar}] │") + + # Skew visualization with exact spacing + skew_bar_width = bar_width + center = skew_bar_width // 2 + skew_pos = center + int(skew_pct * center * 2) + skew_pos = max(0, min(skew_bar_width - 1, skew_pos)) + + # Build skew bar character by character + skew_bar = "" + for i in range(skew_bar_width): + if i == center: + skew_bar += "┃" # Center line + elif i == skew_pos: + skew_bar += "⬤" # Current skew + else: + skew_bar += "─" # Empty line + + # Match spacing from screenshot with exact character counts + status.append(f"│ Skew: [{skew_bar}] │") + + # PnL visualization if available + if 'unrealized_pnl_pct' in self.processed_data: + pnl = self.processed_data['unrealized_pnl_pct'] + take_profit = self.config.global_take_profit + stop_loss = -self.config.global_stop_loss + + pnl_bar_width = bar_width + center = pnl_bar_width // 2 + + # Calculate positions with exact scaling + max_range = max(abs(take_profit), abs(stop_loss), abs(pnl)) * Decimal("1.2") + scale = (pnl_bar_width // 2) / max_range + + pnl_pos = center + int(pnl * scale) + take_profit_pos = center + int(take_profit * scale) + stop_loss_pos = center + int(stop_loss * scale) + + # Ensure positions are within bounds + pnl_pos = max(0, min(pnl_bar_width - 1, pnl_pos)) + take_profit_pos = max(0, min(pnl_bar_width - 1, take_profit_pos)) + stop_loss_pos = max(0, min(pnl_bar_width - 1, stop_loss_pos)) + + # Build PnL bar character by character + pnl_bar = "" + for i in range(pnl_bar_width): + if i == center: + pnl_bar += "│" # Center line + elif i == pnl_pos: + pnl_bar += "⬤" # Current PnL + elif i == take_profit_pos: + pnl_bar += "T" # Take profit line + elif i == stop_loss_pos: + pnl_bar += "S" # Stop loss line + elif (pnl >= 0 and center <= i < pnl_pos) or (pnl < 0 and pnl_pos < i <= center): + pnl_bar += "█" if pnl >= 0 else "▓" + else: + pnl_bar += "─" + + # Match spacing from screenshot + status.append(f"│ PnL: [{pnl_bar}] │") + + # Executors section with precise column widths + status.append(f"├{'─' * half_width}┬{'─' * half_width}┤") + status.append(f"│ {'EXECUTORS STATUS':<{half_width - 2}} │ {'EXECUTOR VISUALIZATION':<{half_width - 2}} │") + status.append(f"├{'─' * half_width}┼{'─' * half_width}┤") + + # Count active executors by type + active_buy = sum(1 for info in self.executors_info + if info.is_active and self.get_trade_type_from_level_id(info.custom_info["level_id"]) == TradeType.BUY) + active_sell = sum(1 for info in self.executors_info + if info.is_active and self.get_trade_type_from_level_id(info.custom_info["level_id"]) == TradeType.SELL) + total_active = sum(1 for info in self.executors_info if info.is_active) + + # Executor information with fixed formatting + executor_info = [ + f"Total Active: {total_active}", + f"Total Created: {len(self.executors_info)}", + f"Buy Executors: {active_buy}", + f"Sell Executors: {active_sell}" + ] + + if 'deviation' in self.processed_data: + executor_info.append(f"Target Deviation: {self.processed_data['deviation']:.4f}") + + # Visualization with consistent block characters for buy/sell representation + buy_bars = "▮" * active_buy if active_buy > 0 else "─" + sell_bars = "▮" * active_sell if active_sell > 0 else "─" + + executor_viz = [ + f"Buy: {buy_bars}", + f"Sell: {sell_bars}" + ] + + # Display with fixed width columns + for exec_line, viz_line in zip_longest(executor_info, executor_viz, fillvalue=""): + status.append(f"│ {exec_line:<{half_width - 2}} │ {viz_line:<{half_width - 2}} │") + + # Bottom border with exact width + status.append(f"╘{'═' * (inner_width)}╛") + + return status diff --git a/bots/controllers/generic/pmm_adjusted.py b/bots/controllers/generic/pmm_adjusted.py new file mode 100644 index 0000000..e9bc266 --- /dev/null +++ b/bots/controllers/generic/pmm_adjusted.py @@ -0,0 +1,669 @@ +from decimal import Decimal +from typing import List, Optional, Tuple, Union + +from pydantic import Field, field_validator +from pydantic_core.core_schema import ValidationInfo + +from hummingbot.core.data_type.common import MarketDict, OrderType, PositionMode, PriceType, TradeType +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers.controller_base import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig +from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig, TripleBarrierConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.models.executors import CloseType + + +class PMMAdjustedConfig(ControllerConfigBase): + """ + This class represents the base configuration for a market making controller. + """ + controller_type: str = "generic" + controller_name: str = "pmm_adjusted" + candles_config: List[CandlesConfig] = [] + connector_name: str = Field( + default="binance", + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the name of the connector to use (e.g., binance):", + } + ) + trading_pair: str = Field( + default="BTC-FDUSD", + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the trading pair to trade on (e.g., BTC-FDUSD):", + } + ) + candles_connector_name: str = Field(default="binance") + candles_trading_pair: str = Field(default="BTC-USDT") + candles_interval: str = Field(default="1s") + + portfolio_allocation: Decimal = Field( + default=Decimal("0.05"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the maximum quote exposure percentage around mid price (e.g., 0.05 for 5% of total quote allocation):", + } + ) + target_base_pct: Decimal = Field( + default=Decimal("0.2"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the target base percentage (e.g., 0.2 for 20%):", + } + ) + min_base_pct: Decimal = Field( + default=Decimal("0.1"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the minimum base percentage (e.g., 0.1 for 10%):", + } + ) + max_base_pct: Decimal = Field( + default=Decimal("0.4"), + json_schema_extra={ + "prompt_on_new": True, + "prompt": "Enter the maximum base percentage (e.g., 0.4 for 40%):", + } + ) + buy_spreads: List[float] = Field( + default="0.01,0.02", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of buy spreads (e.g., '0.01, 0.02'):", + } + ) + sell_spreads: List[float] = Field( + default="0.01,0.02", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of sell spreads (e.g., '0.01, 0.02'):", + } + ) + buy_amounts_pct: Union[List[Decimal], None] = Field( + default=None, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of buy amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", + } + ) + sell_amounts_pct: Union[List[Decimal], None] = Field( + default=None, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter a comma-separated list of sell amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", + } + ) + executor_refresh_time: int = Field( + default=60 * 5, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the refresh time in seconds for executors (e.g., 300 for 5 minutes):", + } + ) + cooldown_time: int = Field( + default=15, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the cooldown time in seconds between after replacing an executor that traded (e.g., 15):", + } + ) + leverage: int = Field( + default=20, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the leverage to use for trading (e.g., 20 for 20x leverage). Set it to 1 for spot trading:", + } + ) + position_mode: PositionMode = Field(default="HEDGE") + take_profit: Optional[Decimal] = Field( + default=Decimal("0.02"), gt=0, + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the take profit as a decimal (e.g., 0.02 for 2%):", + } + ) + take_profit_order_type: Optional[OrderType] = Field( + default="LIMIT_MAKER", + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the order type for take profit (e.g., LIMIT_MAKER):", + } + ) + max_skew: Decimal = Field( + default=Decimal("1.0"), + json_schema_extra={ + "prompt_on_new": True, "is_updatable": True, + "prompt": "Enter the maximum skew factor (e.g., 1.0):", + } + ) + global_take_profit: Decimal = Decimal("0.02") + global_stop_loss: Decimal = Decimal("0.05") + + @field_validator("take_profit", mode="before") + @classmethod + def validate_target(cls, v): + if isinstance(v, str): + if v == "": + return None + return Decimal(v) + return v + + @field_validator('take_profit_order_type', mode="before") + @classmethod + def validate_order_type(cls, v) -> OrderType: + if isinstance(v, OrderType): + return v + elif v is None: + return OrderType.MARKET + elif isinstance(v, str): + if v.upper() in OrderType.__members__: + return OrderType[v.upper()] + elif isinstance(v, int): + try: + return OrderType(v) + except ValueError: + pass + raise ValueError(f"Invalid order type: {v}. Valid options are: {', '.join(OrderType.__members__)}") + + @field_validator('buy_spreads', 'sell_spreads', mode="before") + @classmethod + def parse_spreads(cls, v): + if v is None: + return [] + if isinstance(v, str): + if v == "": + return [] + return [float(x.strip()) for x in v.split(',')] + return v + + @field_validator('buy_amounts_pct', 'sell_amounts_pct', mode="before") + @classmethod + def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): + field_name = validation_info.field_name + if v is None or v == "": + spread_field = field_name.replace('amounts_pct', 'spreads') + return [1 for _ in validation_info.data[spread_field]] + if isinstance(v, str): + return [float(x.strip()) for x in v.split(',')] + elif isinstance(v, list) and len(v) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): + raise ValueError( + f"The number of {field_name} must match the number of {field_name.replace('amounts_pct', 'spreads')}.") + return v + + @field_validator('position_mode', mode="before") + @classmethod + def validate_position_mode(cls, v) -> PositionMode: + if isinstance(v, str): + if v.upper() in PositionMode.__members__: + return PositionMode[v.upper()] + raise ValueError(f"Invalid position mode: {v}. Valid options are: {', '.join(PositionMode.__members__)}") + return v + + @property + def triple_barrier_config(self) -> TripleBarrierConfig: + return TripleBarrierConfig( + take_profit=self.take_profit, + trailing_stop=None, + open_order_type=OrderType.LIMIT_MAKER, # Defaulting to LIMIT as is a Maker Controller + take_profit_order_type=self.take_profit_order_type, + stop_loss_order_type=OrderType.MARKET, # Defaulting to MARKET as per requirement + time_limit_order_type=OrderType.MARKET # Defaulting to MARKET as per requirement + ) + + def update_parameters(self, trade_type: TradeType, new_spreads: Union[List[float], str], new_amounts_pct: Optional[Union[List[int], str]] = None): + spreads_field = 'buy_spreads' if trade_type == TradeType.BUY else 'sell_spreads' + amounts_pct_field = 'buy_amounts_pct' if trade_type == TradeType.BUY else 'sell_amounts_pct' + + setattr(self, spreads_field, self.parse_spreads(new_spreads)) + if new_amounts_pct is not None: + setattr(self, amounts_pct_field, self.parse_and_validate_amounts(new_amounts_pct, self.__dict__, self.__fields__[amounts_pct_field])) + else: + setattr(self, amounts_pct_field, [1 for _ in getattr(self, spreads_field)]) + + def get_spreads_and_amounts_in_quote(self, trade_type: TradeType) -> Tuple[List[float], List[float]]: + buy_amounts_pct = getattr(self, 'buy_amounts_pct') + sell_amounts_pct = getattr(self, 'sell_amounts_pct') + + # Calculate total percentages across buys and sells + total_pct = sum(buy_amounts_pct) + sum(sell_amounts_pct) + + # Normalize amounts_pct based on total percentages + if trade_type == TradeType.BUY: + normalized_amounts_pct = [amt_pct / total_pct for amt_pct in buy_amounts_pct] + else: # TradeType.SELL + normalized_amounts_pct = [amt_pct / total_pct for amt_pct in sell_amounts_pct] + + spreads = getattr(self, f'{trade_type.name.lower()}_spreads') + return spreads, [amt_pct * self.total_amount_quote * self.portfolio_allocation for amt_pct in normalized_amounts_pct] + + def update_markets(self, markets: MarketDict) -> MarketDict: + return markets.add_or_update(self.connector_name, self.trading_pair) + + +class PMMAdjusted(ControllerBase): + """ + This class represents the base class for a market making controller. + """ + + def __init__(self, config: PMMAdjustedConfig, *args, **kwargs): + super().__init__(config, *args, **kwargs) + self.config = config + self.market_data_provider.initialize_rate_sources([ConnectorPair( + connector_name=config.connector_name, trading_pair=config.trading_pair)]) + self.config.candles_config = [ + CandlesConfig(connector=self.config.candles_connector_name, + trading_pair=self.config.candles_trading_pair, + interval=self.config.candles_interval) + ] + + def determine_executor_actions(self) -> List[ExecutorAction]: + """ + Determine actions based on the provided executor handler report. + """ + actions = [] + actions.extend(self.create_actions_proposal()) + actions.extend(self.stop_actions_proposal()) + return actions + + def create_actions_proposal(self) -> List[ExecutorAction]: + """ + Create actions proposal based on the current state of the controller. + """ + create_actions = [] + + # Check if a position reduction executor for TP/SL is already sent + reduction_executor_exists = any( + executor.is_active and + executor.custom_info.get("level_id") == "global_tp_sl" + for executor in self.executors_info + ) + + if (not reduction_executor_exists and + self.processed_data["current_base_pct"] > self.config.target_base_pct and + (self.processed_data["unrealized_pnl_pct"] > self.config.global_take_profit or + self.processed_data["unrealized_pnl_pct"] < -self.config.global_stop_loss)): + + # Create a global take profit or stop loss executor + create_actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.SELL, + amount=self.processed_data["position_amount"], + execution_strategy=ExecutionStrategy.MARKET, + price=self.processed_data["reference_price"], + level_id="global_tp_sl" # Use a specific level_id to identify this as a TP/SL executor + ) + )) + return create_actions + levels_to_execute = self.get_levels_to_execute() + # Pre-calculate all spreads and amounts for buy and sell sides + buy_spreads, buy_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.BUY) + sell_spreads, sell_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.SELL) + reference_price = Decimal(self.processed_data["reference_price"]) + # Get current position info for skew calculation + current_pct = self.processed_data["current_base_pct"] + min_pct = self.config.min_base_pct + max_pct = self.config.max_base_pct + # Calculate skew factors (0 to 1) - how much to scale orders + if max_pct > min_pct: # Prevent division by zero + # For buys: full size at min_pct, decreasing as we approach max_pct + buy_skew = (max_pct - current_pct) / (max_pct - min_pct) + # For sells: full size at max_pct, decreasing as we approach min_pct + sell_skew = (current_pct - min_pct) / (max_pct - min_pct) + # Ensure values stay between 0.2 and 1.0 (never go below 20% of original size) + buy_skew = max(min(buy_skew, Decimal("1.0")), self.config.max_skew) + sell_skew = max(min(sell_skew, Decimal("1.0")), self.config.max_skew) + else: + buy_skew = sell_skew = Decimal("1.0") + # Create executors for each level + for level_id in levels_to_execute: + trade_type = self.get_trade_type_from_level_id(level_id) + level = self.get_level_from_level_id(level_id) + if trade_type == TradeType.BUY: + spread_in_pct = Decimal(buy_spreads[level]) * Decimal(self.processed_data["spread_multiplier"]) + amount_quote = Decimal(buy_amounts_quote[level]) + skew = buy_skew + else: # TradeType.SELL + spread_in_pct = Decimal(sell_spreads[level]) * Decimal(self.processed_data["spread_multiplier"]) + amount_quote = Decimal(sell_amounts_quote[level]) + skew = sell_skew + # Calculate price + side_multiplier = Decimal("-1") if trade_type == TradeType.BUY else Decimal("1") + price = reference_price * (Decimal("1") + side_multiplier * spread_in_pct) + # Calculate amount with skew applied + amount = self.market_data_provider.quantize_order_amount(self.config.connector_name, + self.config.trading_pair, + (amount_quote / price) * skew) + if amount == Decimal("0"): + self.logger().warning(f"The amount of the level {level_id} is 0. Skipping.") + executor_config = self.get_executor_config(level_id, price, amount) + if executor_config is not None: + create_actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=executor_config + )) + return create_actions + + def get_levels_to_execute(self) -> List[str]: + working_levels = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: x.is_active or (x.close_type == CloseType.STOP_LOSS and self.market_data_provider.time() - x.close_timestamp < self.config.cooldown_time) + ) + working_levels_ids = [executor.custom_info["level_id"] for executor in working_levels] + return self.get_not_active_levels_ids(working_levels_ids) + + def stop_actions_proposal(self) -> List[ExecutorAction]: + """ + Create a list of actions to stop the executors based on order refresh and early stop conditions. + """ + stop_actions = [] + stop_actions.extend(self.executors_to_refresh()) + stop_actions.extend(self.executors_to_early_stop()) + return stop_actions + + def executors_to_refresh(self) -> List[ExecutorAction]: + executors_to_refresh = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: not x.is_trading and x.is_active and self.market_data_provider.time() - x.timestamp > self.config.executor_refresh_time) + return [StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id) for executor in executors_to_refresh] + + def executors_to_early_stop(self) -> List[ExecutorAction]: + """ + Get the executors to early stop based on the current state of market data. This method can be overridden to + implement custom behavior. + """ + executors_to_early_stop = self.filter_executors( + executors=self.executors_info, + filter_func=lambda x: x.is_active and x.is_trading and self.market_data_provider.time() - x.custom_info["open_order_last_update"] > self.config.cooldown_time) + return [StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id) for executor in executors_to_early_stop] + + async def update_processed_data(self): + """ + Update the processed data for the controller. This method should be reimplemented to modify the reference price + and spread multiplier based on the market data. By default, it will update the reference price as mid price and + the spread multiplier as 1. + """ + reference_price = self.get_current_candles_price() + position_held = next((position for position in self.positions_held if + (position.trading_pair == self.config.trading_pair) & + (position.connector_name == self.config.connector_name)), None) + target_position = self.config.total_amount_quote * self.config.target_base_pct + if position_held is not None: + position_amount = position_held.amount + current_base_pct = position_held.amount_quote / self.config.total_amount_quote + deviation = (target_position - position_held.amount_quote) / target_position + unrealized_pnl_pct = position_held.unrealized_pnl_quote / position_held.amount_quote if position_held.amount_quote != 0 else Decimal("0") + else: + position_amount = 0 + current_base_pct = 0 + deviation = 1 + unrealized_pnl_pct = 0 + + self.processed_data = {"reference_price": Decimal(reference_price), "spread_multiplier": Decimal("1"), + "deviation": deviation, "current_base_pct": current_base_pct, + "unrealized_pnl_pct": unrealized_pnl_pct, "position_amount": position_amount} + + def get_current_candles_price(self) -> Decimal: + """ + Get the current price from the candles data provider. + """ + candles = self.market_data_provider.get_candles_df(self.config.candles_connector_name, + self.config.candles_trading_pair, + self.config.candles_interval) + if candles is not None and not candles.empty: + last_candle = candles.iloc[-1] + return Decimal(last_candle['close']) + else: + self.logger().warning(f"No candles data available for {self.config.candles_connector_name} - {self.config.candles_trading_pair} at {self.config.candles_interval}. Using last known price.") + return Decimal(self.market_data_provider.get_price_by_type(self.config.connector_name, self.config.trading_pair, PriceType.MidPrice)) + + def get_executor_config(self, level_id: str, price: Decimal, amount: Decimal): + """ + Get the executor config for a given level id. + """ + trade_type = self.get_trade_type_from_level_id(level_id) + level_multiplier = self.get_level_from_level_id(level_id) + 1 + return PositionExecutorConfig( + timestamp=self.market_data_provider.time(), + level_id=level_id, + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + entry_price=price, + amount=amount, + triple_barrier_config=self.config.triple_barrier_config.new_instance_with_adjusted_volatility(level_multiplier), + leverage=self.config.leverage, + side=trade_type, + ) + + def get_level_id_from_side(self, trade_type: TradeType, level: int) -> str: + """ + Get the level id based on the trade type and the level. + """ + return f"{trade_type.name.lower()}_{level}" + + def get_trade_type_from_level_id(self, level_id: str) -> TradeType: + return TradeType.BUY if level_id.startswith("buy") else TradeType.SELL + + def get_level_from_level_id(self, level_id: str) -> int: + return int(level_id.split('_')[1]) + + def get_not_active_levels_ids(self, active_levels_ids: List[str]) -> List[str]: + """ + Get the levels to execute based on the current state of the controller. + """ + buy_ids_missing = [self.get_level_id_from_side(TradeType.BUY, level) for level in range(len(self.config.buy_spreads)) + if self.get_level_id_from_side(TradeType.BUY, level) not in active_levels_ids] + sell_ids_missing = [self.get_level_id_from_side(TradeType.SELL, level) for level in range(len(self.config.sell_spreads)) + if self.get_level_id_from_side(TradeType.SELL, level) not in active_levels_ids] + if self.processed_data["current_base_pct"] < self.config.min_base_pct: + return buy_ids_missing + elif self.processed_data["current_base_pct"] > self.config.max_base_pct: + return sell_ids_missing + return buy_ids_missing + sell_ids_missing + + def to_format_status(self) -> List[str]: + """ + Get the status of the controller in a formatted way with ASCII visualizations. + """ + from decimal import Decimal + from itertools import zip_longest + + status = [] + + # Get all required data + base_pct = self.processed_data['current_base_pct'] + min_pct = self.config.min_base_pct + max_pct = self.config.max_base_pct + target_pct = self.config.target_base_pct + skew = base_pct - target_pct + skew_pct = skew / target_pct if target_pct != 0 else Decimal('0') + max_skew = getattr(self.config, 'max_skew', Decimal('0.0')) + + # Fixed widths - adjusted based on screenshot analysis + outer_width = 92 # Total width including outer borders + inner_width = outer_width - 4 # Inner content width + half_width = (inner_width) // 2 - 1 # Width of each column in split sections + bar_width = inner_width - 15 # Width of visualization bars (accounting for label) + + # Header - omit ID since it's shown above in controller header + status.append("╒" + "═" * (inner_width) + "╕") + + header_line = ( + f"{self.config.connector_name}:{self.config.trading_pair} " + f"Price: {self.processed_data['reference_price']} " + f"Alloc: {self.config.portfolio_allocation:.1%} " + f"Spread Mult: {self.processed_data['spread_multiplier']} |" + ) + + status.append(f"│ {header_line:<{inner_width}} │") + + # Position and PnL sections with precise widths + status.append(f"├{'─' * half_width}┬{'─' * half_width}┤") + status.append(f"│ {'POSITION STATUS':<{half_width - 2}} │ {'PROFIT & LOSS':<{half_width - 2}} │") + status.append(f"├{'─' * half_width}┼{'─' * half_width}┤") + + # Position data for left column + position_info = [ + f"Current: {base_pct:.2%}", + f"Target: {target_pct:.2%}", + f"Min/Max: {min_pct:.2%}/{max_pct:.2%}", + f"Skew: {skew_pct:+.2%} (max {max_skew:.2%})" + ] + + # PnL data for right column + pnl_info = [] + if 'unrealized_pnl_pct' in self.processed_data: + pnl = self.processed_data['unrealized_pnl_pct'] + pnl_sign = "+" if pnl >= 0 else "" + pnl_info = [ + f"Unrealized: {pnl_sign}{pnl:.2%}", + f"Take Profit: {self.config.global_take_profit:.2%}", + f"Stop Loss: {-self.config.global_stop_loss:.2%}", + f"Leverage: {self.config.leverage}x" + ] + + # Display position and PnL info side by side with exact spacing + for pos_line, pnl_line in zip_longest(position_info, pnl_info, fillvalue=""): + status.append(f"│ {pos_line:<{half_width - 2}} │ {pnl_line:<{half_width - 2}} │") + + # Adjust visualization section - ensure consistent spacing + status.append(f"├{'─' * (inner_width)}┤") + status.append(f"│ {'VISUALIZATIONS':<{inner_width}} │") + status.append(f"├{'─' * (inner_width)}┤") + + # Position bar with exact spacing and characters + filled_width = int(base_pct * bar_width) + min_pos = int(min_pct * bar_width) + max_pos = int(max_pct * bar_width) + target_pos = int(target_pct * bar_width) + + # Build position bar character by character + position_bar = "" + for i in range(bar_width): + if i == filled_width: + position_bar += "◆" # Current position + elif i == min_pos: + position_bar += "┃" # Min threshold + elif i == max_pos: + position_bar += "┃" # Max threshold + elif i == target_pos: + position_bar += "┇" # Target threshold + elif i < filled_width: + position_bar += "█" # Filled area + else: + position_bar += "░" # Empty area + + # Ensure consistent label spacing as seen in screenshot + status.append(f"│ Position: [{position_bar}] │") + + # Skew visualization with exact spacing + skew_bar_width = bar_width + center = skew_bar_width // 2 + skew_pos = center + int(skew_pct * center * 2) + skew_pos = max(0, min(skew_bar_width - 1, skew_pos)) + + # Build skew bar character by character + skew_bar = "" + for i in range(skew_bar_width): + if i == center: + skew_bar += "┃" # Center line + elif i == skew_pos: + skew_bar += "⬤" # Current skew + else: + skew_bar += "─" # Empty line + + # Match spacing from screenshot with exact character counts + status.append(f"│ Skew: [{skew_bar}] │") + + # PnL visualization if available + if 'unrealized_pnl_pct' in self.processed_data: + pnl = self.processed_data['unrealized_pnl_pct'] + take_profit = self.config.global_take_profit + stop_loss = -self.config.global_stop_loss + + pnl_bar_width = bar_width + center = pnl_bar_width // 2 + + # Calculate positions with exact scaling + max_range = max(abs(take_profit), abs(stop_loss), abs(pnl)) * Decimal("1.2") + scale = (pnl_bar_width // 2) / max_range + + pnl_pos = center + int(pnl * scale) + take_profit_pos = center + int(take_profit * scale) + stop_loss_pos = center + int(stop_loss * scale) + + # Ensure positions are within bounds + pnl_pos = max(0, min(pnl_bar_width - 1, pnl_pos)) + take_profit_pos = max(0, min(pnl_bar_width - 1, take_profit_pos)) + stop_loss_pos = max(0, min(pnl_bar_width - 1, stop_loss_pos)) + + # Build PnL bar character by character + pnl_bar = "" + for i in range(pnl_bar_width): + if i == center: + pnl_bar += "│" # Center line + elif i == pnl_pos: + pnl_bar += "⬤" # Current PnL + elif i == take_profit_pos: + pnl_bar += "T" # Take profit line + elif i == stop_loss_pos: + pnl_bar += "S" # Stop loss line + elif (pnl >= 0 and center <= i < pnl_pos) or (pnl < 0 and pnl_pos < i <= center): + pnl_bar += "█" if pnl >= 0 else "▓" + else: + pnl_bar += "─" + + # Match spacing from screenshot + status.append(f"│ PnL: [{pnl_bar}] │") + + # Executors section with precise column widths + status.append(f"├{'─' * half_width}┬{'─' * half_width}┤") + status.append(f"│ {'EXECUTORS STATUS':<{half_width - 2}} │ {'EXECUTOR VISUALIZATION':<{half_width - 2}} │") + status.append(f"├{'─' * half_width}┼{'─' * half_width}┤") + + # Count active executors by type + active_buy = sum(1 for info in self.executors_info + if info.is_active and self.get_trade_type_from_level_id(info.custom_info["level_id"]) == TradeType.BUY) + active_sell = sum(1 for info in self.executors_info + if info.is_active and self.get_trade_type_from_level_id(info.custom_info["level_id"]) == TradeType.SELL) + total_active = sum(1 for info in self.executors_info if info.is_active) + + # Executor information with fixed formatting + executor_info = [ + f"Total Active: {total_active}", + f"Total Created: {len(self.executors_info)}", + f"Buy Executors: {active_buy}", + f"Sell Executors: {active_sell}" + ] + + if 'deviation' in self.processed_data: + executor_info.append(f"Target Deviation: {self.processed_data['deviation']:.4f}") + + # Visualization with consistent block characters for buy/sell representation + buy_bars = "▮" * active_buy if active_buy > 0 else "─" + sell_bars = "▮" * active_sell if active_sell > 0 else "─" + + executor_viz = [ + f"Buy: {buy_bars}", + f"Sell: {sell_bars}" + ] + + # Display with fixed width columns + for exec_line, viz_line in zip_longest(executor_info, executor_viz, fillvalue=""): + status.append(f"│ {exec_line:<{half_width - 2}} │ {viz_line:<{half_width - 2}} │") + + # Bottom border with exact width + status.append(f"╘{'═' * (inner_width)}╛") + + return status diff --git a/bots/controllers/generic/quantum_grid_allocator.py b/bots/controllers/generic/quantum_grid_allocator.py new file mode 100644 index 0000000..19b7a47 --- /dev/null +++ b/bots/controllers/generic/quantum_grid_allocator.py @@ -0,0 +1,492 @@ +from decimal import Decimal +from typing import Dict, List, Set, Union + +import pandas_ta as ta # noqa: F401 +from pydantic import Field, field_validator + +from hummingbot.core.data_type.common import OrderType, PositionMode, PriceType, TradeType +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.executors.grid_executor.data_types import GridExecutorConfig +from hummingbot.strategy_v2.executors.position_executor.data_types import TripleBarrierConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.models.executors_info import ExecutorInfo + + +class QGAConfig(ControllerConfigBase): + controller_name: str = "quantum_grid_allocator" + candles_config: List[CandlesConfig] = [] + + # Portfolio allocation zones + long_only_threshold: Decimal = Field(default=Decimal("0.2"), json_schema_extra={"is_updatable": True}) + short_only_threshold: Decimal = Field(default=Decimal("0.2"), json_schema_extra={"is_updatable": True}) + hedge_ratio: Decimal = Field(default=Decimal("2"), json_schema_extra={"is_updatable": True}) + + # Grid allocation multipliers + base_grid_value_pct: Decimal = Field(default=Decimal("0.08"), json_schema_extra={"is_updatable": True}) + max_grid_value_pct: Decimal = Field(default=Decimal("0.15"), json_schema_extra={"is_updatable": True}) + + # Order frequency settings + safe_extra_spread: Decimal = Field(default=Decimal("0.0001"), json_schema_extra={"is_updatable": True}) + favorable_order_frequency: int = Field(default=2, json_schema_extra={"is_updatable": True}) + unfavorable_order_frequency: int = Field(default=5, json_schema_extra={"is_updatable": True}) + max_orders_per_batch: int = Field(default=1, json_schema_extra={"is_updatable": True}) + + # Portfolio allocation + portfolio_allocation: Dict[str, Decimal] = Field( + default={ + "SOL": Decimal("0.50"), # 50% + }, + json_schema_extra={"is_updatable": True}) + # Grid parameters + grid_range: Decimal = Field(default=Decimal("0.002"), json_schema_extra={"is_updatable": True}) + tp_sl_ratio: Decimal = Field(default=Decimal("0.8"), json_schema_extra={"is_updatable": True}) + min_order_amount: Decimal = Field(default=Decimal("5"), json_schema_extra={"is_updatable": True}) + # Risk parameters + max_deviation: Decimal = Field(default=Decimal("0.05"), json_schema_extra={"is_updatable": True}) + max_open_orders: int = Field(default=2, json_schema_extra={"is_updatable": True}) + # Exchange settings + connector_name: str = "binance" + leverage: int = 1 + position_mode: PositionMode = PositionMode.HEDGE + quote_asset: str = "FDUSD" + fee_asset: str = "BNB" + # Grid price multipliers + min_spread_between_orders: Decimal = Field( + default=Decimal("0.0001"), # 0.01% between orders + json_schema_extra={"is_updatable": True}) + grid_tp_multiplier: Decimal = Field( + default=Decimal("0.0001"), # 0.2% take profit + json_schema_extra={"is_updatable": True}) + # Grid safety parameters + limit_price_spread: Decimal = Field( + default=Decimal("0.001"), # 0.1% spread for limit price + json_schema_extra={"is_updatable": True}) + activation_bounds: Decimal = Field( + default=Decimal("0.0002"), # Activation bounds for orders + json_schema_extra={"is_updatable": True}) + bb_lenght: int = 100 + bb_std_dev: float = 2.0 + interval: str = "1s" + dynamic_grid_range: bool = Field(default=False, json_schema_extra={"is_updatable": True}) + show_terminated_details: bool = False + + @property + def quote_asset_allocation(self) -> Decimal: + """Calculate the implicit quote asset (FDUSD) allocation""" + return Decimal("1") - sum(self.portfolio_allocation.values()) + + @field_validator("portfolio_allocation") + @classmethod + def validate_allocation(cls, v): + total = sum(v.values()) + if total >= Decimal("1"): + raise ValueError(f"Total allocation {total} exceeds or equals 100%. Must leave room for FDUSD allocation.") + if "FDUSD" in v: + raise ValueError("FDUSD should not be explicitly allocated as it is the quote asset") + return v + + def update_markets(self, markets: Dict[str, Set[str]]) -> Dict[str, Set[str]]: + if self.connector_name not in markets: + markets[self.connector_name] = set() + for asset in self.portfolio_allocation: + markets[self.connector_name].add(f"{asset}-{self.quote_asset}") + return markets + + +class QuantumGridAllocator(ControllerBase): + def __init__(self, config: QGAConfig, *args, **kwargs): + self.config = config + self.metrics = {} + # Track unfavorable grid IDs + self.unfavorable_grid_ids = set() + # Track held positions from unfavorable grids + self.unfavorable_positions = { + f"{asset}-{config.quote_asset}": { + 'long': {'size': Decimal('0'), 'value': Decimal('0'), 'weighted_price': Decimal('0')}, + 'short': {'size': Decimal('0'), 'value': Decimal('0'), 'weighted_price': Decimal('0')} + } + for asset in config.portfolio_allocation + } + self.config.candles_config = [CandlesConfig( + connector=config.connector_name, + trading_pair=trading_pair + "-" + config.quote_asset, + interval=config.interval, + max_records=config.bb_lenght + 100 + ) for trading_pair in config.portfolio_allocation.keys()] + super().__init__(config, *args, **kwargs) + self.initialize_rate_sources() + + def initialize_rate_sources(self): + fee_pair = ConnectorPair(connector_name=self.config.connector_name, trading_pair=f"{self.config.fee_asset}-{self.config.quote_asset}") + self.market_data_provider.initialize_rate_sources([fee_pair]) + + async def update_processed_data(self): + # Get the bb width to use it as the range for the grid + for asset in self.config.portfolio_allocation: + trading_pair = f"{asset}-{self.config.quote_asset}" + candles = self.market_data_provider.get_candles_df( + connector_name=self.config.connector_name, + trading_pair=trading_pair, + interval=self.config.interval, + max_records=self.config.bb_lenght + 100 + ) + if len(candles) == 0: + bb_width = self.config.grid_range + else: + bb = ta.bbands(candles["close"], length=self.config.bb_lenght, std=self.config.bb_std_dev) + bb_width = bb[f"BBB_{self.config.bb_lenght}_{self.config.bb_std_dev}"].iloc[-1] / 100 + self.processed_data[trading_pair] = { + "bb_width": bb_width + } + + def update_portfolio_metrics(self): + """ + Calculate theoretical vs actual portfolio allocations + """ + metrics = { + "theoretical": {}, + "actual": {}, + "difference": {}, + } + + # Get real balances and calculate total portfolio value + quote_balance = self.market_data_provider.get_balance(self.config.connector_name, self.config.quote_asset) + total_value_quote = quote_balance + + # Calculate actual allocations including positions + for asset in self.config.portfolio_allocation: + trading_pair = f"{asset}-{self.config.quote_asset}" + price = self.get_mid_price(trading_pair) + # Get balance and add any position from active grid + balance = self.market_data_provider.get_balance(self.config.connector_name, asset) + value = balance * price + total_value_quote += value + metrics["actual"][asset] = value + # Calculate theoretical allocations and differences + for asset in self.config.portfolio_allocation: + theoretical_value = total_value_quote * self.config.portfolio_allocation[asset] + metrics["theoretical"][asset] = theoretical_value + metrics["difference"][asset] = metrics["actual"][asset] - theoretical_value + # Add quote asset metrics + metrics["actual"][self.config.quote_asset] = quote_balance + metrics["theoretical"][self.config.quote_asset] = total_value_quote * self.config.quote_asset_allocation + metrics["difference"][self.config.quote_asset] = quote_balance - metrics["theoretical"][self.config.quote_asset] + metrics["total_portfolio_value"] = total_value_quote + self.metrics = metrics + + def get_active_grids_by_asset(self) -> Dict[str, List[ExecutorInfo]]: + """Group active grids by asset using filter_executors""" + active_grids = {} + for asset in self.config.portfolio_allocation: + if asset == self.config.quote_asset: + continue + trading_pair = f"{asset}-{self.config.quote_asset}" + active_executors = self.filter_executors( + executors=self.executors_info, + filter_func=lambda e: ( + e.is_active and + e.config.trading_pair == trading_pair + ) + ) + if active_executors: + active_grids[asset] = active_executors + return active_grids + + def to_format_status(self) -> List[str]: + """Generate a detailed status report with portfolio, grid, and position information""" + status_lines = [] + total_value = self.metrics.get("total_portfolio_value", Decimal("0")) + # Portfolio Status + status_lines.append(f"Total Portfolio Value: ${total_value:,.2f}") + status_lines.append("") + status_lines.append("Portfolio Status:") + status_lines.append("-" * 80) + status_lines.append( + f"{'Asset':<8} | " + f"{'Actual':>10} | " + f"{'Target':>10} | " + f"{'Diff':>10} | " + f"{'Dev %':>8}" + ) + status_lines.append("-" * 80) + # Show metrics for each asset + for asset in self.config.portfolio_allocation: + actual = self.metrics["actual"].get(asset, Decimal("0")) + theoretical = self.metrics["theoretical"].get(asset, Decimal("0")) + difference = self.metrics["difference"].get(asset, Decimal("0")) + deviation_pct = (difference / theoretical * 100) if theoretical != Decimal("0") else Decimal("0") + status_lines.append( + f"{asset:<8} | " + f"${actual:>9.2f} | " + f"${theoretical:>9.2f} | " + f"${difference:>+9.2f} | " + f"{deviation_pct:>+7.1f}%" + ) + # Add quote asset metrics + quote_asset = self.config.quote_asset + actual = self.metrics["actual"].get(quote_asset, Decimal("0")) + theoretical = self.metrics["theoretical"].get(quote_asset, Decimal("0")) + difference = self.metrics["difference"].get(quote_asset, Decimal("0")) + deviation_pct = (difference / theoretical * 100) if theoretical != Decimal("0") else Decimal("0") + status_lines.append("-" * 80) + status_lines.append( + f"{quote_asset:<8} | " + f"${actual:>9.2f} | " + f"${theoretical:>9.2f} | " + f"${difference:>+9.2f} | " + f"{deviation_pct:>+7.1f}%" + ) + # Active Grids Summary + active_grids = self.get_active_grids_by_asset() + if active_grids: + status_lines.append("") + status_lines.append("Active Grids:") + status_lines.append("-" * 140) + status_lines.append( + f"{'Asset':<8} {'Side':<6} | " + f"{'Total ($)':<10} {'Position':<10} {'Volume':<10} | " + f"{'PnL':<10} {'RPnL':<10} {'Fees':<10} | " + f"{'Start':<10} {'Current':<10} {'End':<10} {'Limit':<10}" + ) + status_lines.append("-" * 140) + for asset, executors in active_grids.items(): + for executor in executors: + config = executor.config + custom_info = executor.custom_info + trading_pair = config.trading_pair + current_price = self.get_mid_price(trading_pair) + # Get grid metrics + total_amount = Decimal(str(config.total_amount_quote)) + position_size = Decimal(str(custom_info.get('position_size_quote', '0'))) + volume = executor.filled_amount_quote + pnl = executor.net_pnl_quote + realized_pnl_quote = custom_info.get('realized_pnl_quote', Decimal('0')) + fees = executor.cum_fees_quote + status_lines.append( + f"{asset:<8} {config.side.name:<6} | " + f"${total_amount:<9.2f} ${position_size:<9.2f} ${volume:<9.2f} | " + f"${pnl:>+9.2f} ${realized_pnl_quote:>+9.2f} ${fees:>9.2f} | " + f"{config.start_price:<10.4f} {current_price:<10.4f} {config.end_price:<10.4f} {config.limit_price:<10.4f}" + ) + + status_lines.append("-" * 100 + "\n") + return status_lines + + def tp_multiplier(self): + return self.config.tp_sl_ratio + + def sl_multiplier(self): + return 1 - self.config.tp_sl_ratio + + def determine_executor_actions(self) -> List[Union[CreateExecutorAction, StopExecutorAction]]: + actions = [] + self.update_portfolio_metrics() + active_grids_by_asset = self.get_active_grids_by_asset() + for asset in self.config.portfolio_allocation: + if asset == self.config.quote_asset: + continue + trading_pair = f"{asset}-{self.config.quote_asset}" + # Check if there are any active grids for this asset + if asset in active_grids_by_asset: + self.logger().debug(f"Skipping {trading_pair} - Active grid exists") + continue + theoretical = self.metrics["theoretical"][asset] + difference = self.metrics["difference"][asset] + deviation = difference / theoretical if theoretical != Decimal("0") else Decimal("0") + mid_price = self.get_mid_price(trading_pair) + + # Calculate dynamic grid value percentage based on deviation + abs_deviation = abs(deviation) + grid_value_pct = self.config.max_grid_value_pct if abs_deviation > self.config.max_deviation else self.config.base_grid_value_pct + + self.logger().info( + f"{trading_pair} Grid Sizing - " + f"Deviation: {deviation:+.1%}, " + f"Grid Value %: {grid_value_pct:.1%}" + ) + if self.config.dynamic_grid_range: + grid_range = Decimal(self.processed_data[trading_pair]["bb_width"]) + else: + grid_range = self.config.grid_range + + # Determine which zone we're in by normalizing the deviation over the theoretical allocation + if deviation < -self.config.long_only_threshold: + # Long-only zone - only create buy grids + if difference < Decimal("0"): # Only if we need to buy + grid_value = min(abs(difference), theoretical * grid_value_pct) + start_price = mid_price * (1 - grid_range * self.sl_multiplier()) + end_price = mid_price * (1 + grid_range * self.tp_multiplier()) + grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.BUY, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if grid_action is not None: + actions.append(grid_action) + elif deviation > self.config.short_only_threshold: + # Short-only zone - only create sell grids + if difference > Decimal("0"): # Only if we need to sell + grid_value = min(abs(difference), theoretical * grid_value_pct) + start_price = mid_price * (1 - grid_range * self.tp_multiplier()) + end_price = mid_price * (1 + grid_range * self.sl_multiplier()) + grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.SELL, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if grid_action is not None: + actions.append(grid_action) + else: + # we create a buy and a sell grid with higher range pct and the base grid value pct + # to hedge the position + grid_value = theoretical * grid_value_pct + if difference < Decimal("0"): # create a bigger buy grid and sell grid + # Create buy grid + start_price = mid_price * (1 - 2 * grid_range * self.sl_multiplier()) + end_price = mid_price * (1 + grid_range * self.tp_multiplier()) + buy_grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.BUY, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if buy_grid_action is not None: + actions.append(buy_grid_action) + # Create sell grid + start_price = mid_price * (1 - grid_range * self.tp_multiplier()) + end_price = mid_price * (1 + 2 * grid_range * self.sl_multiplier()) + sell_grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.SELL, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if sell_grid_action is not None: + actions.append(sell_grid_action) + if difference > Decimal("0"): + # Create sell grid + start_price = mid_price * (1 - 2 * grid_range * self.tp_multiplier()) + end_price = mid_price * (1 + grid_range * self.sl_multiplier()) + sell_grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.SELL, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if sell_grid_action is not None: + actions.append(sell_grid_action) + # Create buy grid + start_price = mid_price * (1 - grid_range * self.sl_multiplier()) + end_price = mid_price * (1 + 2 * grid_range * self.tp_multiplier()) + buy_grid_action = self.create_grid_executor( + trading_pair=trading_pair, + side=TradeType.BUY, + start_price=start_price, + end_price=end_price, + grid_value=grid_value, + is_unfavorable=False + ) + if buy_grid_action is not None: + actions.append(buy_grid_action) + return actions + + def create_grid_executor( + self, + trading_pair: str, + side: TradeType, + start_price: Decimal, + end_price: Decimal, + grid_value: Decimal, + is_unfavorable: bool = False + ) -> CreateExecutorAction: + """Creates a grid executor with dynamic sizing and range adjustments""" + # Get trading rules and minimum notional + trading_rules = self.market_data_provider.get_trading_rules(self.config.connector_name, trading_pair) + min_notional = max( + self.config.min_order_amount, + trading_rules.min_notional_size if trading_rules else Decimal("5.0") + ) + # Add safety margin and check if grid value is sufficient + min_grid_value = min_notional * Decimal("5") # Ensure room for at least 5 levels + if grid_value < min_grid_value: + self.logger().info( + f"Grid value {grid_value} is too small for {trading_pair}. " + f"Minimum required for viable grid: {min_grid_value}" + ) + return None # Skip grid creation if value is too small + + # Select order frequency based on grid favorability + order_frequency = ( + self.config.unfavorable_order_frequency if is_unfavorable + else self.config.favorable_order_frequency + ) + # Calculate limit price to be more aggressive than grid boundaries + if side == TradeType.BUY: + # For buys, limit price should be lower than start price + limit_price = start_price * (1 - self.config.limit_price_spread) + else: + # For sells, limit price should be higher than end price + limit_price = end_price * (1 + self.config.limit_price_spread) + # Create the executor action + action = CreateExecutorAction( + controller_id=self.config.id, + executor_config=GridExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=trading_pair, + side=side, + start_price=start_price, + end_price=end_price, + limit_price=limit_price, + leverage=self.config.leverage, + total_amount_quote=grid_value, + safe_extra_spread=self.config.safe_extra_spread, + min_spread_between_orders=self.config.min_spread_between_orders, + min_order_amount_quote=self.config.min_order_amount, + max_open_orders=self.config.max_open_orders, + order_frequency=order_frequency, # Use dynamic order frequency + max_orders_per_batch=self.config.max_orders_per_batch, + activation_bounds=self.config.activation_bounds, + keep_position=True, # Always keep position for potential reversal + coerce_tp_to_step=True, + triple_barrier_config=TripleBarrierConfig( + take_profit=self.config.grid_tp_multiplier, + open_order_type=OrderType.LIMIT_MAKER, + take_profit_order_type=OrderType.LIMIT_MAKER, + stop_loss=None, + time_limit=None, + trailing_stop=None, + ))) + # Track unfavorable grid configs + if is_unfavorable: + self.unfavorable_grid_ids.add(action.executor_config.id) + self.logger().info( + f"Created unfavorable grid for {trading_pair} - " + f"Side: {side.name}, Value: ${grid_value:,.2f}, " + f"Order Frequency: {order_frequency}s" + ) + else: + self.logger().info( + f"Created favorable grid for {trading_pair} - " + f"Side: {side.name}, Value: ${grid_value:,.2f}, " + f"Order Frequency: {order_frequency}s" + ) + + return action + + def get_mid_price(self, trading_pair: str) -> Decimal: + return self.market_data_provider.get_price_by_type(self.config.connector_name, trading_pair, PriceType.MidPrice) diff --git a/bots/controllers/generic/stat_arb.py b/bots/controllers/generic/stat_arb.py new file mode 100644 index 0000000..527db07 --- /dev/null +++ b/bots/controllers/generic/stat_arb.py @@ -0,0 +1,475 @@ +from decimal import Decimal +from typing import List + +import numpy as np +from sklearn.linear_model import LinearRegression + +from hummingbot.core.data_type.common import OrderType, PositionAction, PositionMode, PriceType, TradeType +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.data_types import ConnectorPair, PositionSummary +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig +from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig, TripleBarrierConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction + + +class StatArbConfig(ControllerConfigBase): + """ + Configuration for a statistical arbitrage controller that trades two cointegrated assets. + """ + controller_type: str = "generic" + controller_name: str = "stat_arb" + candles_config: List[CandlesConfig] = [] + connector_pair_dominant: ConnectorPair = ConnectorPair(connector_name="binance_perpetual", trading_pair="SOL-USDT") + connector_pair_hedge: ConnectorPair = ConnectorPair(connector_name="binance_perpetual", trading_pair="POPCAT-USDT") + interval: str = "1m" + lookback_period: int = 300 + entry_threshold: Decimal = Decimal("2.0") + take_profit: Decimal = Decimal("0.0008") + tp_global: Decimal = Decimal("0.01") + sl_global: Decimal = Decimal("0.05") + min_amount_quote: Decimal = Decimal("10") + quoter_spread: Decimal = Decimal("0.0001") + quoter_cooldown: int = 30 + quoter_refresh: int = 10 + max_orders_placed_per_side: int = 2 + max_orders_filled_per_side: int = 2 + max_position_deviation: Decimal = Decimal("0.1") + pos_hedge_ratio: Decimal = Decimal("1.0") + leverage: int = 20 + position_mode: PositionMode = PositionMode.HEDGE + + @property + def triple_barrier_config(self) -> TripleBarrierConfig: + return TripleBarrierConfig( + take_profit=self.take_profit, + open_order_type=OrderType.LIMIT_MAKER, + take_profit_order_type=OrderType.LIMIT_MAKER, + ) + + def update_markets(self, markets: dict) -> dict: + """Update markets dictionary with both trading pairs""" + # Add dominant pair + if self.connector_pair_dominant.connector_name not in markets: + markets[self.connector_pair_dominant.connector_name] = set() + markets[self.connector_pair_dominant.connector_name].add(self.connector_pair_dominant.trading_pair) + + # Add hedge pair + if self.connector_pair_hedge.connector_name not in markets: + markets[self.connector_pair_hedge.connector_name] = set() + markets[self.connector_pair_hedge.connector_name].add(self.connector_pair_hedge.trading_pair) + + return markets + + +class StatArb(ControllerBase): + """ + Statistical arbitrage controller that trades two cointegrated assets. + """ + + def __init__(self, config: StatArbConfig, *args, **kwargs): + super().__init__(config, *args, **kwargs) + self.config = config + self.theoretical_dominant_quote = self.config.total_amount_quote * (1 / (1 + self.config.pos_hedge_ratio)) + self.theoretical_hedge_quote = self.config.total_amount_quote * (self.config.pos_hedge_ratio / (1 + self.config.pos_hedge_ratio)) + + # Initialize processed data dictionary + self.processed_data = { + "dominant_price": None, + "hedge_price": None, + "spread": None, + "z_score": None, + "hedge_ratio": None, + "position_dominant": Decimal("0"), + "position_hedge": Decimal("0"), + "active_orders_dominant": [], + "active_orders_hedge": [], + "pair_pnl": Decimal("0"), + "signal": 0 # 0: no signal, 1: long dominant/short hedge, -1: short dominant/long hedge + } + + # Setup candles config if not already set + if len(self.config.candles_config) == 0: + max_records = self.config.lookback_period + 20 # extra records for safety + self.max_records = max_records + self.config.candles_config = [ + CandlesConfig( + connector=self.config.connector_pair_dominant.connector_name, + trading_pair=self.config.connector_pair_dominant.trading_pair, + interval=self.config.interval, + max_records=max_records + ), + CandlesConfig( + connector=self.config.connector_pair_hedge.connector_name, + trading_pair=self.config.connector_pair_hedge.trading_pair, + interval=self.config.interval, + max_records=max_records + ) + ] + if "_perpetual" in self.config.connector_pair_dominant.connector_name: + connector = self.market_data_provider.get_connector(self.config.connector_pair_dominant.connector_name) + connector.set_position_mode(self.config.position_mode) + connector.set_leverage(self.config.connector_pair_dominant.trading_pair, self.config.leverage) + if "_perpetual" in self.config.connector_pair_hedge.connector_name: + connector = self.market_data_provider.get_connector(self.config.connector_pair_hedge.connector_name) + connector.set_position_mode(self.config.position_mode) + connector.set_leverage(self.config.connector_pair_hedge.trading_pair, self.config.leverage) + + def determine_executor_actions(self) -> List[ExecutorAction]: + """ + The execution logic for the statistical arbitrage strategy. + Market Data Conditions: Signal is generated based on the z-score of the spread between the two assets. + If signal == 1 --> long dominant/short hedge + If signal == -1 --> short dominant/long hedge + Execution Conditions: If the signal is generated add position executors to quote from the dominant and hedge markets. + We compare the current position with the theoretical position for the dominant and hedge assets. + If the current position + the active placed amount is greater than the theoretical position, can't place more orders. + If the imbalance scaled pct is greater than the threshold, we avoid placing orders in the market passed on filtered_connector_pair. + If the pnl of total position is greater than the take profit or lower than the stop loss, we close the position. + """ + actions: List[ExecutorAction] = [] + # Check global take profit and stop loss + if self.processed_data["pair_pnl_pct"] > self.config.tp_global or self.processed_data["pair_pnl_pct"] < -self.config.sl_global: + # Close all positions + for position in self.positions_held: + actions.extend(self.get_executors_to_reduce_position(position)) + return actions + # Check the signal + elif self.processed_data["signal"] != 0: + actions.extend(self.get_executors_to_quote()) + actions.extend(self.get_executors_to_reduce_position_on_opposite_signal()) + + # Get the executors to keep position after a cooldown is reached + actions.extend(self.get_executors_to_keep_position()) + actions.extend(self.get_executors_to_refresh()) + + return actions + + def get_executors_to_reduce_position_on_opposite_signal(self) -> List[ExecutorAction]: + if self.processed_data["signal"] == 1: + dominant_side, hedge_side = TradeType.SELL, TradeType.BUY + elif self.processed_data["signal"] == -1: + dominant_side, hedge_side = TradeType.BUY, TradeType.SELL + else: + return [] + # Get executors to stop + dominant_active_executors_to_stop = self.filter_executors(self.executors_info, filter_func=lambda e: e.connector_name == self.config.connector_pair_dominant.connector_name and e.trading_pair == self.config.connector_pair_dominant.trading_pair and e.side == dominant_side) + hedge_active_executors_to_stop = self.filter_executors(self.executors_info, filter_func=lambda e: e.connector_name == self.config.connector_pair_hedge.connector_name and e.trading_pair == self.config.connector_pair_hedge.trading_pair and e.side == hedge_side) + stop_actions = [StopExecutorAction(controller_id=self.config.id, executor_id=executor.id, keep_position=False) for executor in dominant_active_executors_to_stop + hedge_active_executors_to_stop] + + # Get order executors to reduce positions + reduce_actions: List[ExecutorAction] = [] + for position in self.positions_held: + if position.connector_name == self.config.connector_pair_dominant.connector_name and position.trading_pair == self.config.connector_pair_dominant.trading_pair and position.side == dominant_side: + reduce_actions.extend(self.get_executors_to_reduce_position(position)) + elif position.connector_name == self.config.connector_pair_hedge.connector_name and position.trading_pair == self.config.connector_pair_hedge.trading_pair and position.side == hedge_side: + reduce_actions.extend(self.get_executors_to_reduce_position(position)) + return stop_actions + reduce_actions + + def get_executors_to_keep_position(self) -> List[ExecutorAction]: + stop_actions: List[ExecutorAction] = [] + for executor in self.processed_data["executors_dominant_filled"] + self.processed_data["executors_hedge_filled"]: + if self.market_data_provider.time() - executor.timestamp >= self.config.quoter_cooldown: + # Create a new executor to keep the position + stop_actions.append(StopExecutorAction(controller_id=self.config.id, executor_id=executor.id, keep_position=True)) + return stop_actions + + def get_executors_to_refresh(self) -> List[ExecutorAction]: + refresh_actions: List[ExecutorAction] = [] + for executor in self.processed_data["executors_dominant_placed"] + self.processed_data["executors_hedge_placed"]: + if self.market_data_provider.time() - executor.timestamp >= self.config.quoter_refresh: + # Create a new executor to refresh the position + refresh_actions.append(StopExecutorAction(controller_id=self.config.id, executor_id=executor.id, keep_position=False)) + return refresh_actions + + def get_executors_to_quote(self) -> List[ExecutorAction]: + """ + Get Order Executor to quote from the dominant and hedge markets. + """ + actions: List[ExecutorAction] = [] + trade_type_dominant = TradeType.BUY if self.processed_data["signal"] == 1 else TradeType.SELL + trade_type_hedge = TradeType.SELL if self.processed_data["signal"] == 1 else TradeType.BUY + + # Analyze dominant active orders, max deviation and imbalance to create a new executor + if self.processed_data["dominant_gap"] > Decimal("0") and \ + self.processed_data["filter_connector_pair"] != self.config.connector_pair_dominant and \ + len(self.processed_data["executors_dominant_placed"]) < self.config.max_orders_placed_per_side and \ + len(self.processed_data["executors_dominant_filled"]) < self.config.max_orders_filled_per_side: + # Create Position Executor for dominant asset + if trade_type_dominant == TradeType.BUY: + price = self.processed_data["min_price_dominant"] * (1 - self.config.quoter_spread) + else: + price = self.processed_data["max_price_dominant"] * (1 + self.config.quoter_spread) + dominant_executor_config = PositionExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_pair_dominant.connector_name, + trading_pair=self.config.connector_pair_dominant.trading_pair, + side=trade_type_dominant, + entry_price=price, + amount=self.config.min_amount_quote / self.processed_data["dominant_price"], + triple_barrier_config=self.config.triple_barrier_config, + leverage=self.config.leverage, + ) + actions.append(CreateExecutorAction(controller_id=self.config.id, executor_config=dominant_executor_config)) + + # Analyze hedge active orders, max deviation and imbalance to create a new executor + if self.processed_data["hedge_gap"] > Decimal("0") and \ + self.processed_data["filter_connector_pair"] != self.config.connector_pair_hedge and \ + len(self.processed_data["executors_hedge_placed"]) < self.config.max_orders_placed_per_side and \ + len(self.processed_data["executors_hedge_filled"]) < self.config.max_orders_filled_per_side: + # Create Position Executor for hedge asset + if trade_type_hedge == TradeType.BUY: + price = self.processed_data["min_price_hedge"] * (1 - self.config.quoter_spread) + else: + price = self.processed_data["max_price_hedge"] * (1 + self.config.quoter_spread) + hedge_executor_config = PositionExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_pair_hedge.connector_name, + trading_pair=self.config.connector_pair_hedge.trading_pair, + side=trade_type_hedge, + entry_price=price, + amount=self.config.min_amount_quote / self.processed_data["hedge_price"], + triple_barrier_config=self.config.triple_barrier_config, + leverage=self.config.leverage, + ) + actions.append(CreateExecutorAction(controller_id=self.config.id, executor_config=hedge_executor_config)) + return actions + + def get_executors_to_reduce_position(self, position: PositionSummary) -> List[ExecutorAction]: + """ + Get Order Executor to reduce position. + """ + if position.amount > Decimal("0"): + # Close position + config = OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=position.connector_name, + trading_pair=position.trading_pair, + side=TradeType.BUY if position.side == TradeType.SELL else TradeType.SELL, + amount=position.amount, + position_action=PositionAction.CLOSE, + execution_strategy=ExecutionStrategy.MARKET, + leverage=self.config.leverage, + ) + return [CreateExecutorAction(controller_id=self.config.id, executor_config=config)] + return [] + + async def update_processed_data(self): + """ + Update processed data with the latest market information and statistical calculations + needed for the statistical arbitrage strategy. + """ + # Stat arb analysis + spread, z_score = self.get_spread_and_z_score() + + # Generate trading signal based on z-score + entry_threshold = float(self.config.entry_threshold) + if z_score > entry_threshold: + # Spread is too high, expect it to revert: long dominant, short hedge + signal = 1 + dominant_side, hedge_side = TradeType.BUY, TradeType.SELL + elif z_score < -entry_threshold: + # Spread is too low, expect it to revert: short dominant, long hedge + signal = -1 + dominant_side, hedge_side = TradeType.SELL, TradeType.BUY + else: + # No signal + signal = 0 + dominant_side, hedge_side = None, None + + # Current prices + dominant_price, hedge_price = self.get_pairs_prices() + + # Get current positions stats by signal + positions_dominant = next((position for position in self.positions_held if position.connector_name == self.config.connector_pair_dominant.connector_name and position.trading_pair == self.config.connector_pair_dominant.trading_pair and (position.side == dominant_side or dominant_side is None)), None) + positions_hedge = next((position for position in self.positions_held if position.connector_name == self.config.connector_pair_hedge.connector_name and position.trading_pair == self.config.connector_pair_hedge.trading_pair and (position.side == hedge_side or hedge_side is None)), None) + # Get position stats + position_dominant_quote = positions_dominant.amount_quote if positions_dominant else Decimal("0") + position_hedge_quote = positions_hedge.amount_quote if positions_hedge else Decimal("0") + position_dominant_pnl_quote = positions_dominant.global_pnl_quote if positions_dominant else Decimal("0") + position_hedge_pnl_quote = positions_hedge.global_pnl_quote if positions_hedge else Decimal("0") + pair_pnl_pct = (position_dominant_pnl_quote + position_hedge_pnl_quote) / (position_dominant_quote + position_hedge_quote) if (position_dominant_quote + position_hedge_quote) != 0 else Decimal("0") + # Get active executors + executors_dominant_placed, executors_dominant_filled = self.get_executors_dominant() + executors_hedge_placed, executors_hedge_filled = self.get_executors_hedge() + min_price_dominant = Decimal(str(min([executor.config.entry_price for executor in executors_dominant_placed]))) if executors_dominant_placed else None + max_price_dominant = Decimal(str(max([executor.config.entry_price for executor in executors_dominant_placed]))) if executors_dominant_placed else None + min_price_hedge = Decimal(str(min([executor.config.entry_price for executor in executors_hedge_placed]))) if executors_hedge_placed else None + max_price_hedge = Decimal(str(max([executor.config.entry_price for executor in executors_hedge_placed]))) if executors_hedge_placed else None + + active_amount_dominant = Decimal(str(sum([executor.filled_amount_quote for executor in executors_dominant_filled]))) + active_amount_hedge = Decimal(str(sum([executor.filled_amount_quote for executor in executors_hedge_filled]))) + + # Compute imbalance based on the hedge ratio + dominant_gap = self.theoretical_dominant_quote - position_dominant_quote - active_amount_dominant + hedge_gap = self.theoretical_hedge_quote - position_hedge_quote - active_amount_hedge + imbalance = position_dominant_quote - position_hedge_quote + imbalance_scaled = position_dominant_quote - position_hedge_quote * self.config.pos_hedge_ratio + imbalance_scaled_pct = imbalance_scaled / position_dominant_quote if position_dominant_quote != Decimal("0") else Decimal("0") + filter_connector_pair = None + if imbalance_scaled_pct > self.config.max_position_deviation: + # Avoid placing orders in the dominant market + filter_connector_pair = self.config.connector_pair_dominant + elif imbalance_scaled_pct < -self.config.max_position_deviation: + # Avoid placing orders in the hedge market + filter_connector_pair = self.config.connector_pair_hedge + + # Update processed data + self.processed_data.update({ + "dominant_price": Decimal(str(dominant_price)), + "hedge_price": Decimal(str(hedge_price)), + "spread": Decimal(str(spread)), + "z_score": Decimal(str(z_score)), + "dominant_gap": Decimal(str(dominant_gap)), + "hedge_gap": Decimal(str(hedge_gap)), + "position_dominant_quote": position_dominant_quote, + "position_hedge_quote": position_hedge_quote, + "active_amount_dominant": active_amount_dominant, + "active_amount_hedge": active_amount_hedge, + "signal": signal, + # Store full dataframes for reference + "imbalance": Decimal(str(imbalance)), + "imbalance_scaled_pct": Decimal(str(imbalance_scaled_pct)), + "filter_connector_pair": filter_connector_pair, + "min_price_dominant": min_price_dominant if min_price_dominant is not None else Decimal(str(dominant_price)), + "max_price_dominant": max_price_dominant if max_price_dominant is not None else Decimal(str(dominant_price)), + "min_price_hedge": min_price_hedge if min_price_hedge is not None else Decimal(str(hedge_price)), + "max_price_hedge": max_price_hedge if max_price_hedge is not None else Decimal(str(hedge_price)), + "executors_dominant_filled": executors_dominant_filled, + "executors_hedge_filled": executors_hedge_filled, + "executors_dominant_placed": executors_dominant_placed, + "executors_hedge_placed": executors_hedge_placed, + "pair_pnl_pct": pair_pnl_pct, + }) + + def get_spread_and_z_score(self): + # Fetch candle data for both assets + dominant_df = self.market_data_provider.get_candles_df( + connector_name=self.config.connector_pair_dominant.connector_name, + trading_pair=self.config.connector_pair_dominant.trading_pair, + interval=self.config.interval, + max_records=self.max_records + ) + + hedge_df = self.market_data_provider.get_candles_df( + connector_name=self.config.connector_pair_hedge.connector_name, + trading_pair=self.config.connector_pair_hedge.trading_pair, + interval=self.config.interval, + max_records=self.max_records + ) + + if dominant_df.empty or hedge_df.empty: + self.logger().warning("Not enough candle data available for statistical analysis") + return + + # Extract close prices + dominant_prices = dominant_df['close'].values + hedge_prices = hedge_df['close'].values + + # Ensure we have enough data and both series have the same length + min_length = min(len(dominant_prices), len(hedge_prices)) + if min_length < self.config.lookback_period: + self.logger().warning( + f"Not enough data points for analysis. Required: {self.config.lookback_period}, Available: {min_length}") + return + + # Use the most recent data points + dominant_prices = dominant_prices[-self.config.lookback_period:] + hedge_prices = hedge_prices[-self.config.lookback_period:] + + # Convert to numpy arrays + dominant_prices_np = np.array(dominant_prices, dtype=float) + hedge_prices_np = np.array(hedge_prices, dtype=float) + + # Calculate percentage returns + dominant_pct_change = np.diff(dominant_prices_np) / dominant_prices_np[:-1] + hedge_pct_change = np.diff(hedge_prices_np) / hedge_prices_np[:-1] + + # Convert to cumulative returns + dominant_cum_returns = np.cumprod(dominant_pct_change + 1) + hedge_cum_returns = np.cumprod(hedge_pct_change + 1) + + # Normalize to start at 1 + dominant_cum_returns = dominant_cum_returns / dominant_cum_returns[0] if len(dominant_cum_returns) > 0 else np.array([1.0]) + hedge_cum_returns = hedge_cum_returns / hedge_cum_returns[0] if len(hedge_cum_returns) > 0 else np.array([1.0]) + + # Perform linear regression + dominant_cum_returns_reshaped = dominant_cum_returns.reshape(-1, 1) + reg = LinearRegression().fit(dominant_cum_returns_reshaped, hedge_cum_returns) + alpha = reg.intercept_ + beta = reg.coef_[0] + self.processed_data.update({ + "alpha": alpha, + "beta": beta, + }) + + # Calculate spread as percentage difference from predicted value + y_pred = alpha + beta * dominant_cum_returns + spread_pct = (hedge_cum_returns - y_pred) / y_pred * 100 + + # Calculate z-score + mean_spread = np.mean(spread_pct) + std_spread = np.std(spread_pct) + if std_spread == 0: + self.logger().warning("Standard deviation of spread is zero, cannot calculate z-score") + return + + current_spread = spread_pct[-1] + current_z_score = (current_spread - mean_spread) / std_spread + + return current_spread, current_z_score + + def get_pairs_prices(self): + current_dominant_price = self.market_data_provider.get_price_by_type( + connector_name=self.config.connector_pair_dominant.connector_name, + trading_pair=self.config.connector_pair_dominant.trading_pair, price_type=PriceType.MidPrice) + + current_hedge_price = self.market_data_provider.get_price_by_type( + connector_name=self.config.connector_pair_hedge.connector_name, + trading_pair=self.config.connector_pair_hedge.trading_pair, price_type=PriceType.MidPrice) + return current_dominant_price, current_hedge_price + + def get_executors_dominant(self): + active_executors_dominant_placed = self.filter_executors( + self.executors_info, + filter_func=lambda e: e.connector_name == self.config.connector_pair_dominant.connector_name and e.trading_pair == self.config.connector_pair_dominant.trading_pair and e.is_active and not e.is_trading and e.type == "position_executor" + ) + active_executors_dominant_filled = self.filter_executors( + self.executors_info, + filter_func=lambda e: e.connector_name == self.config.connector_pair_dominant.connector_name and e.trading_pair == self.config.connector_pair_dominant.trading_pair and e.is_active and e.is_trading and e.type == "position_executor" + ) + return active_executors_dominant_placed, active_executors_dominant_filled + + def get_executors_hedge(self): + active_executors_hedge_placed = self.filter_executors( + self.executors_info, + filter_func=lambda e: e.connector_name == self.config.connector_pair_hedge.connector_name and e.trading_pair == self.config.connector_pair_hedge.trading_pair and e.is_active and not e.is_trading and e.type == "position_executor" + ) + active_executors_hedge_filled = self.filter_executors( + self.executors_info, + filter_func=lambda e: e.connector_name == self.config.connector_pair_hedge.connector_name and e.trading_pair == self.config.connector_pair_hedge.trading_pair and e.is_active and e.is_trading and e.type == "position_executor" + ) + return active_executors_hedge_placed, active_executors_hedge_filled + + def to_format_status(self) -> List[str]: + """ + Format the status of the controller for display. + """ + status_lines = [] + status_lines.append(f""" +Dominant Pair: {self.config.connector_pair_dominant} | Hedge Pair: {self.config.connector_pair_hedge} | +Timeframe: {self.config.interval} | Lookback Period: {self.config.lookback_period} | Entry Threshold: {self.config.entry_threshold} + +Positions targets: +Theoretical Dominant : {self.theoretical_dominant_quote} | Theoretical Hedge: {self.theoretical_hedge_quote} | Position Hedge Ratio: {self.config.pos_hedge_ratio} +Position Dominant : {self.processed_data['position_dominant_quote']:.2f} | Position Hedge: {self.processed_data['position_hedge_quote']:.2f} | Imbalance: {self.processed_data['imbalance']:.2f} | Imbalance Scaled: {self.processed_data['imbalance_scaled_pct']:.2f} % + +Current Executors: +Active Orders Dominant : {len(self.processed_data['executors_dominant_placed'])} | Active Orders Hedge : {len(self.processed_data['executors_hedge_placed'])} | +Active Orders Dominant Filled: {len(self.processed_data['executors_dominant_filled'])} | Active Orders Hedge Filled: {len(self.processed_data['executors_hedge_filled'])} + +Signal: {self.processed_data['signal']:.2f} | Z-Score: {self.processed_data['z_score']:.2f} | Spread: {self.processed_data['spread']:.2f} +Alpha : {self.processed_data['alpha']:.2f} | Beta: {self.processed_data['beta']:.2f} +Pair PnL PCT: {self.processed_data['pair_pnl_pct'] * 100:.2f} % +""") + return status_lines diff --git a/bots/credentials/master_account/.password_verification b/bots/credentials/master_account/.password_verification deleted file mode 100644 index b8c7618..0000000 --- a/bots/credentials/master_account/.password_verification +++ /dev/null @@ -1 +0,0 @@ -7b2263727970746f223a207b22636970686572223a20226165732d3132382d637472222c2022636970686572706172616d73223a207b226976223a20223864336365306436393461623131396334363135663935366464653839363063227d2c202263697068657274657874223a20223836333266323430613563306131623665353664222c20226b6466223a202270626b646632222c20226b6466706172616d73223a207b2263223a20313030303030302c2022646b6c656e223a2033322c2022707266223a2022686d61632d736861323536222c202273616c74223a20226566373330376531636464373964376132303338323534656139343433663930227d2c20226d6163223a202266393439383534613530633138363633386363353962336133363665633962353333386633613964373266636635343066313034333361353431636232306438227d2c202276657273696f6e223a20337d \ No newline at end of file diff --git a/bots/credentials/master_account/conf_client.yml b/bots/credentials/master_account/conf_client.yml index 41e4173..7093ae6 100644 --- a/bots/credentials/master_account/conf_client.yml +++ b/bots/credentials/master_account/conf_client.yml @@ -36,14 +36,12 @@ mqtt_bridge: mqtt_notifier: true mqtt_commands: true mqtt_events: true + mqtt_external_events: true mqtt_autostart: true # Error log sharing send_error_logs: true -# Can store the previous strategy ran for quick retrieval. -previous_strategy: some-strategy.yml - # Advanced database options, currently supports SQLAlchemy's included dialects # Reference: https://docs.sqlalchemy.org/en/13/dialects/ # To use an instance of SQLite DB the required configuration is @@ -113,19 +111,6 @@ certs_path: /Users/dardonacci/Documents/work/hummingbot/certs anonymized_metrics_mode: anonymized_metrics_interval_min: 15.0 -# Command Shortcuts -# Define abbreviations for often used commands -# or batch grouped commands together -command_shortcuts: -- command: spreads - help: Set bid and ask spread - arguments: - - Bid Spread - - Ask Spread - output: - - config bid_spread $1 - - config ask_spread $2 - # A source for rate oracle, currently ascend_ex, binance, coin_gecko, coin_cap, kucoin, gate_io rate_oracle_source: name: binance diff --git a/bots/data/.gitignore b/bots/data/.gitignore deleted file mode 100644 index e69de29..0000000 diff --git a/bots/scripts/v2_with_controllers.py b/bots/scripts/v2_with_controllers.py index 80dac86..d345f0b 100644 --- a/bots/scripts/v2_with_controllers.py +++ b/bots/scripts/v2_with_controllers.py @@ -1,33 +1,24 @@ import os -import time from decimal import Decimal from typing import Dict, List, Optional, Set from hummingbot.client.hummingbot_application import HummingbotApplication from hummingbot.connector.connector_base import ConnectorBase -from hummingbot.core.clock import Clock -from hummingbot.core.data_type.common import OrderType, TradeType from hummingbot.data_feed.candles_feed.data_types import CandlesConfig -from hummingbot.remote_iface.mqtt import ETopicPublisher from hummingbot.strategy.strategy_v2_base import StrategyV2Base, StrategyV2ConfigBase from hummingbot.strategy_v2.models.base import RunnableStatus from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, StopExecutorAction -class GenericV2StrategyWithCashOutConfig(StrategyV2ConfigBase): +class V2WithControllersConfig(StrategyV2ConfigBase): script_file_name: str = os.path.basename(__file__) candles_config: List[CandlesConfig] = [] markets: Dict[str, Set[str]] = {} - time_to_cash_out: Optional[int] = None - max_global_drawdown: Optional[float] = None - max_controller_drawdown: Optional[float] = None - rebalance_interval: Optional[int] = None - extra_inventory: Optional[float] = 0.02 - min_amount_to_rebalance_usd: Decimal = Decimal("8") - asset_to_rebalance: str = "USDT" + max_global_drawdown_quote: Optional[float] = None + max_controller_drawdown_quote: Optional[float] = None -class GenericV2StrategyWithCashOut(StrategyV2Base): +class V2WithControllers(StrategyV2Base): """ This script runs a generic strategy with cash out feature. Will also check if the controllers configs have been updated and apply the new settings. @@ -40,131 +31,43 @@ class GenericV2StrategyWithCashOut(StrategyV2Base): """ performance_report_interval: int = 1 - def __init__(self, connectors: Dict[str, ConnectorBase], config: GenericV2StrategyWithCashOutConfig): + def __init__(self, connectors: Dict[str, ConnectorBase], config: V2WithControllersConfig): super().__init__(connectors, config) self.config = config - self.cashing_out = False self.max_pnl_by_controller = {} - self.performance_reports = {} self.max_global_pnl = Decimal("0") self.drawdown_exited_controllers = [] self.closed_executors_buffer: int = 30 - self.rebalance_interval: int = self.config.rebalance_interval self._last_performance_report_timestamp = 0 - self._last_rebalance_check_timestamp = 0 - hb_app = HummingbotApplication.main_application() - self.mqtt_enabled = hb_app._mqtt is not None - self._pub: Optional[ETopicPublisher] = None - if self.config.time_to_cash_out: - self.cash_out_time = self.config.time_to_cash_out + time.time() - else: - self.cash_out_time = None - - def start(self, clock: Clock, timestamp: float) -> None: - """ - Start the strategy. - :param clock: Clock to use. - :param timestamp: Current time. - """ - self._last_timestamp = timestamp - self.apply_initial_setting() - if self.mqtt_enabled: - self._pub = ETopicPublisher("performance", use_bot_prefix=True) - - async def on_stop(self): - await super().on_stop() - if self.mqtt_enabled: - self._pub({controller_id: {} for controller_id in self.controllers.keys()}) - self._pub = None def on_tick(self): super().on_tick() - self.performance_reports = {controller_id: self.executor_orchestrator.generate_performance_report(controller_id=controller_id).dict() for controller_id in self.controllers.keys()} - self.control_rebalance() - self.control_cash_out() - self.control_max_drawdown() - self.send_performance_report() - - def control_rebalance(self): - if self.rebalance_interval and self._last_rebalance_check_timestamp + self.rebalance_interval <= self.current_timestamp: - balance_required = {} - for controller_id, controller in self.controllers.items(): - connector_name = controller.config.model_dump().get("connector_name") - if connector_name and "perpetual" in connector_name: - continue - if connector_name not in balance_required: - balance_required[connector_name] = {} - tokens_required = controller.get_balance_requirements() - for token, amount in tokens_required: - if token not in balance_required[connector_name]: - balance_required[connector_name][token] = amount - else: - balance_required[connector_name][token] += amount - for connector_name, balance_requirements in balance_required.items(): - connector = self.connectors[connector_name] - for token, amount in balance_requirements.items(): - if token == self.config.asset_to_rebalance: - continue - balance = connector.get_balance(token) - trading_pair = f"{token}-{self.config.asset_to_rebalance}" - mid_price = connector.get_mid_price(trading_pair) - trading_rule = connector.trading_rules[trading_pair] - amount_with_safe_margin = amount * (1 + Decimal(self.config.extra_inventory)) - active_executors_for_pair = self.filter_executors( - executors=self.get_all_executors(), - filter_func=lambda x: x.is_active and x.trading_pair == trading_pair and x.connector_name == connector_name - ) - unmatched_amount = sum([executor.filled_amount_quote for executor in active_executors_for_pair if executor.side == TradeType.SELL]) - sum([executor.filled_amount_quote for executor in active_executors_for_pair if executor.side == TradeType.BUY]) - balance += unmatched_amount / mid_price - base_balance_diff = balance - amount_with_safe_margin - abs_balance_diff = abs(base_balance_diff) - trading_rules_condition = abs_balance_diff > trading_rule.min_order_size and abs_balance_diff * mid_price > trading_rule.min_notional_size and abs_balance_diff * mid_price > self.config.min_amount_to_rebalance_usd - order_type = OrderType.MARKET - if base_balance_diff > 0: - if trading_rules_condition: - self.logger().info(f"Rebalance: Selling {amount_with_safe_margin} {token} to {self.config.asset_to_rebalance}. Balance: {balance} | Executors unmatched balance {unmatched_amount / mid_price}") - connector.sell( - trading_pair=trading_pair, - amount=abs_balance_diff, - order_type=order_type, - price=mid_price) - else: - self.logger().info("Skipping rebalance due a low amount to sell that may cause future imbalance") - else: - if not trading_rules_condition: - amount = max([self.config.min_amount_to_rebalance_usd / mid_price, trading_rule.min_order_size, trading_rule.min_notional_size / mid_price]) - self.logger().info(f"Rebalance: Buying for a higher value to avoid future imbalance {amount} {token} to {self.config.asset_to_rebalance}. Balance: {balance} | Executors unmatched balance {unmatched_amount}") - else: - amount = abs_balance_diff - self.logger().info(f"Rebalance: Buying {amount} {token} to {self.config.asset_to_rebalance}. Balance: {balance} | Executors unmatched balance {unmatched_amount}") - connector.buy( - trading_pair=trading_pair, - amount=amount, - order_type=order_type, - price=mid_price) - self._last_rebalance_check_timestamp = self.current_timestamp + if not self._is_stop_triggered: + self.check_manual_kill_switch() + self.control_max_drawdown() + self.send_performance_report() def control_max_drawdown(self): - if self.config.max_controller_drawdown: + if self.config.max_controller_drawdown_quote: self.check_max_controller_drawdown() - if self.config.max_global_drawdown: + if self.config.max_global_drawdown_quote: self.check_max_global_drawdown() def check_max_controller_drawdown(self): for controller_id, controller in self.controllers.items(): if controller.status != RunnableStatus.RUNNING: continue - controller_pnl = self.performance_reports[controller_id]["global_pnl_quote"] + controller_pnl = self.get_performance_report(controller_id).global_pnl_quote last_max_pnl = self.max_pnl_by_controller[controller_id] if controller_pnl > last_max_pnl: self.max_pnl_by_controller[controller_id] = controller_pnl else: current_drawdown = last_max_pnl - controller_pnl - if current_drawdown > self.config.max_controller_drawdown: + if current_drawdown > self.config.max_controller_drawdown_quote: self.logger().info(f"Controller {controller_id} reached max drawdown. Stopping the controller.") controller.stop() executors_order_placed = self.filter_executors( - executors=self.executors_info[controller_id], + executors=self.get_executors_by_controller(controller_id), filter_func=lambda x: x.is_active and not x.is_trading, ) self.executor_orchestrator.execute_actions( @@ -173,38 +76,24 @@ class GenericV2StrategyWithCashOut(StrategyV2Base): self.drawdown_exited_controllers.append(controller_id) def check_max_global_drawdown(self): - current_global_pnl = sum([report["global_pnl_quote"] for report in self.performance_reports.values()]) + current_global_pnl = sum([self.get_performance_report(controller_id).global_pnl_quote for controller_id in self.controllers.keys()]) if current_global_pnl > self.max_global_pnl: self.max_global_pnl = current_global_pnl else: current_global_drawdown = self.max_global_pnl - current_global_pnl - if current_global_drawdown > self.config.max_global_drawdown: + if current_global_drawdown > self.config.max_global_drawdown_quote: self.drawdown_exited_controllers.extend(list(self.controllers.keys())) self.logger().info("Global drawdown reached. Stopping the strategy.") + self._is_stop_triggered = True HummingbotApplication.main_application().stop() def send_performance_report(self): - if self.current_timestamp - self._last_performance_report_timestamp >= self.performance_report_interval and self.mqtt_enabled: - self._pub(self.performance_reports) + if self.current_timestamp - self._last_performance_report_timestamp >= self.performance_report_interval and self._pub: + performance_reports = {controller_id: self.get_performance_report(controller_id).dict() for controller_id in self.controllers.keys()} + self._pub(performance_reports) self._last_performance_report_timestamp = self.current_timestamp - def control_cash_out(self): - self.evaluate_cash_out_time() - if self.cashing_out: - self.check_executors_status() - else: - self.check_manual_cash_out() - - def evaluate_cash_out_time(self): - if self.cash_out_time and self.current_timestamp >= self.cash_out_time and not self.cashing_out: - self.logger().info("Cash out time reached. Stopping the controllers.") - for controller_id, controller in self.controllers.items(): - if controller.status == RunnableStatus.RUNNING: - self.logger().info(f"Cash out for controller {controller_id}.") - controller.stop() - self.cashing_out = True - - def check_manual_cash_out(self): + def check_manual_kill_switch(self): for controller_id, controller in self.controllers.items(): if controller.config.manual_kill_switch and controller.status == RunnableStatus.RUNNING: self.logger().info(f"Manual cash out for controller {controller_id}.") @@ -246,7 +135,7 @@ class GenericV2StrategyWithCashOut(StrategyV2Base): connectors_position_mode = {} for controller_id, controller in self.controllers.items(): self.max_pnl_by_controller[controller_id] = Decimal("0") - config_dict = controller.config.dict() + config_dict = controller.config.model_dump() if "connector_name" in config_dict: if self.is_perpetual(config_dict["connector_name"]): if "position_mode" in config_dict: