2025-12-08 22:32:07 +01:00
|
|
|
# Copyright (c) Paillat-dev
|
2025-12-08 13:28:28 +01:00
|
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
import time
|
|
|
|
|
import urllib.parse
|
|
|
|
|
from collections.abc import AsyncIterator, Callable, Coroutine
|
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
|
|
|
|
|
|
import moviepy
|
|
|
|
|
import numpy as np
|
|
|
|
|
import playwright.async_api
|
|
|
|
|
from aiofiles import tempfile
|
|
|
|
|
|
|
|
|
|
from .manager import RendererManager
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from .flag import Flag
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger("bot").getChild("flag_renderer")
|
|
|
|
|
|
|
|
|
|
GIF_HEIGHT: int = 256
|
|
|
|
|
|
|
|
|
|
# autocrop
|
|
|
|
|
GREENSCREEN_COLOR: tuple[int, int, int] = (26, 26, 30)
|
|
|
|
|
EDGE_IGNORE_PERCENT: float = 0.25
|
|
|
|
|
BOUND_MARGIN_PERCENT: float = 0.35 # Margin for sides and top
|
|
|
|
|
BOUND_MARGIN_BOTTOM_PERCENT: float = 0.5 # Extra margin for bottom (flagpole)
|
|
|
|
|
AUTO_CROP_THRESHOLD = 0.02
|
|
|
|
|
|
2025-12-08 23:51:16 +01:00
|
|
|
CLICK_TIMEOUT: float = 4 * 1000
|
|
|
|
|
|
2025-12-08 13:28:28 +01:00
|
|
|
LOAD_TIME_BONUS: float = +0.6
|
|
|
|
|
|
|
|
|
|
FILE_BUTTON_XPATH: str = (
|
|
|
|
|
"xpath=/html/body/div[1]/div[1]/main/section/div[1]/div/div/fieldset/div/div[1]/fieldset/div/div[2]/label"
|
|
|
|
|
)
|
|
|
|
|
INPUT_FILE_BTN_XPATH: str = "xpath=/html/body/div[1]/div[1]/main/section/div[1]/div/div/fieldset/div/div[2]/div/div"
|
|
|
|
|
RESET_CAMERA_TEXT: str = "Reset camera"
|
|
|
|
|
THEATER_MODE_TEXT: str = "Theater mode"
|
|
|
|
|
SITE_PANEL_BTN_XPATH: str = "xpath=/html/body/div[1]/div[1]/header/div/div[2]/button"
|
|
|
|
|
WIND_CONTROL_TEXT: str = "Wind control"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FlagRenderer:
|
|
|
|
|
"""Base class for renderers that provides common rendering methods."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, renderer_manager: RendererManager, flagwaver_url: str) -> None:
|
|
|
|
|
self.renderer_manager: RendererManager = renderer_manager
|
|
|
|
|
self.flagwaver_url: str = flagwaver_url
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _is_greenscreen(pixel: np.ndarray, tolerance: int = 60) -> bool:
|
|
|
|
|
"""Check if a pixel is close to the greenscreen color."""
|
|
|
|
|
return (
|
|
|
|
|
abs(int(pixel[0]) - GREENSCREEN_COLOR[0]) < tolerance
|
|
|
|
|
and abs(int(pixel[1]) - GREENSCREEN_COLOR[1]) < tolerance
|
|
|
|
|
and abs(int(pixel[2]) - GREENSCREEN_COLOR[2]) < tolerance
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def _render_url_to_video(
|
|
|
|
|
self,
|
|
|
|
|
url_params: dict[str, str],
|
|
|
|
|
temp_dir: str,
|
|
|
|
|
viewport: dict[str, int] | None = None,
|
|
|
|
|
device_scale_factor: int = 2,
|
|
|
|
|
wait_until: str = "networkidle",
|
|
|
|
|
wait_for: float = 1.0,
|
|
|
|
|
wait_for_selector: str | None = None,
|
|
|
|
|
duration: float = 6.0,
|
|
|
|
|
exec_page: Callable[[playwright.async_api.Page], Coroutine[None, None, None]] | None = None,
|
|
|
|
|
) -> Path:
|
|
|
|
|
"""Render the HTML content to a gif and return the gif path.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url_params: The URL parameters to pass to Flagwaver.
|
|
|
|
|
temp_dir: The temporary directory to store the video.
|
|
|
|
|
viewport: The viewport size for the page.
|
|
|
|
|
device_scale_factor: The device scale factor for high DPI rendering.
|
|
|
|
|
wait_until: The event to wait for before rendering.
|
|
|
|
|
wait_for: Additional time to wait for JS execution.
|
|
|
|
|
wait_for_selector: The CSS selector to wait for before rendering.$
|
|
|
|
|
duration: The duration of the video to capture.
|
|
|
|
|
exec_page: A coroutine to execute on the page before rendering.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
The path to the rendered gif.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if not self.renderer_manager.browser:
|
|
|
|
|
raise RuntimeError("Browser has not been initialized. Call 'start()' on RendererManager first.")
|
|
|
|
|
|
|
|
|
|
context = await self.renderer_manager.browser.new_context(
|
|
|
|
|
viewport=viewport, # ty:ignore[invalid-argument-type]
|
|
|
|
|
device_scale_factor=device_scale_factor,
|
|
|
|
|
record_video_dir=temp_dir,
|
|
|
|
|
record_video_size=viewport, # ty:ignore[invalid-argument-type]
|
|
|
|
|
)
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
try:
|
|
|
|
|
page = await context.new_page()
|
|
|
|
|
try:
|
|
|
|
|
encoded_url_params = urllib.parse.urlencode(url_params)
|
|
|
|
|
await page.goto(f"{self.flagwaver_url}?{encoded_url_params}", wait_until=wait_until) # ty:ignore[invalid-argument-type]
|
|
|
|
|
if wait_for_selector:
|
|
|
|
|
await page.wait_for_selector(wait_for_selector, timeout=5000)
|
|
|
|
|
|
|
|
|
|
await page.wait_for_timeout(wait_for)
|
|
|
|
|
if exec_page:
|
|
|
|
|
await exec_page(page)
|
|
|
|
|
load_time = (time.time() - start_time) + LOAD_TIME_BONUS
|
|
|
|
|
logger.debug(f"Page loaded in {load_time:.2f} seconds")
|
|
|
|
|
await asyncio.sleep(duration + LOAD_TIME_BONUS)
|
|
|
|
|
finally:
|
|
|
|
|
await page.close()
|
|
|
|
|
finally:
|
|
|
|
|
await context.close()
|
|
|
|
|
video_path = await page.video.path() # ty:ignore[possibly-missing-attribute]
|
|
|
|
|
return await asyncio.to_thread(self._manipulate_video, Path(video_path), trim_time=load_time)
|
|
|
|
|
|
|
|
|
|
def _detect_flag_bounds(self, frame: np.ndarray, tolerance: int = 60) -> tuple[int, int, int, int]:
|
|
|
|
|
"""Detect the flag boundaries in a frame by scanning for non-greenscreen pixels.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
frame: A numpy array representing the frame (H, W, C) in RGB format.
|
|
|
|
|
tolerance: Color tolerance for greenscreen detection.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
A tuple (x_min, y_min, x_max, y_max) representing the detected bounds.
|
|
|
|
|
"""
|
|
|
|
|
height, width = frame.shape[:2]
|
|
|
|
|
|
|
|
|
|
# Sample background colors from corners for diagnostics
|
|
|
|
|
corner_samples = [
|
|
|
|
|
("top-left", frame[10, 10]),
|
|
|
|
|
("top-right", frame[10, width - 10]),
|
|
|
|
|
("bottom-left", frame[height - 10, 10]),
|
|
|
|
|
("bottom-right", frame[height - 10, width - 10]),
|
|
|
|
|
("center-top", frame[10, width // 2]),
|
|
|
|
|
]
|
|
|
|
|
logger.debug("Background color samples:")
|
|
|
|
|
for label, pixel in corner_samples:
|
|
|
|
|
logger.debug(f" {label}: RGB{tuple(pixel)}")
|
|
|
|
|
|
|
|
|
|
ignore_x = int(width * EDGE_IGNORE_PERCENT)
|
|
|
|
|
ignore_y = int(height * EDGE_IGNORE_PERCENT)
|
|
|
|
|
|
|
|
|
|
search_left = ignore_x
|
|
|
|
|
search_right = width - ignore_x
|
|
|
|
|
search_top = ignore_y
|
|
|
|
|
search_bottom = height - ignore_y
|
|
|
|
|
|
|
|
|
|
left_bound = search_left
|
|
|
|
|
for x in range(search_left, search_right):
|
|
|
|
|
column = frame[search_top:search_bottom, x]
|
|
|
|
|
non_white_ratio = sum(1 for pixel in column if not self._is_greenscreen(pixel, tolerance)) / len(column)
|
|
|
|
|
if non_white_ratio >= AUTO_CROP_THRESHOLD:
|
|
|
|
|
left_bound = x
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
right_bound = search_right
|
|
|
|
|
for x in range(search_right - 1, search_left, -1):
|
|
|
|
|
column = frame[search_top:search_bottom, x]
|
|
|
|
|
non_white_ratio = sum(1 for pixel in column if not self._is_greenscreen(pixel, tolerance)) / len(column)
|
|
|
|
|
if non_white_ratio >= AUTO_CROP_THRESHOLD:
|
|
|
|
|
right_bound = x
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
top_bound = search_top
|
|
|
|
|
for y in range(search_top, search_bottom):
|
|
|
|
|
row = frame[y, search_left:search_right]
|
|
|
|
|
non_white_ratio = sum(1 for pixel in row if not self._is_greenscreen(pixel, tolerance)) / len(row)
|
|
|
|
|
if non_white_ratio >= AUTO_CROP_THRESHOLD:
|
|
|
|
|
top_bound = y
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
bottom_bound = search_bottom
|
|
|
|
|
for y in range(search_bottom - 1, search_top, -1):
|
|
|
|
|
row = frame[y, search_left:search_right]
|
|
|
|
|
non_white_ratio = sum(1 for pixel in row if not self._is_greenscreen(pixel, tolerance)) / len(row)
|
|
|
|
|
if non_white_ratio >= AUTO_CROP_THRESHOLD:
|
|
|
|
|
bottom_bound = y
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
detected_width = right_bound - left_bound
|
|
|
|
|
detected_height = bottom_bound - top_bound
|
|
|
|
|
margin_x = int(detected_width * BOUND_MARGIN_PERCENT)
|
|
|
|
|
margin_y_top = int(detected_height * BOUND_MARGIN_PERCENT) # Regular margin for top
|
|
|
|
|
margin_y_bottom = int(detected_height * BOUND_MARGIN_BOTTOM_PERCENT) # Extra margin for bottom (flagpole)
|
|
|
|
|
|
|
|
|
|
x_min = max(0, left_bound - margin_x)
|
|
|
|
|
y_min = max(0, top_bound - margin_y_top)
|
|
|
|
|
x_max = min(width, right_bound + margin_x)
|
|
|
|
|
y_max = min(height, bottom_bound + margin_y_bottom)
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Margins: top={margin_y_top}, bottom={margin_y_bottom}, sides={margin_x}")
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Detected flag bounds: ({x_min}, {y_min}) to ({x_max}, {y_max})")
|
|
|
|
|
logger.debug(f"Detected size: {x_max - x_min}x{y_max - y_min}")
|
|
|
|
|
|
|
|
|
|
return x_min, y_min, x_max, y_max
|
|
|
|
|
|
|
|
|
|
def _manipulate_video(self, path: Path, trim_time: float) -> Path:
|
|
|
|
|
new_path = path.parent / f"{path.stem}_cropped.gif"
|
|
|
|
|
|
|
|
|
|
clip = moviepy.VideoFileClip(path)
|
|
|
|
|
if trim_time >= clip.duration:
|
|
|
|
|
logger.warning(f"trim_time ({trim_time}s) exceeds clip duration ({clip.duration}s), using 0")
|
|
|
|
|
trim_time = 0
|
|
|
|
|
|
|
|
|
|
sample_time = min(trim_time + 0.5, clip.duration - 0.1)
|
|
|
|
|
sample_frame = clip.get_frame(sample_time) # Returns RGB numpy array
|
|
|
|
|
|
|
|
|
|
x_min, y_min, x_max, y_max = self._detect_flag_bounds(sample_frame)
|
|
|
|
|
|
|
|
|
|
crop_width = x_max - x_min
|
|
|
|
|
crop_height = y_max - y_min
|
|
|
|
|
|
|
|
|
|
x_center = (x_min + x_max) // 2
|
|
|
|
|
y_center = (y_min + y_max) // 2
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Cropping to detected bounds: {crop_width}x{crop_height} at center ({x_center}, {y_center})")
|
|
|
|
|
|
|
|
|
|
crop = moviepy.vfx.Crop(width=crop_width, height=crop_height, x_center=x_center, y_center=y_center)
|
|
|
|
|
clip = clip.with_effects([crop])
|
|
|
|
|
|
|
|
|
|
clip = clip[trim_time:]
|
|
|
|
|
clip.write_gif(new_path)
|
|
|
|
|
logger.debug(f"Cropped video with transparency to {new_path}")
|
|
|
|
|
return new_path
|
|
|
|
|
|
|
|
|
|
async def _setup_ui(self, page: playwright.async_api.Page) -> None:
|
2025-12-09 00:04:59 +01:00
|
|
|
try:
|
|
|
|
|
side_panel_btn = page.locator(SITE_PANEL_BTN_XPATH)
|
|
|
|
|
await side_panel_btn.click(timeout=CLICK_TIMEOUT)
|
2025-12-08 13:28:28 +01:00
|
|
|
|
2025-12-09 00:04:59 +01:00
|
|
|
wind_control_btn = page.get_by_text(WIND_CONTROL_TEXT)
|
|
|
|
|
await wind_control_btn.click(timeout=CLICK_TIMEOUT)
|
2025-12-08 13:28:28 +01:00
|
|
|
|
2025-12-09 00:04:59 +01:00
|
|
|
await side_panel_btn.click(timeout=CLICK_TIMEOUT)
|
2025-12-08 13:28:28 +01:00
|
|
|
|
2025-12-09 00:04:59 +01:00
|
|
|
reset_camera_btn = page.get_by_text(RESET_CAMERA_TEXT)
|
|
|
|
|
await reset_camera_btn.click(timeout=CLICK_TIMEOUT, force=True)
|
2025-12-08 13:28:28 +01:00
|
|
|
|
2025-12-09 00:04:59 +01:00
|
|
|
theater_mode_btn = page.get_by_text(THEATER_MODE_TEXT)
|
|
|
|
|
await theater_mode_btn.click(timeout=CLICK_TIMEOUT, force=True)
|
|
|
|
|
except playwright.async_api.TimeoutError:
|
|
|
|
|
logger.exception("Failed to setup the UI")
|
|
|
|
|
logger.debug("Page content: %s", await page.content())
|
2025-12-08 13:28:28 +01:00
|
|
|
|
|
|
|
|
@asynccontextmanager
|
|
|
|
|
async def render(self, flag: "Flag") -> AsyncIterator[Path]:
|
|
|
|
|
async with tempfile.TemporaryDirectory() as temp_dir:
|
|
|
|
|
yield await self._render_url_to_video(
|
|
|
|
|
flag.to_url_params(),
|
|
|
|
|
temp_dir=temp_dir,
|
|
|
|
|
exec_page=self._setup_ui,
|
|
|
|
|
viewport={"width": 1280, "height": 720},
|
|
|
|
|
)
|