Files
deploy/bots/scripts/v2_with_controllers.py
2024-08-26 21:27:47 -03:00

190 lines
9.4 KiB
Python

import os
import time
from decimal import Decimal
from typing import Dict, List, Optional, Set
from pydantic import Field
from hummingbot.client.hummingbot_application import HummingbotApplication
from hummingbot.connector.connector_base import ConnectorBase
from hummingbot.core.clock import Clock
from hummingbot.data_feed.candles_feed.data_types import CandlesConfig
from hummingbot.remote_iface.mqtt import ETopicPublisher
from hummingbot.strategy.strategy_v2_base import StrategyV2Base, StrategyV2ConfigBase
from hummingbot.strategy_v2.models.base import RunnableStatus
from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, StopExecutorAction
class GenericV2StrategyWithCashOutConfig(StrategyV2ConfigBase):
script_file_name: str = Field(default_factory=lambda: os.path.basename(__file__))
candles_config: List[CandlesConfig] = []
markets: Dict[str, Set[str]] = {}
time_to_cash_out: Optional[int] = None
max_global_drawdown: Optional[float] = None
max_controller_drawdown: Optional[float] = None
class GenericV2StrategyWithCashOut(StrategyV2Base):
"""
This script runs a generic strategy with cash out feature. Will also check if the controllers configs have been
updated and apply the new settings.
The cash out of the script can be set by the time_to_cash_out parameter in the config file. If set, the script will
stop the controllers after the specified time has passed, and wait until the active executors finalize their
execution.
The controllers will also have a parameter to manually cash out. In that scenario, the main strategy will stop the
specific controller and wait until the active executors finalize their execution. The rest of the executors will
wait until the main strategy stops them.
"""
def __init__(self, connectors: Dict[str, ConnectorBase], config: GenericV2StrategyWithCashOutConfig):
super().__init__(connectors, config)
self.config = config
self.cashing_out = False
self.max_pnl_by_controller = {}
self.performance_reports = {}
self.max_global_pnl = Decimal("0")
self.drawdown_exited_controllers = []
self.closed_executors_buffer: int = 30
self.performance_report_interval: int = 1
self._last_performance_report_timestamp = 0
hb_app = HummingbotApplication.main_application()
self.mqtt_enabled = hb_app._mqtt is not None
self._pub: Optional[ETopicPublisher] = None
if self.config.time_to_cash_out:
self.cash_out_time = self.config.time_to_cash_out + time.time()
else:
self.cash_out_time = None
def start(self, clock: Clock, timestamp: float) -> None:
"""
Start the strategy.
:param clock: Clock to use.
:param timestamp: Current time.
"""
self._last_timestamp = timestamp
self.apply_initial_setting()
if self.mqtt_enabled:
self._pub = ETopicPublisher("performance", use_bot_prefix=True)
async def on_stop(self):
await super().on_stop()
if self.mqtt_enabled:
self._pub({controller_id: {} for controller_id in self.controllers.keys()})
self._pub = None
def on_tick(self):
super().on_tick()
self.performance_reports = {controller_id: self.executor_orchestrator.generate_performance_report(controller_id=controller_id).dict() for controller_id in self.controllers.keys()}
self.control_cash_out()
self.control_max_drawdown()
self.send_performance_report()
def control_max_drawdown(self):
if self.config.max_controller_drawdown:
self.check_max_controller_drawdown()
if self.config.max_global_drawdown:
self.check_max_global_drawdown()
def check_max_controller_drawdown(self):
for controller_id, controller in self.controllers.items():
controller_pnl = self.performance_reports[controller_id]["global_pnl_quote"]
last_max_pnl = self.max_pnl_by_controller[controller_id]
if controller_pnl > last_max_pnl:
self.max_pnl_by_controller[controller_id] = controller_pnl
else:
current_drawdown = last_max_pnl - controller_pnl
if current_drawdown > self.config.max_controller_drawdown:
self.logger().info(f"Controller {controller_id} reached max drawdown. Stopping the controller.")
controller.stop()
executors_order_placed = self.filter_executors(
executors=self.executors_info[controller_id],
filter_func=lambda x: x.is_active and not x.is_trading,
)
self.executor_orchestrator.execute_actions(
actions=[StopExecutorAction(controller_id=controller_id, executor_id=executor.id) for executor in executors_order_placed]
)
self.drawdown_exited_controllers.append(controller_id)
def check_max_global_drawdown(self):
current_global_pnl = sum([report["global_pnl_quote"] for report in self.performance_reports.values()])
if current_global_pnl > self.max_global_pnl:
self.max_global_pnl = current_global_pnl
else:
current_global_drawdown = self.max_global_pnl - current_global_pnl
if current_global_drawdown > self.config.max_global_drawdown:
self.logger().info("Global drawdown reached. Stopping the strategy.")
HummingbotApplication.main_application().stop()
def send_performance_report(self):
if self.current_timestamp - self._last_performance_report_timestamp >= self.performance_report_interval and self.mqtt_enabled:
self._pub(self.performance_reports)
self._last_performance_report_timestamp = self.current_timestamp
def control_cash_out(self):
self.evaluate_cash_out_time()
if self.cashing_out:
self.check_executors_status()
else:
self.check_manual_cash_out()
def evaluate_cash_out_time(self):
if self.cash_out_time and self.current_timestamp >= self.cash_out_time and not self.cashing_out:
self.logger().info("Cash out time reached. Stopping the controllers.")
for controller_id, controller in self.controllers.items():
if controller.status == RunnableStatus.RUNNING:
self.logger().info(f"Cash out for controller {controller_id}.")
controller.stop()
self.cashing_out = True
def check_manual_cash_out(self):
for controller_id, controller in self.controllers.items():
if controller.config.manual_kill_switch and controller.status == RunnableStatus.RUNNING:
self.logger().info(f"Manual cash out for controller {controller_id}.")
controller.stop()
executors_to_stop = self.get_executors_by_controller(controller_id)
self.executor_orchestrator.execute_actions(
[StopExecutorAction(executor_id=executor.id,
controller_id=executor.controller_id) for executor in executors_to_stop])
if not controller.config.manual_kill_switch and controller.status == RunnableStatus.TERMINATED:
if controller_id in self.drawdown_exited_controllers:
continue
self.logger().info(f"Restarting controller {controller_id}.")
controller.start()
def check_executors_status(self):
active_executors = self.filter_executors(
executors=self.get_all_executors(),
filter_func=lambda executor: executor.status == RunnableStatus.RUNNING
)
if not active_executors:
self.logger().info("All executors have finalized their execution. Stopping the strategy.")
HummingbotApplication.main_application().stop()
else:
non_trading_executors = self.filter_executors(
executors=active_executors,
filter_func=lambda executor: not executor.is_trading
)
self.executor_orchestrator.execute_actions(
[StopExecutorAction(executor_id=executor.id,
controller_id=executor.controller_id) for executor in non_trading_executors])
def create_actions_proposal(self) -> List[CreateExecutorAction]:
return []
def stop_actions_proposal(self) -> List[StopExecutorAction]:
return []
def apply_initial_setting(self):
connectors_position_mode = {}
for controller_id, controller in self.controllers.items():
self.max_pnl_by_controller[controller_id] = Decimal("0")
config_dict = controller.config.dict()
if "connector_name" in config_dict:
if self.is_perpetual(config_dict["connector_name"]):
if "position_mode" in config_dict:
connectors_position_mode[config_dict["connector_name"]] = config_dict["position_mode"]
if "leverage" in config_dict:
self.connectors[config_dict["connector_name"]].set_leverage(leverage=config_dict["leverage"],
trading_pair=config_dict["trading_pair"])
for connector_name, position_mode in connectors_position_mode.items():
self.connectors[connector_name].set_position_mode(position_mode)