🎉 First commit

This commit is contained in:
Paillat
2025-12-08 13:28:28 +01:00
committed by Paillat-dev
commit dd77e3d433
26 changed files with 2294 additions and 0 deletions

43
src/__main__.py Normal file
View File

@@ -0,0 +1,43 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
import os
import sys
sys.path.append(os.path.dirname(__file__))
import asyncio
import logging
from pathlib import Path
from discord import Intents
from pycord_rest import App
from commands.error_handling import ErrorHandler
from commands.flag_gen import FlaggerCommands
from config import CONFIG
from http_server import HttpServer
from renderer.base import FlagRenderer
from renderer.manager import RendererManager
logging.basicConfig(level=logging.DEBUG)
intents = Intents.default()
app = App(intents=intents, auto_sync_commands=False)
FLAGWAVER_PATH = Path(__file__).parent / "flagwaver" / "dist"
async def main() -> None:
async with (
RendererManager(num_workers=CONFIG.num_workers) as manager,
HttpServer(port=CONFIG.flagwaver_http_port, path=FLAGWAVER_PATH),
):
renderer = FlagRenderer(manager, f"http://localhost:{CONFIG.flagwaver_http_port}")
app.add_cog(FlaggerCommands(app, manager, renderer))
app.add_cog(ErrorHandler(app))
await app.start(token=CONFIG.token, public_key=CONFIG.public_key, uvicorn_options={"host": CONFIG.uvicorn_host})
if __name__ == "__main__":
asyncio.run(main())

3
src/commands/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT

View File

@@ -0,0 +1,32 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
from typing import TYPE_CHECKING
import discord
from discord import ui
if TYPE_CHECKING:
from pycord_rest import App
class ErrorHandling(ui.DesignerView):
def __init__(self, error_message: str) -> None:
container = ui.Container(color=discord.Color.red())
container.add_text("## Oops... An error occurred")
container.add_text(f"```\n{error_message}\n```")
super().__init__(container, store=False)
class ErrorHandler(discord.Cog):
def __init__(self, app: "App") -> None:
self.app: App = app
super().__init__()
@discord.Cog.listener()
async def on_application_command_error(self, ctx: discord.ApplicationContext, error: Exception) -> None:
await ctx.respond(view=ErrorHandling(str(error)), ephemeral=True)
__all__ = ("ErrorHandler",)

65
src/commands/flag_gen.py Normal file
View File

@@ -0,0 +1,65 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
from typing import TYPE_CHECKING
import discord
from discord import ui
from renderer.flag import Flag
if TYPE_CHECKING:
from pycord_rest import App
from renderer.base import FlagRenderer
from renderer.manager import RendererManager
class FlagDisplayView(ui.DesignerView):
def __init__(self, image: discord.File) -> None:
container = ui.Container()
container.add_text("## Your Flag is Ready!")
container.add_gallery(discord.MediaGalleryItem(f"attachment://{image.filename}")) # ty:ignore[invalid-argument-type]
super().__init__(container, store=False)
class FlaggerCommands(discord.Cog):
def __init__(self, app: "App", manager: "RendererManager", renderer: "FlagRenderer") -> None:
self.app: App = app
self.manager: RendererManager = manager
self.renderer: FlagRenderer = renderer
super().__init__()
async def handle_flag_command(self, ctx: discord.ApplicationContext, image_url: str) -> None:
async with self.manager.render_context_manager(self.renderer.render, Flag(image_url)) as gif_path: # ty: ignore[invalid-argument-type]
file = discord.File(gif_path, filename=gif_path.name)
await ctx.respond(view=FlagDisplayView(file), files=[file])
flag = discord.SlashCommandGroup("flag", "Commands related to flag rendering.")
@flag.command(name="user", description="Render a user's flag.")
async def user(self, ctx: discord.ApplicationContext, user: discord.Member | None = None) -> None:
target = user or ctx.author
if target.display_avatar.is_animated():
asset = target.display_avatar.with_format("gif")
else:
asset = target.display_avatar.with_format("png")
await ctx.defer()
await self.handle_flag_command(ctx, asset.url)
@flag.command(name="custom", description="Render a custom flag from an image attachment.")
async def custom_flag(self, ctx: discord.ApplicationContext, attachment: discord.Attachment) -> None:
if not attachment.content_type or not attachment.content_type.startswith("image/"):
await ctx.respond("Please provide a valid image attachment.", ephemeral=True)
return
if attachment.content_type not in {"image/gif", "image/png", "image/jpeg"}:
await ctx.respond("Unsupported image format. Please provide a PNG, JPEG, or GIF image.", ephemeral=True)
return
await ctx.defer()
await self.handle_flag_command(ctx, attachment.url)
__all__ = ("FlaggerCommands",)

32
src/config.py Normal file
View File

@@ -0,0 +1,32 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
import os
from pydantic import BaseModel
try:
from dotenv import load_dotenv
except ImportError:
pass
else:
load_dotenv()
class Config(BaseModel):
token: str
public_key: str
num_workers: int = 1
flagwaver_http_port: int = 8910
uvicorn_host: str = "0.0.0.0" # noqa: S104
CONFIG = Config(
token=os.environ["DISCORD_TOKEN"],
public_key=os.environ["DISCORD_PUBLIC_KEY"],
num_workers=int(os.getenv("FLAGGER_RENDERER_WORKERS", "2")),
flagwaver_http_port=int(os.getenv("FLAGWAVER_HTTP_PORT", "8910")),
uvicorn_host=os.getenv("UVICORN_HOST", "0.0.0.0"), # noqa: S104
)
__all__ = ["CONFIG"]

1
src/flagwaver Submodule

Submodule src/flagwaver added at a15a24196d

39
src/http_server.py Normal file
View File

@@ -0,0 +1,39 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
import asyncio
import contextlib
from pathlib import Path
from typing import Self
class HttpServer:
def __init__(self, port: int, path: Path) -> None:
self.port = port
self.path = path
self.process: asyncio.subprocess.Process | None = None
async def start(self) -> None:
self.process = await asyncio.create_subprocess_shell(
f"python -m http.server {self.port} --directory {self.path}"
)
await asyncio.sleep(1) # Give the server a moment to start
async def stop(self) -> None:
if self.process:
self.process.terminate()
await self.process.wait()
async def __aenter__(self) -> Self:
await self.start()
return self
async def __aexit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: object
) -> None:
with contextlib.suppress(ProcessLookupError):
await self.stop()
__all__ = ["HttpServer"]

3
src/renderer/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT

256
src/renderer/base.py Normal file
View File

@@ -0,0 +1,256 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
import asyncio
import logging
import time
import urllib.parse
from collections.abc import AsyncIterator, Callable, Coroutine
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING
import moviepy
import numpy as np
import playwright.async_api
from aiofiles import tempfile
from .manager import RendererManager
if TYPE_CHECKING:
from .flag import Flag
logger = logging.getLogger("bot").getChild("flag_renderer")
GIF_HEIGHT: int = 256
# autocrop
GREENSCREEN_COLOR: tuple[int, int, int] = (26, 26, 30)
EDGE_IGNORE_PERCENT: float = 0.25
BOUND_MARGIN_PERCENT: float = 0.35 # Margin for sides and top
BOUND_MARGIN_BOTTOM_PERCENT: float = 0.5 # Extra margin for bottom (flagpole)
AUTO_CROP_THRESHOLD = 0.02
LOAD_TIME_BONUS: float = +0.6
FILE_BUTTON_XPATH: str = (
"xpath=/html/body/div[1]/div[1]/main/section/div[1]/div/div/fieldset/div/div[1]/fieldset/div/div[2]/label"
)
INPUT_FILE_BTN_XPATH: str = "xpath=/html/body/div[1]/div[1]/main/section/div[1]/div/div/fieldset/div/div[2]/div/div"
RESET_CAMERA_TEXT: str = "Reset camera"
THEATER_MODE_TEXT: str = "Theater mode"
SITE_PANEL_BTN_XPATH: str = "xpath=/html/body/div[1]/div[1]/header/div/div[2]/button"
WIND_CONTROL_TEXT: str = "Wind control"
class FlagRenderer:
"""Base class for renderers that provides common rendering methods."""
def __init__(self, renderer_manager: RendererManager, flagwaver_url: str) -> None:
self.renderer_manager: RendererManager = renderer_manager
self.flagwaver_url: str = flagwaver_url
@staticmethod
def _is_greenscreen(pixel: np.ndarray, tolerance: int = 60) -> bool:
"""Check if a pixel is close to the greenscreen color."""
return (
abs(int(pixel[0]) - GREENSCREEN_COLOR[0]) < tolerance
and abs(int(pixel[1]) - GREENSCREEN_COLOR[1]) < tolerance
and abs(int(pixel[2]) - GREENSCREEN_COLOR[2]) < tolerance
)
async def _render_url_to_video(
self,
url_params: dict[str, str],
temp_dir: str,
viewport: dict[str, int] | None = None,
device_scale_factor: int = 2,
wait_until: str = "networkidle",
wait_for: float = 1.0,
wait_for_selector: str | None = None,
duration: float = 6.0,
exec_page: Callable[[playwright.async_api.Page], Coroutine[None, None, None]] | None = None,
) -> Path:
"""Render the HTML content to a gif and return the gif path.
Args:
url_params: The URL parameters to pass to Flagwaver.
temp_dir: The temporary directory to store the video.
viewport: The viewport size for the page.
device_scale_factor: The device scale factor for high DPI rendering.
wait_until: The event to wait for before rendering.
wait_for: Additional time to wait for JS execution.
wait_for_selector: The CSS selector to wait for before rendering.$
duration: The duration of the video to capture.
exec_page: A coroutine to execute on the page before rendering.
Returns:
The path to the rendered gif.
"""
if not self.renderer_manager.browser:
raise RuntimeError("Browser has not been initialized. Call 'start()' on RendererManager first.")
context = await self.renderer_manager.browser.new_context(
viewport=viewport, # ty:ignore[invalid-argument-type]
device_scale_factor=device_scale_factor,
record_video_dir=temp_dir,
record_video_size=viewport, # ty:ignore[invalid-argument-type]
)
start_time = time.time()
try:
page = await context.new_page()
try:
encoded_url_params = urllib.parse.urlencode(url_params)
await page.goto(f"{self.flagwaver_url}?{encoded_url_params}", wait_until=wait_until) # ty:ignore[invalid-argument-type]
if wait_for_selector:
await page.wait_for_selector(wait_for_selector, timeout=5000)
await page.wait_for_timeout(wait_for)
if exec_page:
await exec_page(page)
load_time = (time.time() - start_time) + LOAD_TIME_BONUS
logger.debug(f"Page loaded in {load_time:.2f} seconds")
await asyncio.sleep(duration + LOAD_TIME_BONUS)
finally:
await page.close()
finally:
await context.close()
video_path = await page.video.path() # ty:ignore[possibly-missing-attribute]
return await asyncio.to_thread(self._manipulate_video, Path(video_path), trim_time=load_time)
def _detect_flag_bounds(self, frame: np.ndarray, tolerance: int = 60) -> tuple[int, int, int, int]:
"""Detect the flag boundaries in a frame by scanning for non-greenscreen pixels.
Args:
frame: A numpy array representing the frame (H, W, C) in RGB format.
tolerance: Color tolerance for greenscreen detection.
Returns:
A tuple (x_min, y_min, x_max, y_max) representing the detected bounds.
"""
height, width = frame.shape[:2]
# Sample background colors from corners for diagnostics
corner_samples = [
("top-left", frame[10, 10]),
("top-right", frame[10, width - 10]),
("bottom-left", frame[height - 10, 10]),
("bottom-right", frame[height - 10, width - 10]),
("center-top", frame[10, width // 2]),
]
logger.debug("Background color samples:")
for label, pixel in corner_samples:
logger.debug(f" {label}: RGB{tuple(pixel)}")
ignore_x = int(width * EDGE_IGNORE_PERCENT)
ignore_y = int(height * EDGE_IGNORE_PERCENT)
search_left = ignore_x
search_right = width - ignore_x
search_top = ignore_y
search_bottom = height - ignore_y
left_bound = search_left
for x in range(search_left, search_right):
column = frame[search_top:search_bottom, x]
non_white_ratio = sum(1 for pixel in column if not self._is_greenscreen(pixel, tolerance)) / len(column)
if non_white_ratio >= AUTO_CROP_THRESHOLD:
left_bound = x
break
right_bound = search_right
for x in range(search_right - 1, search_left, -1):
column = frame[search_top:search_bottom, x]
non_white_ratio = sum(1 for pixel in column if not self._is_greenscreen(pixel, tolerance)) / len(column)
if non_white_ratio >= AUTO_CROP_THRESHOLD:
right_bound = x
break
top_bound = search_top
for y in range(search_top, search_bottom):
row = frame[y, search_left:search_right]
non_white_ratio = sum(1 for pixel in row if not self._is_greenscreen(pixel, tolerance)) / len(row)
if non_white_ratio >= AUTO_CROP_THRESHOLD:
top_bound = y
break
bottom_bound = search_bottom
for y in range(search_bottom - 1, search_top, -1):
row = frame[y, search_left:search_right]
non_white_ratio = sum(1 for pixel in row if not self._is_greenscreen(pixel, tolerance)) / len(row)
if non_white_ratio >= AUTO_CROP_THRESHOLD:
bottom_bound = y
break
detected_width = right_bound - left_bound
detected_height = bottom_bound - top_bound
margin_x = int(detected_width * BOUND_MARGIN_PERCENT)
margin_y_top = int(detected_height * BOUND_MARGIN_PERCENT) # Regular margin for top
margin_y_bottom = int(detected_height * BOUND_MARGIN_BOTTOM_PERCENT) # Extra margin for bottom (flagpole)
x_min = max(0, left_bound - margin_x)
y_min = max(0, top_bound - margin_y_top)
x_max = min(width, right_bound + margin_x)
y_max = min(height, bottom_bound + margin_y_bottom)
logger.debug(f"Margins: top={margin_y_top}, bottom={margin_y_bottom}, sides={margin_x}")
logger.debug(f"Detected flag bounds: ({x_min}, {y_min}) to ({x_max}, {y_max})")
logger.debug(f"Detected size: {x_max - x_min}x{y_max - y_min}")
return x_min, y_min, x_max, y_max
def _manipulate_video(self, path: Path, trim_time: float) -> Path:
new_path = path.parent / f"{path.stem}_cropped.gif"
clip = moviepy.VideoFileClip(path)
if trim_time >= clip.duration:
logger.warning(f"trim_time ({trim_time}s) exceeds clip duration ({clip.duration}s), using 0")
trim_time = 0
sample_time = min(trim_time + 0.5, clip.duration - 0.1)
sample_frame = clip.get_frame(sample_time) # Returns RGB numpy array
x_min, y_min, x_max, y_max = self._detect_flag_bounds(sample_frame)
crop_width = x_max - x_min
crop_height = y_max - y_min
x_center = (x_min + x_max) // 2
y_center = (y_min + y_max) // 2
logger.debug(f"Cropping to detected bounds: {crop_width}x{crop_height} at center ({x_center}, {y_center})")
crop = moviepy.vfx.Crop(width=crop_width, height=crop_height, x_center=x_center, y_center=y_center)
clip = clip.with_effects([crop])
clip = clip[trim_time:]
clip.write_gif(new_path)
logger.debug(f"Cropped video with transparency to {new_path}")
return new_path
async def _setup_ui(self, page: playwright.async_api.Page) -> None:
side_panel_btn = page.locator(SITE_PANEL_BTN_XPATH)
await side_panel_btn.click(timeout=2 * 1000)
wind_control_btn = page.get_by_text(WIND_CONTROL_TEXT)
await wind_control_btn.click(timeout=2 * 1000)
await side_panel_btn.click(timeout=2 * 1000)
reset_camera_btn = page.get_by_text(RESET_CAMERA_TEXT)
await reset_camera_btn.click(timeout=2 * 1000, force=True)
theater_mode_btn = page.get_by_text(THEATER_MODE_TEXT)
await theater_mode_btn.click(timeout=2 * 1000, force=True)
@asynccontextmanager
async def render(self, flag: "Flag") -> AsyncIterator[Path]:
async with tempfile.TemporaryDirectory() as temp_dir:
yield await self._render_url_to_video(
flag.to_url_params(),
temp_dir=temp_dir,
exec_page=self._setup_ui,
viewport={"width": 1280, "height": 720},
)

21
src/renderer/flag.py Normal file
View File

@@ -0,0 +1,21 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Literal
@dataclass
class Flag:
url: str
flag_pole_type: Literal["gallery"] = "gallery"
background: Literal["blue-sky", "custom"] = "custom"
backgroundcolor: str = "1a1a1e"
def to_url_params(self) -> dict[str, str]:
return {
"src": self.url,
"flagpoletype": self.flag_pole_type,
"background": self.background,
"backgroundcolor": self.backgroundcolor,
}

141
src/renderer/manager.py Normal file
View File

@@ -0,0 +1,141 @@
# Copyright (c) NiceBots
# SPDX-License-Identifier: MIT
import asyncio
import contextlib
import logging
from asyncio import Future
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import Any, Self
from playwright.async_api import Browser, Playwright, async_playwright
TaskType = tuple[
Callable[..., Awaitable[Any]],
tuple[Any, ...],
dict[str, Any],
Future[Any],
]
logger = logging.getLogger("bot").getChild("renderer_manager")
class RendererManager:
"""Manages the browser and task queue for rendering tasks."""
def __init__(self, num_workers: int = 2) -> None:
self.num_workers: int = num_workers
self.queue: asyncio.Queue[TaskType | None] = asyncio.Queue()
self.browser: Browser | None = None
self.playwright: Playwright | None = None
self.worker_tasks: list[asyncio.Task[None]] = []
logger.debug("RendererManager initialized")
async def start(self) -> None:
"""Start the browser and the worker tasks."""
logger.info("Starting the browser and worker tasks")
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch()
self.worker_tasks = [
asyncio.create_task(self._worker(), name=f"worker-{i + 1}") for i in range(self.num_workers)
]
logger.info(f"{self.num_workers} worker tasks started")
async def _worker(self) -> None:
"""Worker task that processes tasks from the queue."""
worker_name = asyncio.current_task().get_name() # ty:ignore[possibly-missing-attribute]
logger.debug(f"{worker_name} started")
while True:
task = await self.queue.get()
if task is None:
logger.info(f"{worker_name} received stop signal")
self.queue.task_done()
break
func, args, kwargs, future = task
try:
logger.debug("%s started task: %s", worker_name, getattr(func, "__name__", func))
result = await func(*args, **kwargs)
future.set_result(result)
logger.debug(
"%s completed task: %s",
worker_name,
getattr(func, "__name__", func),
)
except Exception as e:
logger.exception(
"%s encountered an error in task: %s",
worker_name,
getattr(func, "__name__", func),
)
future.set_exception(e)
self.queue.task_done()
async def render[**P, R](self, func: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R:
"""Add a rendering task to the queue and return the result."""
logger.info("Adding a rendering task to the queue: %s", getattr(func, "__name__", func))
future: Future[Any] = asyncio.get_running_loop().create_future()
await self.queue.put((func, args, kwargs, future))
return await future
@asynccontextmanager
async def render_context_manager[**P, R](
self, func: Callable[P, AbstractAsyncContextManager[R]], *args: P.args, **kwargs: P.kwargs
) -> AsyncGenerator[R]:
"""Queue a task that creates and manages an async context manager.
Usage:
async with renderer.render_context_manager(browser.new_page) as page:
await page.goto("https://example.com")
# page is automatically closed on exit
"""
logger.info("Adding a context manager task to the queue: %s", getattr(func, "__name__", func))
# Helper to create and enter the context manager
async def enter_context() -> tuple[AbstractAsyncContextManager[R], R]:
ctx_manager = func(*args, **kwargs) # No await - this returns the context manager
result = await ctx_manager.__aenter__()
return ctx_manager, result
# Queue the enter operation
future: Future[tuple[Any, R]] = asyncio.get_running_loop().create_future()
await self.queue.put((enter_context, (), {}, future))
ctx_manager, result = await future
try:
yield result
finally:
# Queue the exit operation
async def exit_context() -> None:
await ctx_manager.__aexit__(None, None, None)
exit_future: Future[None] = asyncio.get_running_loop().create_future()
await self.queue.put((exit_context, (), {}, exit_future))
await exit_future
logger.debug("Context manager exited for: %s", getattr(func, "__name__", func))
async def close(self) -> None:
"""Close the browser and stop the worker tasks."""
logger.info("Closing the browser and stopping the worker tasks")
for _ in range(self.num_workers):
await self.queue.put(None)
if self.worker_tasks:
await asyncio.gather(*self.worker_tasks)
if self.browser:
with contextlib.suppress(Exception):
await self.browser.close()
if self.playwright:
await self.playwright.stop()
logger.info("Browser closed and all worker tasks stopped")
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: object,
) -> None:
await self.close()
async def __aenter__(self) -> Self:
await self.start()
return self