Files
archived-MoviePilot-Plugins/plugins/commandexecute/__init__.py
2024-06-09 11:23:16 +08:00

172 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import threading
from typing import Any, List, Dict, Tuple

from app.log import logger
from app.plugins import _PluginBase
class CommandExecute(_PluginBase):
    """Plugin that runs user-configured shell commands a single time.

    When the ``onlyonce`` switch is enabled and ``command`` is non-empty, every
    non-blank line of ``command`` is executed through the shell, its output is
    written to the log, and the switch is reset and saved with
    ``update_config`` so the commands do not run again on the next load.
    """

    # Plugin name
    plugin_name = "命令执行器"
    # Plugin description
    plugin_desc = "自定义容器命令执行。"
    # Plugin icon
    plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/command.png"
    # Plugin version
    plugin_version = "1.0"
    # Plugin author
    plugin_author = "thsrite"
    # Author homepage
    author_url = "https://github.com/thsrite"
    # Prefix for plugin configuration item IDs
    plugin_config_prefix = "commandexecute_"
    # Load order
    plugin_order = 30
    # User level allowed to use the plugin
    auth_level = 1

    # Private attributes
    _onlyonce = None
    _command = None

    def init_plugin(self, config: dict = None):
        """Load the configuration and, if requested, run the commands once.

        :param config: plugin configuration; the keys ``onlyonce`` (run switch)
            and ``command`` (newline-separated shell commands) are read.

        Any exception raised while running a command is logged and stops the
        remaining commands. Whether or not that happens, the ``onlyonce``
        switch is reset to ``False`` and the configuration is saved.
        """
        if config:
            self._onlyonce = config.get("onlyonce")
            self._command = config.get("command")

        if not (self._onlyonce and self._command):
            return

        try:
            for line in self._command.split("\n"):
                # Textarea input may carry "\r" line endings and blank lines;
                # neither should reach the shell.
                command = line.strip()
                if not command:
                    continue
                self._run_command(command)
        except Exception as e:
            logger.error(f"命令执行失败 {str(e)}")
            return
        finally:
            # Always switch the one-shot flag off and persist it.
            self._onlyonce = False
            self.update_config({
                "onlyonce": self._onlyonce,
                "command": self._command
            })

    def _run_command(self, command: str) -> None:
        """Run one shell command, logging its stdout and stderr line by line.

        stderr is drained on a helper thread while stdout is read on the
        calling thread. Reading one pipe to the end before touching the other
        can deadlock as soon as the child fills the unread pipe's buffer.
        """
        logger.info(f"开始执行命令 {command}")
        stdout_tail: List[str] = []
        stderr_tail: List[str] = []
        # NOTE(security): the command string is handed to the shell verbatim
        # (shell=True). That is what this plugin is for, so it is kept, but
        # the configuration must only be editable by trusted users.
        with subprocess.Popen(command, shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            stderr_reader = threading.Thread(
                target=self._drain,
                args=(proc.stderr, stderr_tail),
                daemon=True,
            )
            stderr_reader.start()
            self._drain(proc.stdout, stdout_tail)
            stderr_reader.join()
        # Leaving the ``with`` block closes both pipes and waits for the child.

        last_output = stdout_tail[0] if stdout_tail else None
        last_error = stderr_tail[0] if stderr_tail else None
        # Echo the final line (stdout preferred, stderr otherwise) to the console.
        print(last_output if last_output else last_error)

    @staticmethod
    def _drain(stream, tail: List[str]) -> None:
        """Log every line of ``stream``; leave the last one as ``tail[0]``.

        Bytes that are not valid UTF-8 are replaced instead of raising, so
        unexpected output encoding cannot abort the run.
        """
        for raw in stream:
            text = raw.decode("utf-8", errors="replace").strip()
            logger.info(text)
            tail[:] = [text]

    def get_state(self) -> bool:
        """Return the plugin state; this plugin always reports ``False``."""
        return False

    @staticmethod
    def get_command() -> List[Dict[str, Any]]:
        """No commands are registered; returns ``None``."""
        pass

    def get_api(self) -> List[Dict[str, Any]]:
        """No API endpoints are registered; returns ``None``."""
        pass

    def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
        """Build the plugin configuration page.

        Returns two pieces of data: 1. the page layout (a form with the run
        switch, the command textarea and an info alert); 2. the data structure
        holding the default values of the form models.
        """
        return [
            {
                'component': 'VForm',
                'content': [
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                    'md': 6
                                },
                                'content': [
                                    {
                                        'component': 'VSwitch',
                                        'props': {
                                            'model': 'onlyonce',
                                            'label': '执行命令'
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                },
                                'content': [
                                    {
                                        'component': 'VTextarea',
                                        'props': {
                                            'model': 'command',
                                            'rows': '2',
                                            'label': 'command命令',
                                            'placeholder': '一行一条'
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                },
                                'content': [
                                    {
                                        'component': 'VAlert',
                                        'props': {
                                            'type': 'info',
                                            'variant': 'tonal'
                                        },
                                        'content': [
                                            {
                                                'component': 'span',
                                                'text': '执行日志将会输出到控制台,请谨慎操作。'
                                            }
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                ]
            }
        ], {
            "onlyonce": False,
            "command": "",
        }

    def get_page(self) -> List[dict]:
        """No detail page is provided; returns ``None``."""
        pass

    def stop_service(self):
        """
        Stop the plugin; nothing needs to be cleaned up.
        """
        pass