This commit is contained in:
Andras Schmelczer 2026-05-03 10:39:31 +01:00
parent 9a009f0b4c
commit eed1567f7f
12 changed files with 463 additions and 243 deletions

View file

@ -9,10 +9,11 @@ from pathlib import Path
from PIL import Image
sys.path.append(str(Path(__file__).parent / "lib"))
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
from homeassistant import HomeAssistantClient
from overlay import format_age, format_location
from crop import face_aware_crop
from homeassistant import HomeAssistantClient
from immich import ImmichClient, get_random_photo_from_album, get_random_photo_of_people
from overlay import format_age, format_location
# waveshare_epd is imported lazily after the lock — its epdconfig claims
# GPIO pins at import time, so two overlapping invocations would both crash
# on "GPIO busy" before reaching the flock below.
@ -21,23 +22,33 @@ IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.example.com")
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "REDACTED_IMMICH_API_KEY")
HA_URL = os.environ.get("HA_URL", "https://homeassistant.example.com")
HA_TOKEN = os.environ.get("HA_TOKEN", "REDACTED_HA_TOKEN")
HA_TOKEN = os.environ.get(
"HA_TOKEN",
"REDACTED_HA_TOKEN",
)
HA_PRESENCE = {"Andras": "person.andras", "Ruby": "person.ruby"}
def main() -> None:
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
parser.add_argument("--people", default="Me,Ruby",
help="Comma-separated names for Immich search")
parser.add_argument(
"--people", default="Me,Ruby", help="Comma-separated names for Immich search"
)
parser.add_argument("--album", help="Fetch from album (overrides --people)")
parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270],
default=0, help="Rotation in degrees")
parser.add_argument(
"-o",
"--orientation",
type=int,
choices=[0, 90, 180, 270],
default=0,
help="Rotation in degrees",
)
parser.add_argument("--saturation", type=float, default=1.3)
parser.add_argument("--contrast", type=float, default=1.05)
parser.add_argument("--gamma", type=float, default=0.90)
args = parser.parse_args()
lock_fd = open("/tmp/frame.lock", "w")
lock_fd = open("/tmp/frame.lock", "w") # noqa: SIM115 — held for process lifetime
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
@ -84,9 +95,15 @@ def main() -> None:
img = face_aware_crop(img, target_w, target_h, faces)
if args.orientation:
img = img.rotate(args.orientation, expand=True)
buf = epd.getbuffer(img, saturation=args.saturation, contrast=args.contrast,
gamma=args.gamma, left_text=left_text, right_text=right_text,
orientation=args.orientation)
buf = epd.getbuffer(
img,
saturation=args.saturation,
contrast=args.contrast,
gamma=args.gamma,
left_text=left_text,
right_text=right_text,
orientation=args.orientation,
)
epd.display(buf)
finally:
epd.sleep()

View file

@ -9,8 +9,9 @@ from PIL import Image
HEAD_EXTENSION = 0.4
def face_aware_crop(image: Image.Image, target_w: int, target_h: int,
faces: list[dict]) -> Image.Image:
def face_aware_crop(
image: Image.Image, target_w: int, target_h: int, faces: list[dict]
) -> Image.Image:
"""Resize to cover (target_w, target_h), then crop to keep faces in frame.
Each face dict has imageWidth/imageHeight (the coord-space dims) and
@ -47,17 +48,11 @@ def face_aware_crop(image: Image.Image, target_w: int, target_h: int,
x_lo = min(b[0] for b in boxes)
x_hi = max(b[2] for b in boxes)
if x_hi - x_lo <= target_w:
cx = (x_lo + x_hi) / 2
else:
cx = _weighted_center(boxes, 0, 2)
cx = (x_lo + x_hi) / 2 if x_hi - x_lo <= target_w else _weighted_center(boxes, 0, 2)
y_lo_ext = min(b[1] - (b[3] - b[1]) * HEAD_EXTENSION for b in boxes)
y_hi = max(b[3] for b in boxes)
if y_hi - y_lo_ext <= target_h:
cy = (y_lo_ext + y_hi) / 2
else:
cy = _weighted_center(boxes, 1, 3)
cy = (y_lo_ext + y_hi) / 2 if y_hi - y_lo_ext <= target_h else _weighted_center(boxes, 1, 3)
x_off = max(0, min(int(cx - target_w / 2), new_w - target_w))
y_off = max(0, min(int(cy - target_h / 2), new_h - target_h))

View file

@ -4,7 +4,7 @@ import random
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from datetime import UTC, datetime, timedelta
from pathlib import Path
from urllib.request import Request
@ -35,20 +35,25 @@ def _load_history() -> tuple[set[str], datetime]:
data = json.loads(HISTORY_FILE.read_text())
created_at = datetime.fromisoformat(data["created_at"])
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - created_at <= timedelta(days=7):
created_at = created_at.replace(tzinfo=UTC)
if datetime.now(UTC) - created_at <= timedelta(days=7):
return set(data.get("displayed", [])), created_at
print("Photo history expired (>7 days), clearing...")
except (FileNotFoundError, json.JSONDecodeError, ValueError, KeyError):
pass
return set(), datetime.now(timezone.utc)
return set(), datetime.now(UTC)
def _save_history(displayed: set[str], created_at: datetime) -> None:
HISTORY_FILE.write_text(json.dumps({
"created_at": created_at.isoformat(),
"displayed": sorted(displayed),
}, indent=2))
HISTORY_FILE.write_text(
json.dumps(
{
"created_at": created_at.isoformat(),
"displayed": sorted(displayed),
},
indent=2,
)
)
@dataclass
@ -85,13 +90,17 @@ class ImmichClient:
items = []
page = 1
while True:
assets = self._request("POST", "/search/metadata", {
"personIds": person_ids,
"size": 250,
"page": page,
"type": "IMAGE",
"withExif": True,
}).get("assets", {})
assets = self._request(
"POST",
"/search/metadata",
{
"personIds": person_ids,
"size": 250,
"page": page,
"type": "IMAGE",
"withExif": True,
},
).get("assets", {})
items.extend(assets.get("items", []))
if not assets.get("nextPage"):
break
@ -157,7 +166,7 @@ def _on_this_day_candidates(assets: list[dict]) -> tuple[list[dict], bool]:
Returns (candidates, is_exact). `is_exact` is True when same-month-day matches
exist; callers use it to weight the pool higher than the looser ±3-day fallback.
"""
today = datetime.now(timezone.utc).date()
today = datetime.now(UTC).date()
dated = []
for a in assets:
exif = a.get("exifInfo") or {}
@ -184,7 +193,7 @@ def _on_this_day_candidates(assets: list[dict]) -> tuple[list[dict], bool]:
def _pick_weighted_random(assets: list[dict]) -> dict:
"""Pick random asset, biased towards on-this-day memories, favorites, and recents."""
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
cutoff = datetime.now(UTC) - timedelta(days=30)
favorites = [a for a in assets if a.get("isFavorite")]
recent = []
for a in assets:
@ -208,8 +217,9 @@ def _pick_weighted_random(assets: list[dict]) -> dict:
return random.choice(pool)
def _pick_and_download(client: ImmichClient, assets: list[dict],
orientation: int, source_label: str) -> tuple[Path, dict]:
def _pick_and_download(
client: ImmichClient, assets: list[dict], orientation: int, source_label: str
) -> tuple[Path, dict]:
portrait = orientation in (90, 270)
filtered = _filter_by_orientation(assets, portrait)
if not filtered:
@ -231,7 +241,9 @@ def _pick_and_download(client: ImmichClient, assets: list[dict],
return path, asset
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> tuple[Path, dict]:
def get_random_photo_of_people(
client: ImmichClient, names: list[str], orientation: int = 0
) -> tuple[Path, dict]:
person_ids = [pid for name in names if (pid := client.get_person_id(name))]
if not person_ids:
raise ValueError(f"No people found: {names}")
@ -243,7 +255,9 @@ def get_random_photo_of_people(client: ImmichClient, names: list[str], orientati
return _pick_and_download(client, assets, orientation, f"photos for {', '.join(names)}")
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> tuple[Path, dict]:
def get_random_photo_from_album(
client: ImmichClient, album_name: str, orientation: int = 0
) -> tuple[Path, dict]:
album_id = client.get_album_id(album_name)
if not album_id:
raise ValueError(f"Album not found: {album_name}")

View file

@ -5,7 +5,7 @@ array; black/white survive Atkinson dithering so edges stay crisp on e-ink.
"""
import os
from datetime import datetime, timezone
from datetime import UTC, datetime
import numpy as np
from PIL import Image, ImageDraw, ImageFont
@ -39,8 +39,8 @@ def format_age(asset: dict) -> str | None:
except (ValueError, AttributeError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
days = (datetime.now(timezone.utc) - dt).days
dt = dt.replace(tzinfo=UTC)
days = (datetime.now(UTC) - dt).days
if days < 0:
return None
if days == 0:
@ -63,10 +63,9 @@ def format_location(asset: dict) -> str | None:
return exif.get("city") or exif.get("state") or exif.get("country") or None
def render_text_into_indices(indices: np.ndarray,
left_text: str | None,
right_text: str | None,
orientation: int = 0) -> None:
def render_text_into_indices(
indices: np.ndarray, left_text: str | None, right_text: str | None, orientation: int = 0
) -> None:
"""Paint white-on-black-stroke text into a (height, width) palette-index array.
Text is laid out viewer-bottom-left/right, then rotated by `orientation`
@ -89,14 +88,28 @@ def render_text_into_indices(indices: np.ndarray,
if left_text:
pos = (margin, baseline)
fill_draw.text(pos, left_text, font=font, fill=255, anchor="lb")
full_draw.text(pos, left_text, font=font, fill=255, anchor="lb",
stroke_width=stroke_width, stroke_fill=255)
full_draw.text(
pos,
left_text,
font=font,
fill=255,
anchor="lb",
stroke_width=stroke_width,
stroke_fill=255,
)
if right_text:
pos = (view_w - margin, baseline)
fill_draw.text(pos, right_text, font=font, fill=255, anchor="rb")
full_draw.text(pos, right_text, font=font, fill=255, anchor="rb",
stroke_width=stroke_width, stroke_fill=255)
full_draw.text(
pos,
right_text,
font=font,
fill=255,
anchor="rb",
stroke_width=stroke_width,
stroke_fill=255,
)
if orientation:
fill_layer = fill_layer.rotate(orientation, expand=True)