Improvements and notebooks

This commit is contained in:
Andras Schmelczer 2026-04-26 21:05:16 +01:00
parent 84f8456fff
commit f6b0ba5754
34 changed files with 2668 additions and 1373 deletions

View file

@ -12,6 +12,7 @@ sys.path.append(str(Path(__file__).parent / "lib"))
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
from homeassistant import HomeAssistantClient
from overlay import format_age, format_location
from crop import face_aware_crop
# waveshare_epd is imported lazily after the lock — its epdconfig claims
# GPIO pins at import time, so two overlapping invocations would both crash
# on "GPIO busy" before reaching the flock below.
@ -21,7 +22,7 @@ IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "REDACTED_IMMICH_API_KEY")
HA_URL = os.environ.get("HA_URL", "https://homeassistant.example.com")
HA_TOKEN = os.environ.get("HA_TOKEN", "REDACTED_HA_TOKEN")
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"]
HA_PRESENCE = {"Andras": "person.andras", "Ruby": "person.ruby"}
def main() -> None:
@ -47,12 +48,12 @@ def main() -> None:
now = datetime.now()
print(f"Time: {now.strftime('%H:%M')}")
if 0 <= now.hour < 7:
if now.hour < 7:
print("Night time, skipping")
sys.exit(0)
ha = HomeAssistantClient(HA_URL, HA_TOKEN)
home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)]
home = [name for name, eid in HA_PRESENCE.items() if ha.is_person_home(eid)]
if not home:
print("No one home, skipping")
sys.exit(0)
@ -77,6 +78,10 @@ def main() -> None:
try:
epd.init()
img = Image.open(image_path).convert("RGB")
faces = client.get_asset_faces(asset["id"])
print(f"Faces: {len(faces)}")
target_w, target_h = (480, 800) if args.orientation in (90, 270) else (800, 480)
img = face_aware_crop(img, target_w, target_h, faces)
if args.orientation:
img = img.rotate(args.orientation, expand=True)
buf = epd.getbuffer(img, saturation=args.saturation, contrast=args.contrast,

69
src/lib/crop.py Normal file
View file

@ -0,0 +1,69 @@
"""Resize-to-cover then crop, biased toward Immich-detected face boxes."""
import math
from PIL import Image
# Face boxes end at the hairline; extend each box upward by this fraction of
# its own height so the fit-check considers the head, not just the face.
HEAD_EXTENSION = 0.4
def face_aware_crop(image: Image.Image, target_w: int, target_h: int,
faces: list[dict]) -> Image.Image:
"""Resize to cover (target_w, target_h), then crop to keep faces in frame.
Each face dict has imageWidth/imageHeight (the coord-space dims) and
boundingBoxX1/Y1/X2/Y2. Per axis: if every (head-extended) face fits in
the crop we centre on the joint span so all faces are included with hair
clearance on top. If the span doesn't fit, we fall back to the
area-weighted centroid of the unextended boxes that biases toward the
biggest, presumably foreground, face. Plain center crop when no faces.
"""
img_w, img_h = image.size
img_aspect = img_w / img_h
target_aspect = target_w / target_h
if img_aspect < target_aspect:
new_w = target_w
new_h = math.ceil(target_w / img_aspect)
else:
new_w = math.ceil(target_h * img_aspect)
new_h = target_h
resized = image.resize((new_w, new_h), Image.LANCZOS)
cx, cy = new_w / 2, new_h / 2
if faces:
boxes = []
for f in faces:
sx = new_w / (f.get("imageWidth") or img_w)
sy = new_h / (f.get("imageHeight") or img_h)
x1 = f["boundingBoxX1"] * sx
y1 = f["boundingBoxY1"] * sy
x2 = f["boundingBoxX2"] * sx
y2 = f["boundingBoxY2"] * sy
area = max(0.0, (x2 - x1) * (y2 - y1))
boxes.append((x1, y1, x2, y2, area))
x_lo = min(b[0] for b in boxes)
x_hi = max(b[2] for b in boxes)
if x_hi - x_lo <= target_w:
cx = (x_lo + x_hi) / 2
else:
cx = _weighted_center(boxes, 0, 2)
y_lo_ext = min(b[1] - (b[3] - b[1]) * HEAD_EXTENSION for b in boxes)
y_hi = max(b[3] for b in boxes)
if y_hi - y_lo_ext <= target_h:
cy = (y_lo_ext + y_hi) / 2
else:
cy = _weighted_center(boxes, 1, 3)
x_off = max(0, min(int(cx - target_w / 2), new_w - target_w))
y_off = max(0, min(int(cy - target_h / 2), new_h - target_h))
return resized.crop((x_off, y_off, x_off + target_w, y_off + target_h))
def _weighted_center(boxes: list[tuple], lo: int, hi: int) -> float:
total = sum(b[4] for b in boxes) or 1.0
return sum((b[lo] + b[hi]) / 2 * b[4] for b in boxes) / total

View file

@ -16,11 +16,11 @@ CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache"
def _cache_get(key: str) -> list[dict] | None:
path = CACHE_DIR / f"{key}.json"
if not path.exists() or time.time() - path.stat().st_mtime > 3600:
return None
try:
if time.time() - path.stat().st_mtime > 3600:
return None
return json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
@ -29,45 +29,26 @@ def _cache_set(key: str, value: list[dict]) -> None:
(CACHE_DIR / f"{key}.json").write_text(json.dumps(value))
class PhotoHistory:
"""Track displayed photos to avoid repeats. Clears after 7 days."""
def _load_history() -> tuple[set[str], datetime]:
"""Load (displayed, created_at). Resets if missing/corrupt or older than 7 days."""
try:
data = json.loads(HISTORY_FILE.read_text())
created_at = datetime.fromisoformat(data["created_at"])
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - created_at <= timedelta(days=7):
return set(data.get("displayed", [])), created_at
print("Photo history expired (>7 days), clearing...")
except (FileNotFoundError, json.JSONDecodeError, ValueError, KeyError):
pass
return set(), datetime.now(timezone.utc)
def __init__(self, path: Path = HISTORY_FILE):
self.path = path
self.displayed: set[str] = set()
self.created_at = datetime.now(timezone.utc)
self._load()
def _load(self) -> None:
if not self.path.exists():
self._save()
return
try:
data = json.loads(self.path.read_text())
self.created_at = datetime.fromisoformat(data["created_at"])
if self.created_at.tzinfo is None:
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - self.created_at > timedelta(days=7):
print("Photo history expired (>7 days), clearing...")
self.created_at = datetime.now(timezone.utc)
self._save()
else:
self.displayed = set(data.get("displayed", []))
except (json.JSONDecodeError, ValueError, KeyError):
self._save()
def _save(self) -> None:
self.path.write_text(json.dumps({
"created_at": self.created_at.isoformat(),
"displayed": list(self.displayed),
}, indent=2))
def mark_displayed(self, asset_id: str) -> None:
self.displayed.add(asset_id)
self._save()
def filter_new(self, assets: list[dict]) -> list[dict]:
return [a for a in assets if a.get("id") not in self.displayed]
def _save_history(displayed: set[str], created_at: datetime) -> None:
HISTORY_FILE.write_text(json.dumps({
"created_at": created_at.isoformat(),
"displayed": sorted(displayed),
}, indent=2))
@dataclass
@ -79,14 +60,13 @@ class ImmichClient:
self.base_url = self.base_url.rstrip("/")
def _request(self, method: str, endpoint: str, data: dict | None = None) -> dict:
url = f"{self.base_url}/api/{endpoint.lstrip('/')}"
headers = {"x-api-key": self.api_key}
body = None
if data is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(data).encode()
req = Request(url, data=body, headers=headers, method=method)
req = Request(f"{self.base_url}/api{endpoint}", data=body, headers=headers, method=method)
with urlopen_with_retry(req, timeout=30) as resp:
return json.loads(resp.read().decode())
@ -105,16 +85,15 @@ class ImmichClient:
items = []
page = 1
while True:
result = self._request("POST", "/search/metadata", {
assets = self._request("POST", "/search/metadata", {
"personIds": person_ids,
"size": 250,
"page": page,
"type": "IMAGE",
"withExif": True,
})
batch = result.get("assets", {}).get("items", [])
items.extend(batch)
if not batch or not result.get("assets", {}).get("nextPage"):
}).get("assets", {})
items.extend(assets.get("items", []))
if not assets.get("nextPage"):
break
page += 1
_cache_set(key, items)
@ -127,6 +106,19 @@ class ImmichClient:
dest.write_bytes(resp.read())
return dest
def get_asset_faces(self, asset_id: str) -> list[dict]:
"""Face boxes for people assigned on this asset.
Each face has imageWidth, imageHeight, boundingBoxX1/Y1/X2/Y2.
Unassigned faces are skipped they're often false positives (posters,
reflections) and shouldn't drag the crop off the real subjects.
"""
asset = self._request("GET", f"/assets/{asset_id}")
faces = []
for person in asset.get("people") or []:
faces.extend(person.get("faces") or [])
return faces
def get_album_id(self, name: str) -> str | None:
for album in self._request("GET", "/albums"):
if album["albumName"].lower() == name.lower():
@ -159,8 +151,35 @@ def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
return out
def _on_this_day_candidates(assets: list[dict]) -> list[dict]:
"""Photos taken on today's month-day in past years, with a ±3-day fallback."""
today = datetime.now(timezone.utc).date()
dated = []
for a in assets:
exif = a.get("exifInfo") or {}
date_str = exif.get("dateTimeOriginal") or a.get("fileCreatedAt")
if not date_str:
continue
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00")).date()
except (ValueError, AttributeError):
continue
if dt.year < today.year:
dated.append((a, dt))
exact = [a for a, dt in dated if (dt.month, dt.day) == (today.month, today.day)]
if exact:
return exact
nearby_md = set()
for offset in range(-3, 4):
d = today + timedelta(days=offset)
nearby_md.add((d.month, d.day))
return [a for a, dt in dated if (dt.month, dt.day) in nearby_md]
def _pick_weighted_random(assets: list[dict]) -> dict:
"""Pick random asset, biased towards favorites and recently added photos."""
"""Pick random asset, biased towards on-this-day memories, favorites, and recents."""
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
favorites = [a for a in assets if a.get("isFavorite")]
recent = []
@ -170,10 +189,18 @@ def _pick_weighted_random(assets: list[dict]) -> dict:
recent.append(a)
except (ValueError, AttributeError):
pass
on_this_day = _on_this_day_candidates(assets)
candidates = [(favorites, 0.2), (recent, 0.4), (assets, 0.4)]
pools, weights = zip(*[(p, w) for p, w in candidates if p])
pool = random.choices(pools, weights=weights)[0]
candidates = [
("on this day", on_this_day, 0.10),
("favorites", favorites, 0.18),
("recent", recent, 0.36),
("all", assets, 0.36),
]
active = [(label, pool, w) for label, pool, w in candidates if pool]
print("Pool sizes: " + ", ".join(f"{label}={len(pool)}" for label, pool, _ in active))
label, pool, _ = random.choices(active, weights=[w for _, _, w in active])[0]
print(f"Picked pool: {label} ({len(pool)} candidates)")
return random.choice(pool)
@ -184,8 +211,8 @@ def _pick_and_download(client: ImmichClient, assets: list[dict],
if not filtered:
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in {source_label}")
history = PhotoHistory()
candidates = history.filter_new(filtered)
displayed, created_at = _load_history()
candidates = [a for a in filtered if a.get("id") not in displayed]
if not candidates:
print(f"All {len(filtered)} photos shown, picking from full list")
candidates = filtered
@ -195,7 +222,8 @@ def _pick_and_download(client: ImmichClient, assets: list[dict],
asset = _pick_weighted_random(candidates)
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
path = client.download_asset(asset["id"], dest)
history.mark_displayed(asset["id"])
displayed.add(asset["id"])
_save_history(displayed, created_at)
return path, asset

View file

@ -3,17 +3,13 @@ import time
from urllib.error import URLError
from urllib.request import Request, urlopen
RETRY_DELAYS = (3, 10)
def urlopen_with_retry(req: Request, timeout: int = 30):
"""urlopen wrapper that retries transient network failures."""
last_err: Exception | None = None
for attempt in range(len(RETRY_DELAYS) + 1):
"""urlopen wrapper that retries transient network failures (3s, 10s backoff)."""
for delay in (3, 10, None):
try:
return urlopen(req, timeout=timeout)
except (URLError, TimeoutError) as e:
last_err = e
if attempt < len(RETRY_DELAYS):
time.sleep(RETRY_DELAYS[attempt])
raise last_err
except (URLError, TimeoutError):
if delay is None:
raise
time.sleep(delay)

View file

@ -52,7 +52,9 @@ def format_age(asset: dict) -> str | None:
for n, unit in ((365, "year"), (30, "month"), (7, "week")):
if days >= n:
count = max(1, days // n)
return f"{count} {unit}{'s' if count > 1 else ''} ago"
if count == 1:
return f"Last {unit}"
return f"{count} {unit}s ago"
def format_location(asset: dict) -> str | None:

View file

@ -159,23 +159,20 @@ class EPD:
epdconfig.digital_write(self.reset_pin, 1)
epdconfig.delay_ms(20)
def send_command(self, command):
epdconfig.digital_write(self.dc_pin, 0)
def _spi(self, dc: int, payload, batch: bool = False):
epdconfig.digital_write(self.dc_pin, dc)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte([command])
(epdconfig.spi_writebyte2 if batch else epdconfig.spi_writebyte)(payload)
epdconfig.digital_write(self.cs_pin, 1)
def send_command(self, command):
self._spi(0, [command])
def send_data(self, data):
epdconfig.digital_write(self.dc_pin, 1)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte([data])
epdconfig.digital_write(self.cs_pin, 1)
self._spi(1, [data])
def send_data2(self, data):
epdconfig.digital_write(self.dc_pin, 1)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte2(data)
epdconfig.digital_write(self.cs_pin, 1)
self._spi(1, data, batch=True)
def wait_busy(self):
while epdconfig.digital_read(self.busy_pin) == 0: