diff --git a/src/display.py b/src/display.py index fc2a3fe..44d4be0 100644 --- a/src/display.py +++ b/src/display.py @@ -1,95 +1,93 @@ -#!/usr/bin/env python3 -import argparse -import fcntl -import os -import sys -from datetime import datetime -from pathlib import Path - -from PIL import Image, ImageDraw, ImageFont - -sys.path.append(str(Path(__file__).parent / "lib")) -from waveshare_epd import epd7in3e -from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album -from homeassistant import HomeAssistantClient - -LOCK_FILE = "/tmp/frame.lock" - -IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev") -IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE") - -HA_URL = os.environ.get("HA_URL", "https://homeassistant.schmelczer.dev") -HA_TOKEN = os.environ.get("HA_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJmZjk3OTNmOWMzOWU0YjdmYmRjYTc5YmJkMTUyODcyNSIsImlhdCI6MTc2OTIwMjg1NCwiZXhwIjoyMDg0NTYyODU0fQ.IiL_1vTrGMlOoPMksN6lAopE0aInlY_wRnL4Jc-CeBs") -HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"] - -DEFAULT_SATURATION = 1.3 -DEFAULT_CONTRAST = 1.05 -DEFAULT_GAMMA = 0.90 - -def display_image(image_path: Path, orientation: int, saturation: float, - contrast: float, gamma: float, enhance: bool) -> None: - epd = epd7in3e.EPD() - try: - epd.init() - img = Image.open(image_path).convert("RGB") - if orientation: - img = img.rotate(orientation, expand=True) - buf = epd.getbuffer(img, saturation=saturation, contrast=contrast, - gamma=gamma, enhance=enhance) - epd.display(buf) - finally: - epd.sleep() - - -def main() -> None: - parser = argparse.ArgumentParser(description="Display image on e-ink frame") - parser.add_argument("--people", default="Me,Ruby", - help="Comma-separated names for Immich search") - parser.add_argument("--album", help="Fetch from album (overrides --people)") - parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270], - default=0, help="Rotation in degrees") - parser.add_argument("--saturation", type=float, default=DEFAULT_SATURATION) - parser.add_argument("--contrast", type=float, default=DEFAULT_CONTRAST) - parser.add_argument("--gamma", type=float, default=DEFAULT_GAMMA) - parser.add_argument("--no-enhance", action="store_true") - args = parser.parse_args() - - lock_fd = open(LOCK_FILE, "w") - try: - fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except BlockingIOError: - print("Another instance running, skipping") - sys.exit(0) - - now = datetime.now() - print(f"Time: {now.strftime('%H:%M')}") - - if 0 <= now.hour < 7: - print("Night time, skipping") - sys.exit(0) - - ha = HomeAssistantClient(HA_URL, HA_TOKEN) - home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)] - if not home: - print("No one home, skipping") - sys.exit(0) - print(f"Home: {', '.join(home)}") - - client = ImmichClient(IMMICH_URL, IMMICH_API_KEY) - if args.album: - image_path = get_random_photo_from_album(client, args.album, args.orientation) - print(f"Album: {args.album}") - else: - names = [n.strip() for n in args.people.split(",")] - image_path = get_random_photo_of_people(client, names, args.orientation) - print(f"People: {', '.join(names)}") - - try: - display_image(image_path, args.orientation, args.saturation, - args.contrast, args.gamma, not args.no_enhance) - finally: - image_path.unlink(missing_ok=True) - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +import argparse +import fcntl +import os +import sys +from datetime import datetime +from pathlib import Path + +from PIL import Image + +sys.path.append(str(Path(__file__).parent / "lib")) +from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album +from homeassistant import HomeAssistantClient +from overlay import format_age, format_location +# waveshare_epd is imported lazily after the lock — its epdconfig claims +# GPIO pins at import time, so two overlapping invocations would both crash +# on "GPIO busy" before reaching the flock below. + +IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev") +IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE") + +HA_URL = os.environ.get("HA_URL", "https://homeassistant.schmelczer.dev") +HA_TOKEN = os.environ.get("HA_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJmZjk3OTNmOWMzOWU0YjdmYmRjYTc5YmJkMTUyODcyNSIsImlhdCI6MTc2OTIwMjg1NCwiZXhwIjoyMDg0NTYyODU0fQ.IiL_1vTrGMlOoPMksN6lAopE0aInlY_wRnL4Jc-CeBs") +HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"] + + +def main() -> None: + parser = argparse.ArgumentParser(description="Display image on e-ink frame") + parser.add_argument("--people", default="Me,Ruby", + help="Comma-separated names for Immich search") + parser.add_argument("--album", help="Fetch from album (overrides --people)") + parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270], + default=0, help="Rotation in degrees") + parser.add_argument("--saturation", type=float, default=1.3) + parser.add_argument("--contrast", type=float, default=1.05) + parser.add_argument("--gamma", type=float, default=0.90) + args = parser.parse_args() + + lock_fd = open("/tmp/frame.lock", "w") + try: + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + print("Another instance running, skipping") + sys.exit(0) + + from waveshare_epd import epd7in3e + + now = datetime.now() + print(f"Time: {now.strftime('%H:%M')}") + if 0 <= now.hour < 7: + print("Night time, skipping") + sys.exit(0) + + ha = HomeAssistantClient(HA_URL, HA_TOKEN) + home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)] + if not home: + print("No one home, skipping") + sys.exit(0) + print(f"Home: {', '.join(home)}") + + client = ImmichClient(IMMICH_URL, IMMICH_API_KEY) + if args.album: + image_path, asset = get_random_photo_from_album(client, args.album, args.orientation) + print(f"Album: {args.album}") + else: + names = [n.strip() for n in args.people.split(",")] + image_path, asset = get_random_photo_of_people(client, names, args.orientation) + print(f"People: {', '.join(names)}") + + left_text = format_age(asset) + right_text = format_location(asset) + if left_text or right_text: + print(f"Overlay: {left_text or '-'} | {right_text or '-'}") + + try: + epd = epd7in3e.EPD() + try: + epd.init() + img = Image.open(image_path).convert("RGB") + if args.orientation: + img = img.rotate(args.orientation, expand=True) + buf = epd.getbuffer(img, saturation=args.saturation, contrast=args.contrast, + gamma=args.gamma, left_text=left_text, right_text=right_text, + orientation=args.orientation) + epd.display(buf) + finally: + epd.sleep() + finally: + image_path.unlink(missing_ok=True) + + +if __name__ == "__main__": + main() diff --git a/src/lib/homeassistant.py b/src/lib/homeassistant.py index 30f3fec..1b60fc4 100644 --- a/src/lib/homeassistant.py +++ b/src/lib/homeassistant.py @@ -1,10 +1,8 @@ #!/usr/bin/env python3 import json -import time -from urllib.error import URLError -from urllib.request import Request, urlopen +from urllib.request import Request -RETRY_DELAYS = (3, 10) +from net import urlopen_with_retry class HomeAssistantClient: @@ -12,26 +10,14 @@ class HomeAssistantClient: self.base_url = base_url.rstrip("/") self.token = token - def get_state(self, entity_id: str) -> dict: - url = f"{self.base_url}/api/states/{entity_id}" - req = Request(url, headers={ - "Authorization": f"Bearer {self.token}", - "Content-Type": "application/json", - }) - last_err: Exception | None = None - for attempt in range(len(RETRY_DELAYS) + 1): - try: - with urlopen(req, timeout=30) as resp: - return json.loads(resp.read().decode()) - except (URLError, TimeoutError) as e: - last_err = e - if attempt < len(RETRY_DELAYS): - time.sleep(RETRY_DELAYS[attempt]) - raise last_err - def is_person_home(self, entity_id: str) -> bool: + req = Request( + f"{self.base_url}/api/states/{entity_id}", + headers={"Authorization": f"Bearer {self.token}"}, + ) try: - return self.get_state(entity_id).get("state") == "home" + with urlopen_with_retry(req, timeout=30) as resp: + return json.loads(resp.read().decode()).get("state") == "home" except Exception as e: print(f"Failed to check {entity_id}: {e}") return False diff --git a/src/lib/immich.py b/src/lib/immich.py index 08f3680..126a1bd 100644 --- a/src/lib/immich.py +++ b/src/lib/immich.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import hashlib import json import random import tempfile @@ -7,38 +6,17 @@ import time from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path -from urllib.error import URLError -from urllib.request import Request, urlopen +from urllib.request import Request -from progress import ProgressBar +from net import urlopen_with_retry HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json" -HISTORY_MAX_AGE_DAYS = 7 - CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache" -CACHE_TTL_SECONDS = 3600 - -RETRY_DELAYS = (3, 10) - - -def _urlopen_with_retry(req: Request, timeout: int = 30): - """urlopen wrapper that retries transient network failures.""" - last_err: Exception | None = None - for attempt in range(len(RETRY_DELAYS) + 1): - try: - return urlopen(req, timeout=timeout) - except (URLError, TimeoutError) as e: - last_err = e - if attempt < len(RETRY_DELAYS): - time.sleep(RETRY_DELAYS[attempt]) - raise last_err def _cache_get(key: str) -> list[dict] | None: path = CACHE_DIR / f"{key}.json" - if not path.exists(): - return None - if time.time() - path.stat().st_mtime > CACHE_TTL_SECONDS: + if not path.exists() or time.time() - path.stat().st_mtime > 3600: return None try: return json.loads(path.read_text()) @@ -57,30 +35,26 @@ class PhotoHistory: def __init__(self, path: Path = HISTORY_FILE): self.path = path self.displayed: set[str] = set() - self.created_at: datetime | None = None + self.created_at = datetime.now(timezone.utc) self._load() def _load(self) -> None: if not self.path.exists(): - self._reset() + self._save() return try: data = json.loads(self.path.read_text()) - self.created_at = datetime.fromisoformat(data.get("created_at", "")) + self.created_at = datetime.fromisoformat(data["created_at"]) if self.created_at.tzinfo is None: self.created_at = self.created_at.replace(tzinfo=timezone.utc) - if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS): - print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...") - self._reset() + if datetime.now(timezone.utc) - self.created_at > timedelta(days=7): + print("Photo history expired (>7 days), clearing...") + self.created_at = datetime.now(timezone.utc) + self._save() else: self.displayed = set(data.get("displayed", [])) except (json.JSONDecodeError, ValueError, KeyError): - self._reset() - - def _reset(self) -> None: - self.displayed = set() - self.created_at = datetime.now(timezone.utc) - self._save() + self._save() def _save(self) -> None: self.path.write_text(json.dumps({ @@ -96,24 +70,16 @@ class PhotoHistory: return [a for a in assets if a.get("id") not in self.displayed] -_history: PhotoHistory | None = None - - -def get_history() -> PhotoHistory: - global _history - if _history is None: - _history = PhotoHistory() - return _history - - @dataclass class ImmichClient: base_url: str api_key: str - def _request(self, method: str, endpoint: str, data: dict | None = None, - show_progress: bool = False, progress_desc: str = "Fetching") -> dict: - url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}" + def __post_init__(self): + self.base_url = self.base_url.rstrip("/") + + def _request(self, method: str, endpoint: str, data: dict | None = None) -> dict: + url = f"{self.base_url}/api/{endpoint.lstrip('/')}" headers = {"x-api-key": self.api_key} body = None if data is not None: @@ -121,30 +87,17 @@ class ImmichClient: body = json.dumps(data).encode() req = Request(url, data=body, headers=headers, method=method) - with _urlopen_with_retry(req, timeout=30) as resp: - total_size = resp.headers.get('Content-Length') - if total_size and show_progress: - total_size = int(total_size) - progress = ProgressBar(total_size, desc=progress_desc) - chunks = bytearray() - while chunk := resp.read(8192): - chunks.extend(chunk) - progress.update(len(chunk)) - progress.finish() - return json.loads(chunks.decode()) + with urlopen_with_retry(req, timeout=30) as resp: return json.loads(resp.read().decode()) - def get_people(self) -> list[dict]: - return self._request("GET", "/people")["people"] - def get_person_id(self, name: str) -> str | None: - for person in self.get_people(): + for person in self._request("GET", "/people")["people"]: if person["name"].lower() == name.lower(): return person["id"] return None def search_assets_by_people(self, person_ids: list[str]) -> list[dict]: - key = "people_" + hashlib.md5("_".join(sorted(person_ids)).encode()).hexdigest() + key = "people_" + "_".join(sorted(person_ids)) cached = _cache_get(key) if cached is not None: return cached @@ -167,22 +120,11 @@ class ImmichClient: _cache_set(key, items) return items - def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path: - url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview" + def download_asset(self, asset_id: str, dest: Path) -> Path: + url = f"{self.base_url}/api/assets/{asset_id}/thumbnail?size=preview" req = Request(url, headers={"x-api-key": self.api_key}) - with _urlopen_with_retry(req, timeout=30) as resp: - total_size = resp.headers.get('Content-Length') - if total_size and show_progress: - total_size = int(total_size) - progress = ProgressBar(total_size, desc="Downloading") - data = bytearray() - while chunk := resp.read(8192): - data.extend(chunk) - progress.update(len(chunk)) - progress.finish() - dest.write_bytes(bytes(data)) - else: - dest.write_bytes(resp.read()) + with urlopen_with_retry(req, timeout=30) as resp: + dest.write_bytes(resp.read()) return dest def get_album_id(self, name: str) -> str | None: @@ -191,107 +133,85 @@ class ImmichClient: return album["id"] return None - def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]: + def get_album_assets(self, album_id: str) -> list[dict]: key = f"album_{album_id}" cached = _cache_get(key) if cached is not None: return cached - - album = self._request("GET", f"/albums/{album_id}", - show_progress=show_progress, progress_desc="Fetching album") - assets = album.get("assets", []) + assets = self._request("GET", f"/albums/{album_id}").get("assets", []) _cache_set(key, assets) return assets -def _is_portrait(asset: dict) -> bool | None: - """Check if asset displays as portrait, accounting for EXIF orientation.""" - exif = asset.get("exifInfo") or {} - width = exif.get("exifImageWidth") or 0 - height = exif.get("exifImageHeight") or 0 - if not (width and height): - return None - # EXIF orientation 6 and 8 mean 90° rotation (swap dimensions) - orientation = str(exif.get("orientation") or "1") - if orientation in ("6", "8"): - width, height = height, width - return height > width - - def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]: - """Filter assets by orientation, accounting for EXIF rotation.""" - filtered = [] - no_dimensions = 0 - for asset in assets: - is_portrait = _is_portrait(asset) - if is_portrait is not None: - if is_portrait == portrait: - filtered.append(asset) - else: - no_dimensions += 1 - if no_dimensions: - print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data") - return filtered + """Keep assets matching the requested orientation. Skips assets without EXIF dimensions.""" + out = [] + for a in assets: + exif = a.get("exifInfo") or {} + w = exif.get("exifImageWidth") or 0 + h = exif.get("exifImageHeight") or 0 + if not (w and h): + continue + if exif.get("orientation") in (6, 8, "6", "8"): + w, h = h, w + if (h > w) == portrait: + out.append(a) + return out def _pick_weighted_random(assets: list[dict]) -> dict: - """Pick random asset, biased towards favorites (20%) and recently added photos (50%).""" - if not assets: - raise ValueError("No assets to choose from") - + """Pick random asset, biased towards favorites and recently added photos.""" cutoff = datetime.now(timezone.utc) - timedelta(days=30) favorites = [a for a in assets if a.get("isFavorite")] recent = [] - for asset in assets: - date_str = asset.get("createdAt", "") + for a in assets: try: - if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= cutoff: - recent.append(asset) + if datetime.fromisoformat(a.get("createdAt", "").replace("Z", "+00:00")) >= cutoff: + recent.append(a) except (ValueError, AttributeError): pass - if favorites and random.random() < 0.2: - return random.choice(favorites) - if recent and random.random() < 0.5: - return random.choice(recent) - return random.choice(assets) + candidates = [(favorites, 0.2), (recent, 0.4), (assets, 0.4)] + pools, weights = zip(*[(p, w) for p, w in candidates if p]) + pool = random.choices(pools, weights=weights)[0] + return random.choice(pool) -def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path: - history = get_history() - new_assets = history.filter_new(assets) +def _pick_and_download(client: ImmichClient, assets: list[dict], + orientation: int, source_label: str) -> tuple[Path, dict]: + portrait = orientation in (90, 270) + filtered = _filter_by_orientation(assets, portrait) + if not filtered: + raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in {source_label}") - if new_assets: - print(f"Photos: {len(new_assets)} new / {len(assets)} total") - asset = _pick_weighted_random(new_assets) + history = PhotoHistory() + candidates = history.filter_new(filtered) + if not candidates: + print(f"All {len(filtered)} photos shown, picking from full list") + candidates = filtered else: - print(f"All {len(assets)} photos shown, picking from full list") - asset = _pick_weighted_random(assets) + print(f"Photos: {len(candidates)} new / {len(filtered)} total") + asset = _pick_weighted_random(candidates) dest = Path(tempfile.gettempdir()) / "immich_photo.jpg" path = client.download_asset(asset["id"], dest) history.mark_displayed(asset["id"]) - return path + return path, asset -def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path: +def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> tuple[Path, dict]: person_ids = [pid for name in names if (pid := client.get_person_id(name))] if not person_ids: raise ValueError(f"No people found: {names}") assets = client.search_assets_by_people(person_ids) - if not assets: raise ValueError(f"No photos found for: {names}") - portrait = orientation in (90, 270) - filtered = _filter_by_orientation(assets, portrait) - if not filtered: - raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos available") - return _download_random_asset(client, filtered) + return _pick_and_download(client, assets, orientation, f"photos for {', '.join(names)}") -def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path: +def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> tuple[Path, dict]: album_id = client.get_album_id(album_name) if not album_id: raise ValueError(f"Album not found: {album_name}") @@ -300,8 +220,4 @@ def get_random_photo_from_album(client: ImmichClient, album_name: str, orientati if not assets: raise ValueError(f"No photos in album: {album_name}") - portrait = orientation in (90, 270) - filtered = _filter_by_orientation(assets, portrait) - if not filtered: - raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in album: {album_name}") - return _download_random_asset(client, filtered) + return _pick_and_download(client, assets, orientation, f"album: {album_name}") diff --git a/src/lib/net.py b/src/lib/net.py new file mode 100644 index 0000000..bd8a724 --- /dev/null +++ b/src/lib/net.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import time +from urllib.error import URLError +from urllib.request import Request, urlopen + +RETRY_DELAYS = (3, 10) + + +def urlopen_with_retry(req: Request, timeout: int = 30): + """urlopen wrapper that retries transient network failures.""" + last_err: Exception | None = None + for attempt in range(len(RETRY_DELAYS) + 1): + try: + return urlopen(req, timeout=timeout) + except (URLError, TimeoutError) as e: + last_err = e + if attempt < len(RETRY_DELAYS): + time.sleep(RETRY_DELAYS[attempt]) + raise last_err diff --git a/src/lib/overlay.py b/src/lib/overlay.py new file mode 100644 index 0000000..0b5cedc --- /dev/null +++ b/src/lib/overlay.py @@ -0,0 +1,106 @@ +"""Text overlay rendering for the e-ink frame. + +Paints aliased white-on-black-stroke text into the dithered palette index +array; black/white survive Atkinson dithering so edges stay crisp on e-ink. +""" + +import os +from datetime import datetime, timezone + +import numpy as np +from PIL import Image, ImageDraw, ImageFont + +FONT_CANDIDATES = ( + "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", + "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf", + "/System/Library/Fonts/Helvetica.ttc", +) + +PALETTE_BLACK = 0 +PALETTE_WHITE = 1 + + +def _load_font(size: int) -> ImageFont.ImageFont: + for path in FONT_CANDIDATES: + if os.path.exists(path): + return ImageFont.truetype(path, size) + return ImageFont.load_default() + + +def format_age(asset: dict) -> str | None: + """Photo capture age as 'N days/weeks/months/years ago'.""" + exif = asset.get("exifInfo") or {} + date_str = exif.get("dateTimeOriginal") or asset.get("fileCreatedAt") + if not date_str: + return None + try: + dt = datetime.fromisoformat(date_str.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + days = (datetime.now(timezone.utc) - dt).days + if days < 0: + return None + if days == 0: + return "Today" + if days == 1: + return "Yesterday" + if days < 7: + return f"{days} days ago" + for n, unit in ((365, "year"), (30, "month"), (7, "week")): + if days >= n: + count = max(1, days // n) + return f"{count} {unit}{'s' if count > 1 else ''} ago" + + +def format_location(asset: dict) -> str | None: + """Most specific location available from EXIF.""" + exif = asset.get("exifInfo") or {} + return exif.get("city") or exif.get("state") or exif.get("country") or None + + +def render_text_into_indices(indices: np.ndarray, + left_text: str | None, + right_text: str | None, + orientation: int = 0) -> None: + """Paint white-on-black-stroke text into a (height, width) palette-index array. + + Text is laid out viewer-bottom-left/right, then rotated by `orientation` + so labels land at the viewer's bottom regardless of frame mounting. + """ + font_size, margin, stroke_width = 20, 18, 2 + buffer_h, buffer_w = indices.shape + if orientation in (90, 270): + view_w, view_h = buffer_h, buffer_w + else: + view_w, view_h = buffer_w, buffer_h + + fill_layer = Image.new("L", (view_w, view_h), 0) + full_layer = Image.new("L", (view_w, view_h), 0) + fill_draw = ImageDraw.Draw(fill_layer) + full_draw = ImageDraw.Draw(full_layer) + font = _load_font(font_size) + baseline = view_h - margin + + if left_text: + pos = (margin, baseline) + fill_draw.text(pos, left_text, font=font, fill=255, anchor="lb") + full_draw.text(pos, left_text, font=font, fill=255, anchor="lb", + stroke_width=stroke_width, stroke_fill=255) + + if right_text: + pos = (view_w - margin, baseline) + fill_draw.text(pos, right_text, font=font, fill=255, anchor="rb") + full_draw.text(pos, right_text, font=font, fill=255, anchor="rb", + stroke_width=stroke_width, stroke_fill=255) + + if orientation: + fill_layer = fill_layer.rotate(orientation, expand=True) + full_layer = full_layer.rotate(orientation, expand=True) + + fill_mask = np.asarray(fill_layer) >= 128 + stroke_mask = (np.asarray(full_layer) >= 128) & ~fill_mask + indices[stroke_mask] = PALETTE_BLACK + indices[fill_mask] = PALETTE_WHITE diff --git a/src/lib/progress.py b/src/lib/progress.py index 2240e9c..98f7002 100644 --- a/src/lib/progress.py +++ b/src/lib/progress.py @@ -1,49 +1,23 @@ -"""Simple terminal progress bar for e-ink frame.""" - -import sys - - -class ProgressBar: - """Simple text-based progress bar.""" - - def __init__(self, total: int, desc: str = "", width: int = 30): - self.total = total - self.current = 0 - self.desc = desc - self.width = width - self._last_percent = -1 - - def update(self, n: int = 1) -> None: - """Update progress by n steps.""" - self.current = min(self.current + n, self.total) - self._render() - - def set(self, value: int) -> None: - """Set progress to specific value.""" - self.current = min(value, self.total) - self._render() - - def _render(self) -> None: - if self.total == 0: - return - - percent = int(100 * self.current / self.total) - if percent == self._last_percent: - return - self._last_percent = percent - - filled = int(self.width * self.current / self.total) - bar = "█" * filled + "░" * (self.width - filled) - - desc = f"{self.desc}: " if self.desc else "" - sys.stdout.write(f"\r{desc}|{bar}| {percent:3d}%") - sys.stdout.flush() - - if self.current >= self.total: - sys.stdout.write("\n") - sys.stdout.flush() - - def finish(self) -> None: - """Complete the progress bar.""" - self.current = self.total - self._render() +"""Simple terminal progress bar for e-ink frame.""" + + +class ProgressBar: + def __init__(self, total: int, desc: str = ""): + self.total = total + self.desc = desc + self._last_percent = -1 + + def set(self, value: int) -> None: + if self.total == 0: + return + value = min(value, self.total) + percent = int(100 * value / self.total) + if percent == self._last_percent: + return + self._last_percent = percent + + filled = int(30 * value / self.total) + bar = "█" * filled + "░" * (30 - filled) + end = "\n" if value >= self.total else "" + prefix = f"{self.desc}: " if self.desc else "" + print(f"\r{prefix}|{bar}| {percent:3d}%", end=end, flush=True) diff --git a/src/lib/waveshare_epd/epd7in3e.py b/src/lib/waveshare_epd/epd7in3e.py index 3c2f235..c2031d5 100644 --- a/src/lib/waveshare_epd/epd7in3e.py +++ b/src/lib/waveshare_epd/epd7in3e.py @@ -1,282 +1,238 @@ -#!/usr/bin/env python3 -# Waveshare 7.3" 6-color e-Paper driver (modified) -# Original: Waveshare team, 2022-10-20 - -import numpy as np -import cv2 -from PIL import Image, ImageEnhance -from numba import jit -from progress import ProgressBar -from . import epdconfig - -EPD_WIDTH = 800 -EPD_HEIGHT = 480 - -# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors; -# 4 is reserved/unused (filled with BLACK so nearest-color never picks it). -PALETTE_RGB = np.array([ - [0, 0, 0], # 0: BLACK - [255, 255, 255], # 1: WHITE - [255, 255, 0], # 2: YELLOW - [255, 0, 0], # 3: RED - [0, 0, 0], # 4: unused - [0, 0, 255], # 5: BLUE - [0, 255, 0], # 6: GREEN -], dtype=np.float64) - -PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64) - - -def _enhance_for_eink(image: Image.Image, saturation: float, - contrast: float, gamma: float) -> Image.Image: - img = image.convert('RGB') - if saturation != 1.0: - img = ImageEnhance.Color(img).enhance(saturation) - if contrast != 1.0: - img = ImageEnhance.Contrast(img).enhance(contrast) - if gamma != 1.0: - lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3 - img = img.point(lut) - return img - - -def _crop_center(image: Image.Image, target_w: int, target_h: int, - show_progress: bool = True) -> Image.Image: - if show_progress: - print("Center cropping...") - - img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) - img_h, img_w = img_cv.shape[:2] - img_aspect, target_aspect = img_w / img_h, target_w / target_h - - if img_aspect < target_aspect: - new_w, new_h = target_w, int(target_w / img_aspect) - else: - new_w, new_h = int(target_h * img_aspect), target_h - - img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) - x_off = (new_w - target_w) // 2 - y_off = (new_h - target_h) // 2 - cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w] - return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)) - - -@jit(nopython=True, cache=True) -def _find_nearest_color(r, g, b, palette, weights): - best_idx, best_dist = 0, 1e10 - for i in range(palette.shape[0]): - dr = (palette[i, 0] - r) * weights[0] - dg = (palette[i, 1] - g) * weights[1] - db = (palette[i, 2] - b) * weights[2] - dist = dr * dr + dg * dg + db * db - if dist < best_dist: - best_dist, best_idx = dist, i - return best_idx - - -@jit(nopython=True, cache=True) -def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row): - height, width = img.shape[:2] - for y in range(start_row, end_row): - for x in range(width): - old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2] - idx = _find_nearest_color(old_r, old_g, old_b, palette, weights) - indices[y, x] = idx - new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2] - - err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0 - - if x + 1 < width: - img[y, x + 1, 0] += err_r - img[y, x + 1, 1] += err_g - img[y, x + 1, 2] += err_b - if x + 2 < width: - img[y, x + 2, 0] += err_r - img[y, x + 2, 1] += err_g - img[y, x + 2, 2] += err_b - if y + 1 < height: - if x > 0: - img[y + 1, x - 1, 0] += err_r - img[y + 1, x - 1, 1] += err_g - img[y + 1, x - 1, 2] += err_b - img[y + 1, x, 0] += err_r - img[y + 1, x, 1] += err_g - img[y + 1, x, 2] += err_b - if x + 1 < width: - img[y + 1, x + 1, 0] += err_r - img[y + 1, x + 1, 1] += err_g - img[y + 1, x + 1, 2] += err_b - if y + 2 < height: - img[y + 2, x, 0] += err_r - img[y + 2, x, 1] += err_g - img[y + 2, x, 2] += err_b - - -def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> np.ndarray: - """Atkinson-dither to the e-ink palette and return a uint8 array of palette indices.""" - img = np.array(image.convert('RGB'), dtype=np.float64) - height, width = img.shape[:2] - indices = np.zeros((height, width), dtype=np.uint8) - if show_progress: - print("Dithering...") - progress = ProgressBar(height, desc="Dithering") - - chunk_size = 48 - for i in range((height + chunk_size - 1) // chunk_size): - start, end = i * chunk_size, min((i + 1) * chunk_size, height) - _atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end) - if show_progress: - progress.set(end) - - return indices - - -class EPD: - def __init__(self): - self.reset_pin = epdconfig.RST_PIN - self.dc_pin = epdconfig.DC_PIN - self.busy_pin = epdconfig.BUSY_PIN - self.cs_pin = epdconfig.CS_PIN - self.width = EPD_WIDTH - self.height = EPD_HEIGHT - - def reset(self): - epdconfig.digital_write(self.reset_pin, 1) - epdconfig.delay_ms(20) - epdconfig.digital_write(self.reset_pin, 0) - epdconfig.delay_ms(2) - epdconfig.digital_write(self.reset_pin, 1) - epdconfig.delay_ms(20) - - def send_command(self, command): - epdconfig.digital_write(self.dc_pin, 0) - epdconfig.digital_write(self.cs_pin, 0) - epdconfig.spi_writebyte([command]) - epdconfig.digital_write(self.cs_pin, 1) - - def send_data(self, data): - epdconfig.digital_write(self.dc_pin, 1) - epdconfig.digital_write(self.cs_pin, 0) - epdconfig.spi_writebyte([data]) - epdconfig.digital_write(self.cs_pin, 1) - - def send_data2(self, data): - epdconfig.digital_write(self.dc_pin, 1) - epdconfig.digital_write(self.cs_pin, 0) - epdconfig.spi_writebyte2(data) - epdconfig.digital_write(self.cs_pin, 1) - - def wait_busy(self): - while epdconfig.digital_read(self.busy_pin) == 0: - epdconfig.delay_ms(5) - - def turn_on_display(self): - self.send_command(0x04) # POWER_ON - self.wait_busy() - self.send_command(0x12) # DISPLAY_REFRESH - self.send_data(0x00) - self.wait_busy() - self.send_command(0x02) # POWER_OFF - self.send_data(0x00) - self.wait_busy() - - def init(self): - if epdconfig.module_init() != 0: - return -1 - self.reset() - self.wait_busy() - epdconfig.delay_ms(30) - - self.send_command(0xAA) - for v in [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]: - self.send_data(v) - - self.send_command(0x01) - self.send_data(0x3F) - - self.send_command(0x00) - self.send_data(0x5F) - self.send_data(0x69) - - self.send_command(0x03) - for v in [0x00, 0x54, 0x00, 0x44]: - self.send_data(v) - - self.send_command(0x05) - for v in [0x40, 0x1F, 0x1F, 0x2C]: - self.send_data(v) - - self.send_command(0x06) - for v in [0x6F, 0x1F, 0x17, 0x49]: - self.send_data(v) - - self.send_command(0x08) - for v in [0x6F, 0x1F, 0x1F, 0x22]: - self.send_data(v) - - self.send_command(0x30) - self.send_data(0x03) - - self.send_command(0x50) - self.send_data(0x3F) - - self.send_command(0x60) - self.send_data(0x02) - self.send_data(0x00) - - self.send_command(0x61) - for v in [0x03, 0x20, 0x01, 0xE0]: - self.send_data(v) - - self.send_command(0x84) - self.send_data(0x01) - - self.send_command(0xE3) - self.send_data(0x2F) - - self.send_command(0x04) - self.wait_busy() - return 0 - - def getbuffer(self, image, saturation: float, contrast: float, gamma: float, - enhance: bool = True, show_progress: bool = True): - image = image.convert('RGB') - imwidth, imheight = image.size - - if imwidth != self.width or imheight != self.height: - if show_progress: - print(f"Input: {imwidth}x{imheight} → {self.width}x{self.height}") - image = _crop_center(image, self.width, self.height, show_progress) - - if enhance: - if show_progress: - print("Enhancing...") - image = _enhance_for_eink(image, saturation, contrast, gamma) - - indices = _dither_atkinson(image, show_progress) - - if show_progress: - print("Packing buffer...") - flat = indices.reshape(-1) - packed = (flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8) - buf = packed.tolist() - - if show_progress: - print("Ready") - return buf - - def display(self, image): - self.send_command(0x10) - self.send_data2(image) - self.turn_on_display() - - def Clear(self, color=0x11): - self.send_command(0x10) - self.send_data2([color] * (self.height * self.width // 2)) - self.turn_on_display() - - def sleep(self): - self.send_command(0x07) # DEEP_SLEEP - self.send_data(0xA5) - epdconfig.delay_ms(2000) - epdconfig.module_exit() +#!/usr/bin/env python3 +# Waveshare 7.3" 6-color e-Paper driver (modified) + +import numpy as np +import cv2 +from PIL import Image, ImageEnhance +from numba import jit +from progress import ProgressBar +from overlay import render_text_into_indices +from . import epdconfig + +EPD_WIDTH = 800 +EPD_HEIGHT = 480 + +# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors; +# 4 is reserved/unused (filled with BLACK so nearest-color never picks it). +PALETTE_RGB = np.array([ + [0, 0, 0], # 0: BLACK + [255, 255, 255], # 1: WHITE + [255, 255, 0], # 2: YELLOW + [255, 0, 0], # 3: RED + [0, 0, 0], # 4: unused + [0, 0, 255], # 5: BLUE + [0, 255, 0], # 6: GREEN +], dtype=np.float64) + +PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64) + +INIT_SEQUENCE = ( + (0xAA, [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]), + (0x01, [0x3F]), + (0x00, [0x5F, 0x69]), + (0x03, [0x00, 0x54, 0x00, 0x44]), + (0x05, [0x40, 0x1F, 0x1F, 0x2C]), + (0x06, [0x6F, 0x1F, 0x17, 0x49]), + (0x08, [0x6F, 0x1F, 0x1F, 0x22]), + (0x30, [0x03]), + (0x50, [0x3F]), + (0x60, [0x02, 0x00]), + (0x61, [0x03, 0x20, 0x01, 0xE0]), + (0x84, [0x01]), + (0xE3, [0x2F]), +) + + +def _enhance_for_eink(image: Image.Image, saturation: float, + contrast: float, gamma: float) -> Image.Image: + img = image.convert('RGB') + if saturation != 1.0: + img = ImageEnhance.Color(img).enhance(saturation) + if contrast != 1.0: + img = ImageEnhance.Contrast(img).enhance(contrast) + if gamma != 1.0: + lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3 + img = img.point(lut) + return img + + +def _crop_center(image: Image.Image, target_w: int, target_h: int) -> Image.Image: + print("Center cropping...") + img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) + img_h, img_w = img_cv.shape[:2] + img_aspect, target_aspect = img_w / img_h, target_w / target_h + + if img_aspect < target_aspect: + new_w, new_h = target_w, int(target_w / img_aspect) + else: + new_w, new_h = int(target_h * img_aspect), target_h + + img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) + x_off = (new_w - target_w) // 2 + y_off = (new_h - target_h) // 2 + cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w] + return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)) + + +@jit(nopython=True, cache=True) +def _find_nearest_color(r, g, b, palette, weights): + best_idx, best_dist = 0, 1e10 + for i in range(palette.shape[0]): + dr = (palette[i, 0] - r) * weights[0] + dg = (palette[i, 1] - g) * weights[1] + db = (palette[i, 2] - b) * weights[2] + dist = dr * dr + dg * dg + db * db + if dist < best_dist: + best_dist, best_idx = dist, i + return best_idx + + +@jit(nopython=True, cache=True) +def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row): + height, width = img.shape[:2] + for y in range(start_row, end_row): + for x in range(width): + old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2] + idx = _find_nearest_color(old_r, old_g, old_b, palette, weights) + indices[y, x] = idx + new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2] + + err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0 + + if x + 1 < width: + img[y, x + 1, 0] += err_r + img[y, x + 1, 1] += err_g + img[y, x + 1, 2] += err_b + if x + 2 < width: + img[y, x + 2, 0] += err_r + img[y, x + 2, 1] += err_g + img[y, x + 2, 2] += err_b + if y + 1 < height: + if x > 0: + img[y + 1, x - 1, 0] += err_r + img[y + 1, x - 1, 1] += err_g + img[y + 1, x - 1, 2] += err_b + img[y + 1, x, 0] += err_r + img[y + 1, x, 1] += err_g + img[y + 1, x, 2] += err_b + if x + 1 < width: + img[y + 1, x + 1, 0] += err_r + img[y + 1, x + 1, 1] += err_g + img[y + 1, x + 1, 2] += err_b + if y + 2 < height: + img[y + 2, x, 0] += err_r + img[y + 2, x, 1] += err_g + img[y + 2, x, 2] += err_b + + +def _dither_atkinson(image: Image.Image) -> np.ndarray: + """Atkinson-dither to the e-ink palette and return a uint8 array of palette indices.""" + img = np.array(image.convert('RGB'), dtype=np.float64) + height, width = img.shape[:2] + indices = np.zeros((height, width), dtype=np.uint8) + print("Dithering...") + progress = ProgressBar(height, desc="Dithering") + + chunk_size = 48 + for i in range((height + chunk_size - 1) // chunk_size): + start, end = i * chunk_size, min((i + 1) * chunk_size, height) + _atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end) + progress.set(end) + + return indices + + +class EPD: + def __init__(self): + self.reset_pin = epdconfig.RST_PIN + self.dc_pin = epdconfig.DC_PIN + self.busy_pin = epdconfig.BUSY_PIN + self.cs_pin = epdconfig.CS_PIN + self.width = EPD_WIDTH + self.height = EPD_HEIGHT + + def reset(self): + epdconfig.digital_write(self.reset_pin, 1) + epdconfig.delay_ms(20) + epdconfig.digital_write(self.reset_pin, 0) + epdconfig.delay_ms(2) + epdconfig.digital_write(self.reset_pin, 1) + epdconfig.delay_ms(20) + + def send_command(self, command): + epdconfig.digital_write(self.dc_pin, 0) + epdconfig.digital_write(self.cs_pin, 0) + epdconfig.spi_writebyte([command]) + epdconfig.digital_write(self.cs_pin, 1) + + def send_data(self, data): + epdconfig.digital_write(self.dc_pin, 1) + epdconfig.digital_write(self.cs_pin, 0) + epdconfig.spi_writebyte([data]) + epdconfig.digital_write(self.cs_pin, 1) + + def send_data2(self, data): + epdconfig.digital_write(self.dc_pin, 1) + epdconfig.digital_write(self.cs_pin, 0) + epdconfig.spi_writebyte2(data) + epdconfig.digital_write(self.cs_pin, 1) + + def wait_busy(self): + while epdconfig.digital_read(self.busy_pin) == 0: + epdconfig.delay_ms(5) + + def turn_on_display(self): + self.send_command(0x04) # POWER_ON + self.wait_busy() + self.send_command(0x12) # DISPLAY_REFRESH + self.send_data(0x00) + self.wait_busy() + self.send_command(0x02) # POWER_OFF + self.send_data(0x00) + self.wait_busy() + + def init(self): + epdconfig.module_init() + self.reset() + self.wait_busy() + epdconfig.delay_ms(30) + + for cmd, data in INIT_SEQUENCE: + self.send_command(cmd) + for v in data: + self.send_data(v) + + self.send_command(0x04) + self.wait_busy() + + def getbuffer(self, image, saturation: float, contrast: float, gamma: float, + left_text: str | None = None, right_text: str | None = None, + orientation: int = 0): + image = image.convert('RGB') + if image.size != (self.width, self.height): + print(f"Input: {image.size[0]}x{image.size[1]} → {self.width}x{self.height}") + image = _crop_center(image, self.width, self.height) + + print("Enhancing...") + image = _enhance_for_eink(image, saturation, contrast, gamma) + + indices = _dither_atkinson(image) + + if left_text or right_text: + print("Rendering overlay...") + render_text_into_indices(indices, left_text, right_text, orientation) + + print("Packing buffer...") + flat = indices.reshape(-1) + return ((flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)).tolist() + + def display(self, image): + self.send_command(0x10) + self.send_data2(image) + self.turn_on_display() + + def sleep(self): + self.send_command(0x07) # DEEP_SLEEP + self.send_data(0xA5) + epdconfig.delay_ms(2000) + epdconfig.module_exit()