This commit is contained in:
thsrite
2023-12-29 21:36:30 +08:00
parent bf4aa99ab9
commit 9473d288bb
17 changed files with 243 additions and 258 deletions

View File

@@ -1,6 +1,5 @@
from datetime import datetime, timedelta
import os
import pytz
from app.core.config import settings
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple, Optional
@@ -8,9 +7,21 @@ from app.log import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app.plugins.embyreporter.emby import EmbyService
from app.plugins.embyreporter.ranks_draw import RanksDraw
from app.schemas import NotificationType
from pathlib import Path
import random
from io import BytesIO
from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
import pytz
from cacheout import Cache
from datetime import datetime, timedelta
from app.utils.http import RequestUtils
cache = Cache()
class EmbyReporter(_PluginBase):
@@ -43,7 +54,12 @@ class EmbyReporter(_PluginBase):
_mp_host = None
_scheduler: Optional[BackgroundScheduler] = None
PLAYBACK_REPORTING_TYPE_MOVIE = "ItemName"
PLAYBACK_REPORTING_TYPE_TVSHOWS = "substr(ItemName,0, instr(ItemName, ' - '))"
host = None
def init_plugin(self, config: dict = None):
self.host = "http://" + settings.EMBY_HOST
# 停止现有任务
self.stop_service()
@@ -97,26 +113,27 @@ class EmbyReporter(_PluginBase):
if not self._mp_host:
return
# 初始化对象
emby = EmbyService(settings.EMBY_HOST, settings.EMBY_API_KEY)
draw = RanksDraw(emby, self._res_dir)
# 获取数据
success, movies = emby.get_report(types=emby.PLAYBACK_REPORTING_TYPE_MOVIE, days=self._days, limit=5)
if not success:
exit(movies)
success, tvshows = emby.get_report(types=emby.PLAYBACK_REPORTING_TYPE_TVSHOWS, days=self._days, limit=5)
if not success:
exit(tvshows)
# 绘制海报
draw.draw(movies, tvshows)
report_path = draw.save()
# 发送海报
if not self._type:
return
# 获取数据
success, movies = self.get_report(types=self.PLAYBACK_REPORTING_TYPE_MOVIE, days=self._days, limit=5)
if not success:
exit(movies)
logger.info(f"获取到电影 {movies}")
success, tvshows = self.get_report(types=self.PLAYBACK_REPORTING_TYPE_TVSHOWS, days=self._days, limit=5)
if not success:
exit(tvshows)
logger.info(f"获取到电视剧 {tvshows}")
# 绘制海报
report_path = self.draw(self._res_dir, movies, tvshows)
if not report_path:
logger.error("生成海报失败")
return
# 发送海报
report_text = f"🌟*过去{self._days}日观影排行*\r\n\r\n"
report_url = self._mp_host + report_path.replace("/public", "")
@@ -126,7 +143,7 @@ class EmbyReporter(_PluginBase):
self.post_message(title=report_text,
mtype=mtype,
image=report_url)
logger.info("Emby观影记录推送成功")
logger.info(f"Emby观影记录推送成功 {report_url}")
def __update_config(self):
self.update_config({
@@ -134,6 +151,7 @@ class EmbyReporter(_PluginBase):
"onlyonce": self._onlyonce,
"cron": self._cron,
"days": self._days,
"type": self._type,
"mp_host": self._mp_host,
"res_dir": self._res_dir
})
@@ -310,7 +328,7 @@ class EmbyReporter(_PluginBase):
"res_dir": "",
"days": 7,
"mp_host": "",
"type": "tg"
"type": ""
}
def get_page(self) -> List[dict]:
@@ -327,4 +345,205 @@ class EmbyReporter(_PluginBase):
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))
logger.error("退出插件失败:%s" % str(e))
def draw(self, res_path, movies, tvshows, show_count=True):
# 默认路径 默认图
if not res_path:
res_path = os.path.join(Path(__file__).parent, "res")
# 绘图文件路径初始化
bg_path = os.path.join(res_path, "bg")
mask_path = os.path.join(res_path, "cover-ranks-mask-2.png")
font_path = os.path.join(res_path, "PingFang Bold.ttf")
# 随机调取背景, 路径: res/ranks/bg/...
bg_list = os.listdir(bg_path)
bg_path = os.path.join(bg_path, bg_list[random.randint(0, len(bg_list) - 1)])
# 初始绘图对象
bg = Image.open(bg_path)
mask = Image.open(mask_path)
bg.paste(mask, (0, 0), mask)
font = ImageFont.truetype(font_path, 18)
font_small = ImageFont.truetype(font_path, 14)
font_count = ImageFont.truetype(font_path, 12)
# 合并绘制
all_ranks = movies + tvshows
index, offset_y = (0, 0)
for i in all_ranks:
try:
# 榜单项数据
user_id, item_id, item_type, name, count, duarion = tuple(i)
print(item_type, item_id, name, count)
# 图片获取,剧集主封面获取
if item_type != "Movie":
# 获取剧ID
success, data = self.items(user_id, item_id)
if not success:
continue
item_id = data["SeriesId"]
# 封面图像获取
success, data = self.primary(item_id)
if not success:
continue
# 剧集Y偏移
if index >= 5:
index = 0
offset_y = 331
# 名称显示偏移
font_offset_y = 0
temp_font = font
# 名称超出长度缩小省略
if font.getlength(name) > 110:
temp_font = font_small
font_offset_y = 4
for i in range(len(name)):
name = name[:len(name) - 1]
if font.getlength(name) <= 110:
break
name += ".."
# 绘制封面
cover = Image.open(BytesIO(data))
cover = cover.resize((108, 159))
bg.paste(cover, (73 + 145 * index, 379 + offset_y))
# 绘制 播放次数、影片名称
text = ImageDraw.Draw(bg)
if show_count:
self.draw_text_psd_style(text,
(
177 + 145 * index - font_count.getlength(str(count)),
353 + offset_y),
str(count), font_count, 126)
self.draw_text_psd_style(text, (74 + 145 * index, 542 + font_offset_y + offset_y), name, temp_font, 126)
index += 1
except Exception:
continue
if index > 0:
save_path = "/public/report.jpg"
bg.save(save_path)
return save_path
return None
@staticmethod
def draw_text_psd_style(draw, xy, text, font, tracking=0, leading=None, **kwargs):
"""
usage: draw_text_psd_style(draw, (0, 0), "Test",
tracking=-0.1, leading=32, fill="Blue")
Leading is measured from the baseline of one line of text to the
baseline of the line above it. Baseline is the invisible line on which most
letters—that is, those without descenders—sit. The default auto-leading
option sets the leading at 120% of the type size (for example, 12point
leading for 10point type).
Tracking is measured in 1/1000 em, a unit of measure that is relative to
the current type size. In a 6 point font, 1 em equals 6 points;
in a 10 point font, 1 em equals 10 points. Tracking
is strictly proportional to the current type size.
"""
def stutter_chunk(lst, size, overlap=0, default=None):
for i in range(0, len(lst), size - overlap):
r = list(lst[i:i + size])
while len(r) < size:
r.append(default)
yield r
x, y = xy
font_size = font.size
lines = text.splitlines()
if leading is None:
leading = font.size * 1.2
for line in lines:
for a, b in stutter_chunk(line, 2, 1, ' '):
w = font.getlength(a + b) - font.getlength(b)
draw.text((x, y), a, font=font, **kwargs)
x += w + (tracking / 1000) * font_size
y += leading
x = xy[0]
@cache.memoize(ttl=600)
def primary(self, item_id, width=200, height=300, quality=90, ret_url=False):
try:
url = self.host + f"/emby/Items/{item_id}/Images/Primary?maxHeight={height}&maxWidth={width}&quality={quality}"
if ret_url:
return url
resp = RequestUtils().get_res(url=url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
except Exception:
return False, "🤕Emby 服务器连接失败!"
@cache.memoize(ttl=600)
def backdrop(self, item_id, width=1920, quality=70, ret_url=False):
try:
url = self.host + f"/emby/Items/{item_id}/Images/Backdrop/0?&maxWidth={width}&quality={quality}"
if ret_url:
return url
resp = RequestUtils().get_res(url=url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
except Exception:
return False, "🤕Emby 服务器连接失败!"
@cache.memoize(ttl=600)
def logo(self, item_id, quality=70, ret_url=False):
url = self.host + f"/emby/Items/{item_id}/Images/Logo?quality={quality}"
if ret_url:
return url
resp = RequestUtils().get_res(url=url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
@cache.memoize(ttl=300)
def items(self, user_id, item_id):
try:
url = f"{self.host}/emby/Users/{user_id}/Items/{item_id}?api_key={settings.EMBY_API_KEY}"
resp = RequestUtils().get_res(url=url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.json()
except Exception:
return False, "🤕Emby 服务器连接失败!"
def get_report(self, types=None, user_id=None, days=7, end_date=datetime.now(pytz.timezone("Asia/Shanghai")),
limit=10):
if not types:
types = self.PLAYBACK_REPORTING_TYPE_MOVIE
sub_date = end_date - timedelta(days=days)
start_time = sub_date.strftime("%Y-%m-%d 00:00:00")
end_time = end_date.strftime("%Y-%m-%d 23:59:59")
sql = "SELECT UserId, ItemId, ItemType, "
sql += types + " AS name, "
sql += "COUNT(1) AS play_count, "
sql += "SUM(PlayDuration - PauseDuration) AS total_duarion "
sql += "FROM PlaybackActivity "
sql += f"WHERE ItemType = '{'Movie' if types == self.PLAYBACK_REPORTING_TYPE_MOVIE else 'Episode'}' "
sql += f"AND DateCreated >= '{start_time}' AND DateCreated <= '{end_time}' "
sql += "AND UserId not IN (select UserId from UserList) "
if user_id:
sql += f"AND UserId = '{user_id}' "
sql += "GROUP BY name "
sql += "ORDER BY play_count DESC "
sql += "LIMIT " + str(limit)
url = f"{self.host}/emby/user_usage_stats/submit_custom_query?api_key={settings.EMBY_API_KEY}"
data = {
"CustomQueryString": sql,
"ReplaceUserId": False
}
resp = RequestUtils().post_res(url=url, data=data)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
ret = resp.json()
if len(ret["colums"]) == 0:
return False, ret["message"]
return True, ret["results"]

View File

@@ -1,109 +0,0 @@
import pytz
import requests
from cacheout import Cache
from datetime import datetime, timedelta
cache = Cache()
class EmbyService():
PLAYBACK_REPORTING_TYPE_MOVIE = "ItemName"
PLAYBACK_REPORTING_TYPE_TVSHOWS = "substr(ItemName,0, instr(ItemName, ' - '))"
def __init__(self, host, api_key):
self.host = host
self.base_url = host + "/emby/{0}?api_key=" + api_key
self.api_key = api_key
@cache.memoize(ttl=600)
def primary(self, item_id, width=200, height=300, quality=90, ret_url=False):
url = self.host + f"/emby/Items/{item_id}/Images/Primary?maxHeight={height}&maxWidth={width}&quality={quality}"
if ret_url:
return url
resp = requests.get(url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
@cache.memoize(ttl=600)
def backdrop(self, item_id, width=1920, quality=70, ret_url=False):
url = self.host + f"/emby/Items/{item_id}/Images/Backdrop/0?&maxWidth={width}&quality={quality}"
if ret_url:
return url
resp = requests.get(url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
@cache.memoize(ttl=600)
def logo(self, item_id, quality=70, ret_url=False):
url = self.host + f"/emby/Items/{item_id}/Images/Logo?quality={quality}"
if ret_url:
return url
resp = requests.get(url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.content
@cache.memoize(ttl=300)
def items(self, user_id, item_id):
url = self.base_url.format(f"Users/{user_id}/Items/{item_id}")
resp = requests.get(url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.json()
def get_report(self, types=None, user_id=None, days=7, end_date=datetime.now(pytz.timezone("Asia/Shanghai")), limit=10):
if not types:
types = self.PLAYBACK_REPORTING_TYPE_MOVIE
sub_date = end_date - timedelta(days=days)
start_time = sub_date.strftime("%Y-%m-%d 00:00:00")
end_time = end_date.strftime("%Y-%m-%d 23:59:59")
sql = "SELECT UserId, ItemId, ItemType, "
sql += types + " AS name, "
sql += "COUNT(1) AS play_count, "
sql += "SUM(PlayDuration - PauseDuration) AS total_duarion "
sql += "FROM PlaybackActivity "
sql += f"WHERE ItemType = '{'Movie' if types==self.PLAYBACK_REPORTING_TYPE_MOVIE else 'Episode'}' "
sql += f"AND DateCreated >= '{start_time}' AND DateCreated <= '{end_time}' "
sql += "AND UserId not IN (select UserId from UserList) "
if user_id:
sql += f"AND UserId = '{user_id}' "
sql += "GROUP BY name "
sql += "ORDER BY play_count DESC "
sql += "LIMIT " + str(limit)
url = self.base_url.format(f"user_usage_stats/submit_custom_query")
data = {
"CustomQueryString": sql,
"ReplaceUserId": False
}
resp = requests.post(url, data)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
ret = resp.json()
if len(ret["colums"]) == 0:
return False, ret["message"]
return True, ret["results"]
class LibraryService:
def __init__(self, host, api_key):
self.base_url = host + "/Library/{0}?api_key=" + api_key
"""
Gets all user media folders
"""
@cache.memoize(ttl=600)
def folders(self):
url = self.base_url.format("SelectableMediaFolders")
resp = requests.get(url)
if resp.status_code != 204 and resp.status_code != 200:
return False, "🤕Emby 服务器连接失败!"
return True, resp.json()

View File

@@ -1,91 +0,0 @@
import os
from pathlib import Path
import pytz
import random
from io import BytesIO
from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
from datetime import datetime
from utils import draw_text_psd_style
"""
Misty 周榜海报样式
你可以根据你的需求自行封装或更改为你自己的周榜海报样式!
"""
class RanksDraw:
def __init__(self, emby, res_path):
# 默认路径 默认图
if not res_path:
res_path = os.path.join(Path(__file__).parent, "res")
# 绘图文件路径初始化
bg_path = os.path.join(res_path, "bg")
mask_path = os.path.join(res_path, "cover-ranks-mask-2.png")
font_path = os.path.join(res_path, "PingFang Bold.ttf")
# 随机调取背景, 路径: res/ranks/bg/...
bg_list = os.listdir(bg_path)
bg_path = os.path.join(bg_path, bg_list[random.randint(0, len(bg_list) - 1)])
# 初始绘图对象
self.bg = Image.open(bg_path)
mask = Image.open(mask_path)
self.bg.paste(mask, (0, 0), mask)
self.font = ImageFont.truetype(font_path, 18)
self.font_small = ImageFont.truetype(font_path, 14)
self.font_count = ImageFont.truetype(font_path, 12)
# 初始化封面对象
self.emby = emby
def draw(self, movies, tvshows, show_count=True):
# 合并绘制
all_ranks = movies + tvshows
index, offset_y = (0, 0)
for i in all_ranks:
# 榜单项数据
user_id, item_id, item_type, name, count, duarion = tuple(i)
print(item_type, item_id, name, count)
# 图片获取,剧集主封面获取
if item_type != "Movie":
# 获取剧ID
success, data = self.emby.items(user_id, item_id)
if not success:
exit(data)
item_id = data["SeriesId"]
# 封面图像获取
success, data = self.emby.primary(item_id)
if not success:
exit(data)
# 剧集Y偏移
if index >= 5:
index = 0
offset_y = 331
# 名称显示偏移
font_offset_y = 0
temp_font = self.font
# 名称超出长度缩小省略
if self.font.getlength(name) > 110:
temp_font = self.font_small
font_offset_y = 4
for i in range(len(name)):
name = name[:len(name) - 1]
if self.font.getlength(name) <= 110:
break
name += ".."
# 绘制封面
cover = Image.open(BytesIO(data))
cover = cover.resize((108, 159))
self.bg.paste(cover, (73 + 145 * index, 379 + offset_y))
# 绘制 播放次数、影片名称
text = ImageDraw.Draw(self.bg)
if show_count:
draw_text_psd_style(text, (177 + 145 * index - self.font_count.getlength(str(count)), 353 + offset_y),
str(count), self.font_count, 126)
draw_text_psd_style(text, (74 + 145 * index, 542 + font_offset_y + offset_y), name, temp_font, 126)
index += 1
def save(self, save_path=os.path.join("/public", "ranks",
datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d.jpg"))):
self.bg.save(save_path)
return save_path

Binary file not shown.

Before

Width:  |  Height:  |  Size: 283 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 87 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 101 KiB

View File

@@ -1,34 +0,0 @@
def draw_text_psd_style(draw, xy, text, font, tracking=0, leading=None, **kwargs):
"""
usage: draw_text_psd_style(draw, (0, 0), "Test",
tracking=-0.1, leading=32, fill="Blue")
Leading is measured from the baseline of one line of text to the
baseline of the line above it. Baseline is the invisible line on which most
letters—that is, those without descenders—sit. The default auto-leading
option sets the leading at 120% of the type size (for example, 12point
leading for 10point type).
Tracking is measured in 1/1000 em, a unit of measure that is relative to
the current type size. In a 6 point font, 1 em equals 6 points;
in a 10 point font, 1 em equals 10 points. Tracking
is strictly proportional to the current type size.
"""
def stutter_chunk(lst, size, overlap=0, default=None):
for i in range(0, len(lst), size - overlap):
r = list(lst[i:i + size])
while len(r) < size:
r.append(default)
yield r
x, y = xy
font_size = font.size
lines = text.splitlines()
if leading is None:
leading = font.size * 1.2
for line in lines:
for a, b in stutter_chunk(line, 2, 1, ' '):
w = font.getlength(a + b) - font.getlength(b)
draw.text((x, y), a, font=font, **kwargs)
x += w + (tracking / 1000) * font_size
y += leading
x = xy[0]