Simplify and caption

This commit is contained in:
Andras Schmelczer 2026-04-26 15:49:14 +01:00
parent de65fbee9f
commit 84f8456fff
7 changed files with 551 additions and 596 deletions

View file

@ -1,95 +1,93 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import fcntl import fcntl
import os import os
import sys import sys
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from PIL import Image, ImageDraw, ImageFont from PIL import Image
sys.path.append(str(Path(__file__).parent / "lib")) sys.path.append(str(Path(__file__).parent / "lib"))
from waveshare_epd import epd7in3e from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album from homeassistant import HomeAssistantClient
from homeassistant import HomeAssistantClient from overlay import format_age, format_location
# waveshare_epd is imported lazily after the lock — its epdconfig claims
LOCK_FILE = "/tmp/frame.lock" # GPIO pins at import time, so two overlapping invocations would both crash
# on "GPIO busy" before reaching the flock below.
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.example.com")
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "REDACTED_IMMICH_API_KEY") IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.example.com")
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "REDACTED_IMMICH_API_KEY")
HA_URL = os.environ.get("HA_URL", "https://homeassistant.example.com")
HA_TOKEN = os.environ.get("HA_TOKEN", "REDACTED_HA_TOKEN") HA_URL = os.environ.get("HA_URL", "https://homeassistant.example.com")
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"] HA_TOKEN = os.environ.get("HA_TOKEN", "REDACTED_HA_TOKEN")
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"]
DEFAULT_SATURATION = 1.3
DEFAULT_CONTRAST = 1.05
DEFAULT_GAMMA = 0.90 def main() -> None:
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
def display_image(image_path: Path, orientation: int, saturation: float, parser.add_argument("--people", default="Me,Ruby",
contrast: float, gamma: float, enhance: bool) -> None: help="Comma-separated names for Immich search")
epd = epd7in3e.EPD() parser.add_argument("--album", help="Fetch from album (overrides --people)")
try: parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270],
epd.init() default=0, help="Rotation in degrees")
img = Image.open(image_path).convert("RGB") parser.add_argument("--saturation", type=float, default=1.3)
if orientation: parser.add_argument("--contrast", type=float, default=1.05)
img = img.rotate(orientation, expand=True) parser.add_argument("--gamma", type=float, default=0.90)
buf = epd.getbuffer(img, saturation=saturation, contrast=contrast, args = parser.parse_args()
gamma=gamma, enhance=enhance)
epd.display(buf) lock_fd = open("/tmp/frame.lock", "w")
finally: try:
epd.sleep() fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
print("Another instance running, skipping")
def main() -> None: sys.exit(0)
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
parser.add_argument("--people", default="Me,Ruby", from waveshare_epd import epd7in3e
help="Comma-separated names for Immich search")
parser.add_argument("--album", help="Fetch from album (overrides --people)") now = datetime.now()
parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270], print(f"Time: {now.strftime('%H:%M')}")
default=0, help="Rotation in degrees") if 0 <= now.hour < 7:
parser.add_argument("--saturation", type=float, default=DEFAULT_SATURATION) print("Night time, skipping")
parser.add_argument("--contrast", type=float, default=DEFAULT_CONTRAST) sys.exit(0)
parser.add_argument("--gamma", type=float, default=DEFAULT_GAMMA)
parser.add_argument("--no-enhance", action="store_true") ha = HomeAssistantClient(HA_URL, HA_TOKEN)
args = parser.parse_args() home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)]
if not home:
lock_fd = open(LOCK_FILE, "w") print("No one home, skipping")
try: sys.exit(0)
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) print(f"Home: {', '.join(home)}")
except BlockingIOError:
print("Another instance running, skipping") client = ImmichClient(IMMICH_URL, IMMICH_API_KEY)
sys.exit(0) if args.album:
image_path, asset = get_random_photo_from_album(client, args.album, args.orientation)
now = datetime.now() print(f"Album: {args.album}")
print(f"Time: {now.strftime('%H:%M')}") else:
names = [n.strip() for n in args.people.split(",")]
if 0 <= now.hour < 7: image_path, asset = get_random_photo_of_people(client, names, args.orientation)
print("Night time, skipping") print(f"People: {', '.join(names)}")
sys.exit(0)
left_text = format_age(asset)
ha = HomeAssistantClient(HA_URL, HA_TOKEN) right_text = format_location(asset)
home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)] if left_text or right_text:
if not home: print(f"Overlay: {left_text or '-'} | {right_text or '-'}")
print("No one home, skipping")
sys.exit(0) try:
print(f"Home: {', '.join(home)}") epd = epd7in3e.EPD()
try:
client = ImmichClient(IMMICH_URL, IMMICH_API_KEY) epd.init()
if args.album: img = Image.open(image_path).convert("RGB")
image_path = get_random_photo_from_album(client, args.album, args.orientation) if args.orientation:
print(f"Album: {args.album}") img = img.rotate(args.orientation, expand=True)
else: buf = epd.getbuffer(img, saturation=args.saturation, contrast=args.contrast,
names = [n.strip() for n in args.people.split(",")] gamma=args.gamma, left_text=left_text, right_text=right_text,
image_path = get_random_photo_of_people(client, names, args.orientation) orientation=args.orientation)
print(f"People: {', '.join(names)}") epd.display(buf)
finally:
try: epd.sleep()
display_image(image_path, args.orientation, args.saturation, finally:
args.contrast, args.gamma, not args.no_enhance) image_path.unlink(missing_ok=True)
finally:
image_path.unlink(missing_ok=True)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()

View file

@ -1,10 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import time from urllib.request import Request
from urllib.error import URLError
from urllib.request import Request, urlopen
RETRY_DELAYS = (3, 10) from net import urlopen_with_retry
class HomeAssistantClient: class HomeAssistantClient:
@ -12,26 +10,14 @@ class HomeAssistantClient:
self.base_url = base_url.rstrip("/") self.base_url = base_url.rstrip("/")
self.token = token self.token = token
def get_state(self, entity_id: str) -> dict:
url = f"{self.base_url}/api/states/{entity_id}"
req = Request(url, headers={
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
})
last_err: Exception | None = None
for attempt in range(len(RETRY_DELAYS) + 1):
try:
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except (URLError, TimeoutError) as e:
last_err = e
if attempt < len(RETRY_DELAYS):
time.sleep(RETRY_DELAYS[attempt])
raise last_err
def is_person_home(self, entity_id: str) -> bool: def is_person_home(self, entity_id: str) -> bool:
req = Request(
f"{self.base_url}/api/states/{entity_id}",
headers={"Authorization": f"Bearer {self.token}"},
)
try: try:
return self.get_state(entity_id).get("state") == "home" with urlopen_with_retry(req, timeout=30) as resp:
return json.loads(resp.read().decode()).get("state") == "home"
except Exception as e: except Exception as e:
print(f"Failed to check {entity_id}: {e}") print(f"Failed to check {entity_id}: {e}")
return False return False

View file

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import hashlib
import json import json
import random import random
import tempfile import tempfile
@ -7,38 +6,17 @@ import time
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from urllib.error import URLError from urllib.request import Request
from urllib.request import Request, urlopen
from progress import ProgressBar from net import urlopen_with_retry
HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json" HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json"
HISTORY_MAX_AGE_DAYS = 7
CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache" CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache"
CACHE_TTL_SECONDS = 3600
RETRY_DELAYS = (3, 10)
def _urlopen_with_retry(req: Request, timeout: int = 30):
"""urlopen wrapper that retries transient network failures."""
last_err: Exception | None = None
for attempt in range(len(RETRY_DELAYS) + 1):
try:
return urlopen(req, timeout=timeout)
except (URLError, TimeoutError) as e:
last_err = e
if attempt < len(RETRY_DELAYS):
time.sleep(RETRY_DELAYS[attempt])
raise last_err
def _cache_get(key: str) -> list[dict] | None: def _cache_get(key: str) -> list[dict] | None:
path = CACHE_DIR / f"{key}.json" path = CACHE_DIR / f"{key}.json"
if not path.exists(): if not path.exists() or time.time() - path.stat().st_mtime > 3600:
return None
if time.time() - path.stat().st_mtime > CACHE_TTL_SECONDS:
return None return None
try: try:
return json.loads(path.read_text()) return json.loads(path.read_text())
@ -57,30 +35,26 @@ class PhotoHistory:
def __init__(self, path: Path = HISTORY_FILE): def __init__(self, path: Path = HISTORY_FILE):
self.path = path self.path = path
self.displayed: set[str] = set() self.displayed: set[str] = set()
self.created_at: datetime | None = None self.created_at = datetime.now(timezone.utc)
self._load() self._load()
def _load(self) -> None: def _load(self) -> None:
if not self.path.exists(): if not self.path.exists():
self._reset() self._save()
return return
try: try:
data = json.loads(self.path.read_text()) data = json.loads(self.path.read_text())
self.created_at = datetime.fromisoformat(data.get("created_at", "")) self.created_at = datetime.fromisoformat(data["created_at"])
if self.created_at.tzinfo is None: if self.created_at.tzinfo is None:
self.created_at = self.created_at.replace(tzinfo=timezone.utc) self.created_at = self.created_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS): if datetime.now(timezone.utc) - self.created_at > timedelta(days=7):
print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...") print("Photo history expired (>7 days), clearing...")
self._reset() self.created_at = datetime.now(timezone.utc)
self._save()
else: else:
self.displayed = set(data.get("displayed", [])) self.displayed = set(data.get("displayed", []))
except (json.JSONDecodeError, ValueError, KeyError): except (json.JSONDecodeError, ValueError, KeyError):
self._reset() self._save()
def _reset(self) -> None:
self.displayed = set()
self.created_at = datetime.now(timezone.utc)
self._save()
def _save(self) -> None: def _save(self) -> None:
self.path.write_text(json.dumps({ self.path.write_text(json.dumps({
@ -96,24 +70,16 @@ class PhotoHistory:
return [a for a in assets if a.get("id") not in self.displayed] return [a for a in assets if a.get("id") not in self.displayed]
_history: PhotoHistory | None = None
def get_history() -> PhotoHistory:
global _history
if _history is None:
_history = PhotoHistory()
return _history
@dataclass @dataclass
class ImmichClient: class ImmichClient:
base_url: str base_url: str
api_key: str api_key: str
def _request(self, method: str, endpoint: str, data: dict | None = None, def __post_init__(self):
show_progress: bool = False, progress_desc: str = "Fetching") -> dict: self.base_url = self.base_url.rstrip("/")
url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
def _request(self, method: str, endpoint: str, data: dict | None = None) -> dict:
url = f"{self.base_url}/api/{endpoint.lstrip('/')}"
headers = {"x-api-key": self.api_key} headers = {"x-api-key": self.api_key}
body = None body = None
if data is not None: if data is not None:
@ -121,30 +87,17 @@ class ImmichClient:
body = json.dumps(data).encode() body = json.dumps(data).encode()
req = Request(url, data=body, headers=headers, method=method) req = Request(url, data=body, headers=headers, method=method)
with _urlopen_with_retry(req, timeout=30) as resp: with urlopen_with_retry(req, timeout=30) as resp:
total_size = resp.headers.get('Content-Length')
if total_size and show_progress:
total_size = int(total_size)
progress = ProgressBar(total_size, desc=progress_desc)
chunks = bytearray()
while chunk := resp.read(8192):
chunks.extend(chunk)
progress.update(len(chunk))
progress.finish()
return json.loads(chunks.decode())
return json.loads(resp.read().decode()) return json.loads(resp.read().decode())
def get_people(self) -> list[dict]:
return self._request("GET", "/people")["people"]
def get_person_id(self, name: str) -> str | None: def get_person_id(self, name: str) -> str | None:
for person in self.get_people(): for person in self._request("GET", "/people")["people"]:
if person["name"].lower() == name.lower(): if person["name"].lower() == name.lower():
return person["id"] return person["id"]
return None return None
def search_assets_by_people(self, person_ids: list[str]) -> list[dict]: def search_assets_by_people(self, person_ids: list[str]) -> list[dict]:
key = "people_" + hashlib.md5("_".join(sorted(person_ids)).encode()).hexdigest() key = "people_" + "_".join(sorted(person_ids))
cached = _cache_get(key) cached = _cache_get(key)
if cached is not None: if cached is not None:
return cached return cached
@ -167,22 +120,11 @@ class ImmichClient:
_cache_set(key, items) _cache_set(key, items)
return items return items
def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path: def download_asset(self, asset_id: str, dest: Path) -> Path:
url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview" url = f"{self.base_url}/api/assets/{asset_id}/thumbnail?size=preview"
req = Request(url, headers={"x-api-key": self.api_key}) req = Request(url, headers={"x-api-key": self.api_key})
with _urlopen_with_retry(req, timeout=30) as resp: with urlopen_with_retry(req, timeout=30) as resp:
total_size = resp.headers.get('Content-Length') dest.write_bytes(resp.read())
if total_size and show_progress:
total_size = int(total_size)
progress = ProgressBar(total_size, desc="Downloading")
data = bytearray()
while chunk := resp.read(8192):
data.extend(chunk)
progress.update(len(chunk))
progress.finish()
dest.write_bytes(bytes(data))
else:
dest.write_bytes(resp.read())
return dest return dest
def get_album_id(self, name: str) -> str | None: def get_album_id(self, name: str) -> str | None:
@ -191,107 +133,85 @@ class ImmichClient:
return album["id"] return album["id"]
return None return None
def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]: def get_album_assets(self, album_id: str) -> list[dict]:
key = f"album_{album_id}" key = f"album_{album_id}"
cached = _cache_get(key) cached = _cache_get(key)
if cached is not None: if cached is not None:
return cached return cached
assets = self._request("GET", f"/albums/{album_id}").get("assets", [])
album = self._request("GET", f"/albums/{album_id}",
show_progress=show_progress, progress_desc="Fetching album")
assets = album.get("assets", [])
_cache_set(key, assets) _cache_set(key, assets)
return assets return assets
def _is_portrait(asset: dict) -> bool | None:
"""Check if asset displays as portrait, accounting for EXIF orientation."""
exif = asset.get("exifInfo") or {}
width = exif.get("exifImageWidth") or 0
height = exif.get("exifImageHeight") or 0
if not (width and height):
return None
# EXIF orientation 6 and 8 mean 90° rotation (swap dimensions)
orientation = str(exif.get("orientation") or "1")
if orientation in ("6", "8"):
width, height = height, width
return height > width
def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]: def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
"""Filter assets by orientation, accounting for EXIF rotation.""" """Keep assets matching the requested orientation. Skips assets without EXIF dimensions."""
filtered = [] out = []
no_dimensions = 0 for a in assets:
for asset in assets: exif = a.get("exifInfo") or {}
is_portrait = _is_portrait(asset) w = exif.get("exifImageWidth") or 0
if is_portrait is not None: h = exif.get("exifImageHeight") or 0
if is_portrait == portrait: if not (w and h):
filtered.append(asset) continue
else: if exif.get("orientation") in (6, 8, "6", "8"):
no_dimensions += 1 w, h = h, w
if no_dimensions: if (h > w) == portrait:
print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data") out.append(a)
return filtered return out
def _pick_weighted_random(assets: list[dict]) -> dict: def _pick_weighted_random(assets: list[dict]) -> dict:
"""Pick random asset, biased towards favorites (20%) and recently added photos (50%).""" """Pick random asset, biased towards favorites and recently added photos."""
if not assets:
raise ValueError("No assets to choose from")
cutoff = datetime.now(timezone.utc) - timedelta(days=30) cutoff = datetime.now(timezone.utc) - timedelta(days=30)
favorites = [a for a in assets if a.get("isFavorite")] favorites = [a for a in assets if a.get("isFavorite")]
recent = [] recent = []
for asset in assets: for a in assets:
date_str = asset.get("createdAt", "")
try: try:
if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= cutoff: if datetime.fromisoformat(a.get("createdAt", "").replace("Z", "+00:00")) >= cutoff:
recent.append(asset) recent.append(a)
except (ValueError, AttributeError): except (ValueError, AttributeError):
pass pass
if favorites and random.random() < 0.2: candidates = [(favorites, 0.2), (recent, 0.4), (assets, 0.4)]
return random.choice(favorites) pools, weights = zip(*[(p, w) for p, w in candidates if p])
if recent and random.random() < 0.5: pool = random.choices(pools, weights=weights)[0]
return random.choice(recent) return random.choice(pool)
return random.choice(assets)
def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path: def _pick_and_download(client: ImmichClient, assets: list[dict],
history = get_history() orientation: int, source_label: str) -> tuple[Path, dict]:
new_assets = history.filter_new(assets) portrait = orientation in (90, 270)
filtered = _filter_by_orientation(assets, portrait)
if not filtered:
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in {source_label}")
if new_assets: history = PhotoHistory()
print(f"Photos: {len(new_assets)} new / {len(assets)} total") candidates = history.filter_new(filtered)
asset = _pick_weighted_random(new_assets) if not candidates:
print(f"All {len(filtered)} photos shown, picking from full list")
candidates = filtered
else: else:
print(f"All {len(assets)} photos shown, picking from full list") print(f"Photos: {len(candidates)} new / {len(filtered)} total")
asset = _pick_weighted_random(assets)
asset = _pick_weighted_random(candidates)
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg" dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
path = client.download_asset(asset["id"], dest) path = client.download_asset(asset["id"], dest)
history.mark_displayed(asset["id"]) history.mark_displayed(asset["id"])
return path return path, asset
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path: def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> tuple[Path, dict]:
person_ids = [pid for name in names if (pid := client.get_person_id(name))] person_ids = [pid for name in names if (pid := client.get_person_id(name))]
if not person_ids: if not person_ids:
raise ValueError(f"No people found: {names}") raise ValueError(f"No people found: {names}")
assets = client.search_assets_by_people(person_ids) assets = client.search_assets_by_people(person_ids)
if not assets: if not assets:
raise ValueError(f"No photos found for: {names}") raise ValueError(f"No photos found for: {names}")
portrait = orientation in (90, 270) return _pick_and_download(client, assets, orientation, f"photos for {', '.join(names)}")
filtered = _filter_by_orientation(assets, portrait)
if not filtered:
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos available")
return _download_random_asset(client, filtered)
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path: def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> tuple[Path, dict]:
album_id = client.get_album_id(album_name) album_id = client.get_album_id(album_name)
if not album_id: if not album_id:
raise ValueError(f"Album not found: {album_name}") raise ValueError(f"Album not found: {album_name}")
@ -300,8 +220,4 @@ def get_random_photo_from_album(client: ImmichClient, album_name: str, orientati
if not assets: if not assets:
raise ValueError(f"No photos in album: {album_name}") raise ValueError(f"No photos in album: {album_name}")
portrait = orientation in (90, 270) return _pick_and_download(client, assets, orientation, f"album: {album_name}")
filtered = _filter_by_orientation(assets, portrait)
if not filtered:
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in album: {album_name}")
return _download_random_asset(client, filtered)

19
src/lib/net.py Normal file
View file

@ -0,0 +1,19 @@
#!/usr/bin/env python3
import time
from urllib.error import URLError
from urllib.request import Request, urlopen
RETRY_DELAYS = (3, 10)
def urlopen_with_retry(req: Request, timeout: int = 30):
"""urlopen wrapper that retries transient network failures."""
last_err: Exception | None = None
for attempt in range(len(RETRY_DELAYS) + 1):
try:
return urlopen(req, timeout=timeout)
except (URLError, TimeoutError) as e:
last_err = e
if attempt < len(RETRY_DELAYS):
time.sleep(RETRY_DELAYS[attempt])
raise last_err

106
src/lib/overlay.py Normal file
View file

@ -0,0 +1,106 @@
"""Text overlay rendering for the e-ink frame.
Paints aliased white-on-black-stroke text into the dithered palette index
array; black/white survive Atkinson dithering so edges stay crisp on e-ink.
"""
import os
from datetime import datetime, timezone
import numpy as np
from PIL import Image, ImageDraw, ImageFont
FONT_CANDIDATES = (
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
"/System/Library/Fonts/Helvetica.ttc",
)
PALETTE_BLACK = 0
PALETTE_WHITE = 1
def _load_font(size: int) -> ImageFont.ImageFont:
for path in FONT_CANDIDATES:
if os.path.exists(path):
return ImageFont.truetype(path, size)
return ImageFont.load_default()
def format_age(asset: dict) -> str | None:
"""Photo capture age as 'N days/weeks/months/years ago'."""
exif = asset.get("exifInfo") or {}
date_str = exif.get("dateTimeOriginal") or asset.get("fileCreatedAt")
if not date_str:
return None
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
days = (datetime.now(timezone.utc) - dt).days
if days < 0:
return None
if days == 0:
return "Today"
if days == 1:
return "Yesterday"
if days < 7:
return f"{days} days ago"
for n, unit in ((365, "year"), (30, "month"), (7, "week")):
if days >= n:
count = max(1, days // n)
return f"{count} {unit}{'s' if count > 1 else ''} ago"
def format_location(asset: dict) -> str | None:
"""Most specific location available from EXIF."""
exif = asset.get("exifInfo") or {}
return exif.get("city") or exif.get("state") or exif.get("country") or None
def render_text_into_indices(indices: np.ndarray,
left_text: str | None,
right_text: str | None,
orientation: int = 0) -> None:
"""Paint white-on-black-stroke text into a (height, width) palette-index array.
Text is laid out viewer-bottom-left/right, then rotated by `orientation`
so labels land at the viewer's bottom regardless of frame mounting.
"""
font_size, margin, stroke_width = 20, 18, 2
buffer_h, buffer_w = indices.shape
if orientation in (90, 270):
view_w, view_h = buffer_h, buffer_w
else:
view_w, view_h = buffer_w, buffer_h
fill_layer = Image.new("L", (view_w, view_h), 0)
full_layer = Image.new("L", (view_w, view_h), 0)
fill_draw = ImageDraw.Draw(fill_layer)
full_draw = ImageDraw.Draw(full_layer)
font = _load_font(font_size)
baseline = view_h - margin
if left_text:
pos = (margin, baseline)
fill_draw.text(pos, left_text, font=font, fill=255, anchor="lb")
full_draw.text(pos, left_text, font=font, fill=255, anchor="lb",
stroke_width=stroke_width, stroke_fill=255)
if right_text:
pos = (view_w - margin, baseline)
fill_draw.text(pos, right_text, font=font, fill=255, anchor="rb")
full_draw.text(pos, right_text, font=font, fill=255, anchor="rb",
stroke_width=stroke_width, stroke_fill=255)
if orientation:
fill_layer = fill_layer.rotate(orientation, expand=True)
full_layer = full_layer.rotate(orientation, expand=True)
fill_mask = np.asarray(fill_layer) >= 128
stroke_mask = (np.asarray(full_layer) >= 128) & ~fill_mask
indices[stroke_mask] = PALETTE_BLACK
indices[fill_mask] = PALETTE_WHITE

View file

@ -1,49 +1,23 @@
"""Simple terminal progress bar for e-ink frame.""" """Simple terminal progress bar for e-ink frame."""
import sys
class ProgressBar:
def __init__(self, total: int, desc: str = ""):
class ProgressBar: self.total = total
"""Simple text-based progress bar.""" self.desc = desc
self._last_percent = -1
def __init__(self, total: int, desc: str = "", width: int = 30):
self.total = total def set(self, value: int) -> None:
self.current = 0 if self.total == 0:
self.desc = desc return
self.width = width value = min(value, self.total)
self._last_percent = -1 percent = int(100 * value / self.total)
if percent == self._last_percent:
def update(self, n: int = 1) -> None: return
"""Update progress by n steps.""" self._last_percent = percent
self.current = min(self.current + n, self.total)
self._render() filled = int(30 * value / self.total)
bar = "" * filled + "" * (30 - filled)
def set(self, value: int) -> None: end = "\n" if value >= self.total else ""
"""Set progress to specific value.""" prefix = f"{self.desc}: " if self.desc else ""
self.current = min(value, self.total) print(f"\r{prefix}|{bar}| {percent:3d}%", end=end, flush=True)
self._render()
def _render(self) -> None:
if self.total == 0:
return
percent = int(100 * self.current / self.total)
if percent == self._last_percent:
return
self._last_percent = percent
filled = int(self.width * self.current / self.total)
bar = "" * filled + "" * (self.width - filled)
desc = f"{self.desc}: " if self.desc else ""
sys.stdout.write(f"\r{desc}|{bar}| {percent:3d}%")
sys.stdout.flush()
if self.current >= self.total:
sys.stdout.write("\n")
sys.stdout.flush()
def finish(self) -> None:
"""Complete the progress bar."""
self.current = self.total
self._render()

View file

@ -1,282 +1,238 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Waveshare 7.3" 6-color e-Paper driver (modified) # Waveshare 7.3" 6-color e-Paper driver (modified)
# Original: Waveshare team, 2022-10-20
import numpy as np
import numpy as np import cv2
import cv2 from PIL import Image, ImageEnhance
from PIL import Image, ImageEnhance from numba import jit
from numba import jit from progress import ProgressBar
from progress import ProgressBar from overlay import render_text_into_indices
from . import epdconfig from . import epdconfig
EPD_WIDTH = 800 EPD_WIDTH = 800
EPD_HEIGHT = 480 EPD_HEIGHT = 480
# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors; # 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors;
# 4 is reserved/unused (filled with BLACK so nearest-color never picks it). # 4 is reserved/unused (filled with BLACK so nearest-color never picks it).
PALETTE_RGB = np.array([ PALETTE_RGB = np.array([
[0, 0, 0], # 0: BLACK [0, 0, 0], # 0: BLACK
[255, 255, 255], # 1: WHITE [255, 255, 255], # 1: WHITE
[255, 255, 0], # 2: YELLOW [255, 255, 0], # 2: YELLOW
[255, 0, 0], # 3: RED [255, 0, 0], # 3: RED
[0, 0, 0], # 4: unused [0, 0, 0], # 4: unused
[0, 0, 255], # 5: BLUE [0, 0, 255], # 5: BLUE
[0, 255, 0], # 6: GREEN [0, 255, 0], # 6: GREEN
], dtype=np.float64) ], dtype=np.float64)
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64) PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
INIT_SEQUENCE = (
def _enhance_for_eink(image: Image.Image, saturation: float, (0xAA, [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]),
contrast: float, gamma: float) -> Image.Image: (0x01, [0x3F]),
img = image.convert('RGB') (0x00, [0x5F, 0x69]),
if saturation != 1.0: (0x03, [0x00, 0x54, 0x00, 0x44]),
img = ImageEnhance.Color(img).enhance(saturation) (0x05, [0x40, 0x1F, 0x1F, 0x2C]),
if contrast != 1.0: (0x06, [0x6F, 0x1F, 0x17, 0x49]),
img = ImageEnhance.Contrast(img).enhance(contrast) (0x08, [0x6F, 0x1F, 0x1F, 0x22]),
if gamma != 1.0: (0x30, [0x03]),
lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3 (0x50, [0x3F]),
img = img.point(lut) (0x60, [0x02, 0x00]),
return img (0x61, [0x03, 0x20, 0x01, 0xE0]),
(0x84, [0x01]),
(0xE3, [0x2F]),
def _crop_center(image: Image.Image, target_w: int, target_h: int, )
show_progress: bool = True) -> Image.Image:
if show_progress:
print("Center cropping...") def _enhance_for_eink(image: Image.Image, saturation: float,
contrast: float, gamma: float) -> Image.Image:
img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) img = image.convert('RGB')
img_h, img_w = img_cv.shape[:2] if saturation != 1.0:
img_aspect, target_aspect = img_w / img_h, target_w / target_h img = ImageEnhance.Color(img).enhance(saturation)
if contrast != 1.0:
if img_aspect < target_aspect: img = ImageEnhance.Contrast(img).enhance(contrast)
new_w, new_h = target_w, int(target_w / img_aspect) if gamma != 1.0:
else: lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3
new_w, new_h = int(target_h * img_aspect), target_h img = img.point(lut)
return img
img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
x_off = (new_w - target_w) // 2
y_off = (new_h - target_h) // 2 def _crop_center(image: Image.Image, target_w: int, target_h: int) -> Image.Image:
cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w] print("Center cropping...")
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)) img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
img_h, img_w = img_cv.shape[:2]
img_aspect, target_aspect = img_w / img_h, target_w / target_h
@jit(nopython=True, cache=True)
def _find_nearest_color(r, g, b, palette, weights): if img_aspect < target_aspect:
best_idx, best_dist = 0, 1e10 new_w, new_h = target_w, int(target_w / img_aspect)
for i in range(palette.shape[0]): else:
dr = (palette[i, 0] - r) * weights[0] new_w, new_h = int(target_h * img_aspect), target_h
dg = (palette[i, 1] - g) * weights[1]
db = (palette[i, 2] - b) * weights[2] img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
dist = dr * dr + dg * dg + db * db x_off = (new_w - target_w) // 2
if dist < best_dist: y_off = (new_h - target_h) // 2
best_dist, best_idx = dist, i cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w]
return best_idx return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
@jit(nopython=True, cache=True) @jit(nopython=True, cache=True)
def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row): def _find_nearest_color(r, g, b, palette, weights):
height, width = img.shape[:2] best_idx, best_dist = 0, 1e10
for y in range(start_row, end_row): for i in range(palette.shape[0]):
for x in range(width): dr = (palette[i, 0] - r) * weights[0]
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2] dg = (palette[i, 1] - g) * weights[1]
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights) db = (palette[i, 2] - b) * weights[2]
indices[y, x] = idx dist = dr * dr + dg * dg + db * db
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2] if dist < best_dist:
best_dist, best_idx = dist, i
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0 return best_idx
if x + 1 < width:
img[y, x + 1, 0] += err_r @jit(nopython=True, cache=True)
img[y, x + 1, 1] += err_g def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row):
img[y, x + 1, 2] += err_b height, width = img.shape[:2]
if x + 2 < width: for y in range(start_row, end_row):
img[y, x + 2, 0] += err_r for x in range(width):
img[y, x + 2, 1] += err_g old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
img[y, x + 2, 2] += err_b idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
if y + 1 < height: indices[y, x] = idx
if x > 0: new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
img[y + 1, x - 1, 0] += err_r
img[y + 1, x - 1, 1] += err_g err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
img[y + 1, x - 1, 2] += err_b
img[y + 1, x, 0] += err_r if x + 1 < width:
img[y + 1, x, 1] += err_g img[y, x + 1, 0] += err_r
img[y + 1, x, 2] += err_b img[y, x + 1, 1] += err_g
if x + 1 < width: img[y, x + 1, 2] += err_b
img[y + 1, x + 1, 0] += err_r if x + 2 < width:
img[y + 1, x + 1, 1] += err_g img[y, x + 2, 0] += err_r
img[y + 1, x + 1, 2] += err_b img[y, x + 2, 1] += err_g
if y + 2 < height: img[y, x + 2, 2] += err_b
img[y + 2, x, 0] += err_r if y + 1 < height:
img[y + 2, x, 1] += err_g if x > 0:
img[y + 2, x, 2] += err_b img[y + 1, x - 1, 0] += err_r
img[y + 1, x - 1, 1] += err_g
img[y + 1, x - 1, 2] += err_b
def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> np.ndarray: img[y + 1, x, 0] += err_r
"""Atkinson-dither to the e-ink palette and return a uint8 array of palette indices.""" img[y + 1, x, 1] += err_g
img = np.array(image.convert('RGB'), dtype=np.float64) img[y + 1, x, 2] += err_b
height, width = img.shape[:2] if x + 1 < width:
indices = np.zeros((height, width), dtype=np.uint8) img[y + 1, x + 1, 0] += err_r
if show_progress: img[y + 1, x + 1, 1] += err_g
print("Dithering...") img[y + 1, x + 1, 2] += err_b
progress = ProgressBar(height, desc="Dithering") if y + 2 < height:
img[y + 2, x, 0] += err_r
chunk_size = 48 img[y + 2, x, 1] += err_g
for i in range((height + chunk_size - 1) // chunk_size): img[y + 2, x, 2] += err_b
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
_atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end)
if show_progress: def _dither_atkinson(image: Image.Image) -> np.ndarray:
progress.set(end) """Atkinson-dither to the e-ink palette and return a uint8 array of palette indices."""
img = np.array(image.convert('RGB'), dtype=np.float64)
return indices height, width = img.shape[:2]
indices = np.zeros((height, width), dtype=np.uint8)
print("Dithering...")
class EPD: progress = ProgressBar(height, desc="Dithering")
def __init__(self):
self.reset_pin = epdconfig.RST_PIN chunk_size = 48
self.dc_pin = epdconfig.DC_PIN for i in range((height + chunk_size - 1) // chunk_size):
self.busy_pin = epdconfig.BUSY_PIN start, end = i * chunk_size, min((i + 1) * chunk_size, height)
self.cs_pin = epdconfig.CS_PIN _atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end)
self.width = EPD_WIDTH progress.set(end)
self.height = EPD_HEIGHT
return indices
def reset(self):
epdconfig.digital_write(self.reset_pin, 1)
epdconfig.delay_ms(20) class EPD:
epdconfig.digital_write(self.reset_pin, 0) def __init__(self):
epdconfig.delay_ms(2) self.reset_pin = epdconfig.RST_PIN
epdconfig.digital_write(self.reset_pin, 1) self.dc_pin = epdconfig.DC_PIN
epdconfig.delay_ms(20) self.busy_pin = epdconfig.BUSY_PIN
self.cs_pin = epdconfig.CS_PIN
def send_command(self, command): self.width = EPD_WIDTH
epdconfig.digital_write(self.dc_pin, 0) self.height = EPD_HEIGHT
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte([command]) def reset(self):
epdconfig.digital_write(self.cs_pin, 1) epdconfig.digital_write(self.reset_pin, 1)
epdconfig.delay_ms(20)
def send_data(self, data): epdconfig.digital_write(self.reset_pin, 0)
epdconfig.digital_write(self.dc_pin, 1) epdconfig.delay_ms(2)
epdconfig.digital_write(self.cs_pin, 0) epdconfig.digital_write(self.reset_pin, 1)
epdconfig.spi_writebyte([data]) epdconfig.delay_ms(20)
epdconfig.digital_write(self.cs_pin, 1)
def send_command(self, command):
def send_data2(self, data): epdconfig.digital_write(self.dc_pin, 0)
epdconfig.digital_write(self.dc_pin, 1) epdconfig.digital_write(self.cs_pin, 0)
epdconfig.digital_write(self.cs_pin, 0) epdconfig.spi_writebyte([command])
epdconfig.spi_writebyte2(data) epdconfig.digital_write(self.cs_pin, 1)
epdconfig.digital_write(self.cs_pin, 1)
def send_data(self, data):
def wait_busy(self): epdconfig.digital_write(self.dc_pin, 1)
while epdconfig.digital_read(self.busy_pin) == 0: epdconfig.digital_write(self.cs_pin, 0)
epdconfig.delay_ms(5) epdconfig.spi_writebyte([data])
epdconfig.digital_write(self.cs_pin, 1)
def turn_on_display(self):
self.send_command(0x04) # POWER_ON def send_data2(self, data):
self.wait_busy() epdconfig.digital_write(self.dc_pin, 1)
self.send_command(0x12) # DISPLAY_REFRESH epdconfig.digital_write(self.cs_pin, 0)
self.send_data(0x00) epdconfig.spi_writebyte2(data)
self.wait_busy() epdconfig.digital_write(self.cs_pin, 1)
self.send_command(0x02) # POWER_OFF
self.send_data(0x00) def wait_busy(self):
self.wait_busy() while epdconfig.digital_read(self.busy_pin) == 0:
epdconfig.delay_ms(5)
def init(self):
if epdconfig.module_init() != 0: def turn_on_display(self):
return -1 self.send_command(0x04) # POWER_ON
self.reset() self.wait_busy()
self.wait_busy() self.send_command(0x12) # DISPLAY_REFRESH
epdconfig.delay_ms(30) self.send_data(0x00)
self.wait_busy()
self.send_command(0xAA) self.send_command(0x02) # POWER_OFF
for v in [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]: self.send_data(0x00)
self.send_data(v) self.wait_busy()
self.send_command(0x01) def init(self):
self.send_data(0x3F) epdconfig.module_init()
self.reset()
self.send_command(0x00) self.wait_busy()
self.send_data(0x5F) epdconfig.delay_ms(30)
self.send_data(0x69)
for cmd, data in INIT_SEQUENCE:
self.send_command(0x03) self.send_command(cmd)
for v in [0x00, 0x54, 0x00, 0x44]: for v in data:
self.send_data(v) self.send_data(v)
self.send_command(0x05) self.send_command(0x04)
for v in [0x40, 0x1F, 0x1F, 0x2C]: self.wait_busy()
self.send_data(v)
def getbuffer(self, image, saturation: float, contrast: float, gamma: float,
self.send_command(0x06) left_text: str | None = None, right_text: str | None = None,
for v in [0x6F, 0x1F, 0x17, 0x49]: orientation: int = 0):
self.send_data(v) image = image.convert('RGB')
if image.size != (self.width, self.height):
self.send_command(0x08) print(f"Input: {image.size[0]}x{image.size[1]}{self.width}x{self.height}")
for v in [0x6F, 0x1F, 0x1F, 0x22]: image = _crop_center(image, self.width, self.height)
self.send_data(v)
print("Enhancing...")
self.send_command(0x30) image = _enhance_for_eink(image, saturation, contrast, gamma)
self.send_data(0x03)
indices = _dither_atkinson(image)
self.send_command(0x50)
self.send_data(0x3F) if left_text or right_text:
print("Rendering overlay...")
self.send_command(0x60) render_text_into_indices(indices, left_text, right_text, orientation)
self.send_data(0x02)
self.send_data(0x00) print("Packing buffer...")
flat = indices.reshape(-1)
self.send_command(0x61) return ((flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)).tolist()
for v in [0x03, 0x20, 0x01, 0xE0]:
self.send_data(v) def display(self, image):
self.send_command(0x10)
self.send_command(0x84) self.send_data2(image)
self.send_data(0x01) self.turn_on_display()
self.send_command(0xE3) def sleep(self):
self.send_data(0x2F) self.send_command(0x07) # DEEP_SLEEP
self.send_data(0xA5)
self.send_command(0x04) epdconfig.delay_ms(2000)
self.wait_busy() epdconfig.module_exit()
return 0
def getbuffer(self, image, saturation: float, contrast: float, gamma: float,
enhance: bool = True, show_progress: bool = True):
image = image.convert('RGB')
imwidth, imheight = image.size
if imwidth != self.width or imheight != self.height:
if show_progress:
print(f"Input: {imwidth}x{imheight}{self.width}x{self.height}")
image = _crop_center(image, self.width, self.height, show_progress)
if enhance:
if show_progress:
print("Enhancing...")
image = _enhance_for_eink(image, saturation, contrast, gamma)
indices = _dither_atkinson(image, show_progress)
if show_progress:
print("Packing buffer...")
flat = indices.reshape(-1)
packed = (flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)
buf = packed.tolist()
if show_progress:
print("Ready")
return buf
def display(self, image):
self.send_command(0x10)
self.send_data2(image)
self.turn_on_display()
def Clear(self, color=0x11):
self.send_command(0x10)
self.send_data2([color] * (self.height * self.width // 2))
self.turn_on_display()
def sleep(self):
self.send_command(0x07) # DEEP_SLEEP
self.send_data(0xA5)
epdconfig.delay_ms(2000)
epdconfig.module_exit()