Simplify and caption
This commit is contained in:
parent
cd70c4cdca
commit
2c8d78b397
7 changed files with 551 additions and 596 deletions
188
src/display.py
188
src/display.py
|
|
@ -1,95 +1,93 @@
|
|||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import fcntl
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
sys.path.append(str(Path(__file__).parent / "lib"))
|
||||
from waveshare_epd import epd7in3e
|
||||
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
|
||||
from homeassistant import HomeAssistantClient
|
||||
|
||||
LOCK_FILE = "/tmp/frame.lock"
|
||||
|
||||
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev")
|
||||
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE")
|
||||
|
||||
HA_URL = os.environ.get("HA_URL", "https://homeassistant.schmelczer.dev")
|
||||
HA_TOKEN = os.environ.get("HA_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJmZjk3OTNmOWMzOWU0YjdmYmRjYTc5YmJkMTUyODcyNSIsImlhdCI6MTc2OTIwMjg1NCwiZXhwIjoyMDg0NTYyODU0fQ.IiL_1vTrGMlOoPMksN6lAopE0aInlY_wRnL4Jc-CeBs")
|
||||
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"]
|
||||
|
||||
DEFAULT_SATURATION = 1.3
|
||||
DEFAULT_CONTRAST = 1.05
|
||||
DEFAULT_GAMMA = 0.90
|
||||
|
||||
def display_image(image_path: Path, orientation: int, saturation: float,
|
||||
contrast: float, gamma: float, enhance: bool) -> None:
|
||||
epd = epd7in3e.EPD()
|
||||
try:
|
||||
epd.init()
|
||||
img = Image.open(image_path).convert("RGB")
|
||||
if orientation:
|
||||
img = img.rotate(orientation, expand=True)
|
||||
buf = epd.getbuffer(img, saturation=saturation, contrast=contrast,
|
||||
gamma=gamma, enhance=enhance)
|
||||
epd.display(buf)
|
||||
finally:
|
||||
epd.sleep()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
|
||||
parser.add_argument("--people", default="Me,Ruby",
|
||||
help="Comma-separated names for Immich search")
|
||||
parser.add_argument("--album", help="Fetch from album (overrides --people)")
|
||||
parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270],
|
||||
default=0, help="Rotation in degrees")
|
||||
parser.add_argument("--saturation", type=float, default=DEFAULT_SATURATION)
|
||||
parser.add_argument("--contrast", type=float, default=DEFAULT_CONTRAST)
|
||||
parser.add_argument("--gamma", type=float, default=DEFAULT_GAMMA)
|
||||
parser.add_argument("--no-enhance", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
lock_fd = open(LOCK_FILE, "w")
|
||||
try:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except BlockingIOError:
|
||||
print("Another instance running, skipping")
|
||||
sys.exit(0)
|
||||
|
||||
now = datetime.now()
|
||||
print(f"Time: {now.strftime('%H:%M')}")
|
||||
|
||||
if 0 <= now.hour < 7:
|
||||
print("Night time, skipping")
|
||||
sys.exit(0)
|
||||
|
||||
ha = HomeAssistantClient(HA_URL, HA_TOKEN)
|
||||
home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)]
|
||||
if not home:
|
||||
print("No one home, skipping")
|
||||
sys.exit(0)
|
||||
print(f"Home: {', '.join(home)}")
|
||||
|
||||
client = ImmichClient(IMMICH_URL, IMMICH_API_KEY)
|
||||
if args.album:
|
||||
image_path = get_random_photo_from_album(client, args.album, args.orientation)
|
||||
print(f"Album: {args.album}")
|
||||
else:
|
||||
names = [n.strip() for n in args.people.split(",")]
|
||||
image_path = get_random_photo_of_people(client, names, args.orientation)
|
||||
print(f"People: {', '.join(names)}")
|
||||
|
||||
try:
|
||||
display_image(image_path, args.orientation, args.saturation,
|
||||
args.contrast, args.gamma, not args.no_enhance)
|
||||
finally:
|
||||
image_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import fcntl
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
|
||||
sys.path.append(str(Path(__file__).parent / "lib"))
|
||||
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
|
||||
from homeassistant import HomeAssistantClient
|
||||
from overlay import format_age, format_location
|
||||
# waveshare_epd is imported lazily after the lock — its epdconfig claims
|
||||
# GPIO pins at import time, so two overlapping invocations would both crash
|
||||
# on "GPIO busy" before reaching the flock below.
|
||||
|
||||
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev")
|
||||
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE")
|
||||
|
||||
HA_URL = os.environ.get("HA_URL", "https://homeassistant.schmelczer.dev")
|
||||
HA_TOKEN = os.environ.get("HA_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJmZjk3OTNmOWMzOWU0YjdmYmRjYTc5YmJkMTUyODcyNSIsImlhdCI6MTc2OTIwMjg1NCwiZXhwIjoyMDg0NTYyODU0fQ.IiL_1vTrGMlOoPMksN6lAopE0aInlY_wRnL4Jc-CeBs")
|
||||
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"]
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
|
||||
parser.add_argument("--people", default="Me,Ruby",
|
||||
help="Comma-separated names for Immich search")
|
||||
parser.add_argument("--album", help="Fetch from album (overrides --people)")
|
||||
parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270],
|
||||
default=0, help="Rotation in degrees")
|
||||
parser.add_argument("--saturation", type=float, default=1.3)
|
||||
parser.add_argument("--contrast", type=float, default=1.05)
|
||||
parser.add_argument("--gamma", type=float, default=0.90)
|
||||
args = parser.parse_args()
|
||||
|
||||
lock_fd = open("/tmp/frame.lock", "w")
|
||||
try:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except BlockingIOError:
|
||||
print("Another instance running, skipping")
|
||||
sys.exit(0)
|
||||
|
||||
from waveshare_epd import epd7in3e
|
||||
|
||||
now = datetime.now()
|
||||
print(f"Time: {now.strftime('%H:%M')}")
|
||||
if 0 <= now.hour < 7:
|
||||
print("Night time, skipping")
|
||||
sys.exit(0)
|
||||
|
||||
ha = HomeAssistantClient(HA_URL, HA_TOKEN)
|
||||
home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)]
|
||||
if not home:
|
||||
print("No one home, skipping")
|
||||
sys.exit(0)
|
||||
print(f"Home: {', '.join(home)}")
|
||||
|
||||
client = ImmichClient(IMMICH_URL, IMMICH_API_KEY)
|
||||
if args.album:
|
||||
image_path, asset = get_random_photo_from_album(client, args.album, args.orientation)
|
||||
print(f"Album: {args.album}")
|
||||
else:
|
||||
names = [n.strip() for n in args.people.split(",")]
|
||||
image_path, asset = get_random_photo_of_people(client, names, args.orientation)
|
||||
print(f"People: {', '.join(names)}")
|
||||
|
||||
left_text = format_age(asset)
|
||||
right_text = format_location(asset)
|
||||
if left_text or right_text:
|
||||
print(f"Overlay: {left_text or '-'} | {right_text or '-'}")
|
||||
|
||||
try:
|
||||
epd = epd7in3e.EPD()
|
||||
try:
|
||||
epd.init()
|
||||
img = Image.open(image_path).convert("RGB")
|
||||
if args.orientation:
|
||||
img = img.rotate(args.orientation, expand=True)
|
||||
buf = epd.getbuffer(img, saturation=args.saturation, contrast=args.contrast,
|
||||
gamma=args.gamma, left_text=left_text, right_text=right_text,
|
||||
orientation=args.orientation)
|
||||
epd.display(buf)
|
||||
finally:
|
||||
epd.sleep()
|
||||
finally:
|
||||
image_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
#!/usr/bin/env python3
|
||||
import json
|
||||
import time
|
||||
from urllib.error import URLError
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.request import Request
|
||||
|
||||
RETRY_DELAYS = (3, 10)
|
||||
from net import urlopen_with_retry
|
||||
|
||||
|
||||
class HomeAssistantClient:
|
||||
|
|
@ -12,26 +10,14 @@ class HomeAssistantClient:
|
|||
self.base_url = base_url.rstrip("/")
|
||||
self.token = token
|
||||
|
||||
def get_state(self, entity_id: str) -> dict:
|
||||
url = f"{self.base_url}/api/states/{entity_id}"
|
||||
req = Request(url, headers={
|
||||
"Authorization": f"Bearer {self.token}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
last_err: Exception | None = None
|
||||
for attempt in range(len(RETRY_DELAYS) + 1):
|
||||
try:
|
||||
with urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except (URLError, TimeoutError) as e:
|
||||
last_err = e
|
||||
if attempt < len(RETRY_DELAYS):
|
||||
time.sleep(RETRY_DELAYS[attempt])
|
||||
raise last_err
|
||||
|
||||
def is_person_home(self, entity_id: str) -> bool:
|
||||
req = Request(
|
||||
f"{self.base_url}/api/states/{entity_id}",
|
||||
headers={"Authorization": f"Bearer {self.token}"},
|
||||
)
|
||||
try:
|
||||
return self.get_state(entity_id).get("state") == "home"
|
||||
with urlopen_with_retry(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode()).get("state") == "home"
|
||||
except Exception as e:
|
||||
print(f"Failed to check {entity_id}: {e}")
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
#!/usr/bin/env python3
|
||||
import hashlib
|
||||
import json
|
||||
import random
|
||||
import tempfile
|
||||
|
|
@ -7,38 +6,17 @@ import time
|
|||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from urllib.error import URLError
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.request import Request
|
||||
|
||||
from progress import ProgressBar
|
||||
from net import urlopen_with_retry
|
||||
|
||||
HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json"
|
||||
HISTORY_MAX_AGE_DAYS = 7
|
||||
|
||||
CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache"
|
||||
CACHE_TTL_SECONDS = 3600
|
||||
|
||||
RETRY_DELAYS = (3, 10)
|
||||
|
||||
|
||||
def _urlopen_with_retry(req: Request, timeout: int = 30):
|
||||
"""urlopen wrapper that retries transient network failures."""
|
||||
last_err: Exception | None = None
|
||||
for attempt in range(len(RETRY_DELAYS) + 1):
|
||||
try:
|
||||
return urlopen(req, timeout=timeout)
|
||||
except (URLError, TimeoutError) as e:
|
||||
last_err = e
|
||||
if attempt < len(RETRY_DELAYS):
|
||||
time.sleep(RETRY_DELAYS[attempt])
|
||||
raise last_err
|
||||
|
||||
|
||||
def _cache_get(key: str) -> list[dict] | None:
|
||||
path = CACHE_DIR / f"{key}.json"
|
||||
if not path.exists():
|
||||
return None
|
||||
if time.time() - path.stat().st_mtime > CACHE_TTL_SECONDS:
|
||||
if not path.exists() or time.time() - path.stat().st_mtime > 3600:
|
||||
return None
|
||||
try:
|
||||
return json.loads(path.read_text())
|
||||
|
|
@ -57,30 +35,26 @@ class PhotoHistory:
|
|||
def __init__(self, path: Path = HISTORY_FILE):
|
||||
self.path = path
|
||||
self.displayed: set[str] = set()
|
||||
self.created_at: datetime | None = None
|
||||
self.created_at = datetime.now(timezone.utc)
|
||||
self._load()
|
||||
|
||||
def _load(self) -> None:
|
||||
if not self.path.exists():
|
||||
self._reset()
|
||||
self._save()
|
||||
return
|
||||
try:
|
||||
data = json.loads(self.path.read_text())
|
||||
self.created_at = datetime.fromisoformat(data.get("created_at", ""))
|
||||
self.created_at = datetime.fromisoformat(data["created_at"])
|
||||
if self.created_at.tzinfo is None:
|
||||
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
|
||||
if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS):
|
||||
print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...")
|
||||
self._reset()
|
||||
if datetime.now(timezone.utc) - self.created_at > timedelta(days=7):
|
||||
print("Photo history expired (>7 days), clearing...")
|
||||
self.created_at = datetime.now(timezone.utc)
|
||||
self._save()
|
||||
else:
|
||||
self.displayed = set(data.get("displayed", []))
|
||||
except (json.JSONDecodeError, ValueError, KeyError):
|
||||
self._reset()
|
||||
|
||||
def _reset(self) -> None:
|
||||
self.displayed = set()
|
||||
self.created_at = datetime.now(timezone.utc)
|
||||
self._save()
|
||||
self._save()
|
||||
|
||||
def _save(self) -> None:
|
||||
self.path.write_text(json.dumps({
|
||||
|
|
@ -96,24 +70,16 @@ class PhotoHistory:
|
|||
return [a for a in assets if a.get("id") not in self.displayed]
|
||||
|
||||
|
||||
_history: PhotoHistory | None = None
|
||||
|
||||
|
||||
def get_history() -> PhotoHistory:
|
||||
global _history
|
||||
if _history is None:
|
||||
_history = PhotoHistory()
|
||||
return _history
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImmichClient:
|
||||
base_url: str
|
||||
api_key: str
|
||||
|
||||
def _request(self, method: str, endpoint: str, data: dict | None = None,
|
||||
show_progress: bool = False, progress_desc: str = "Fetching") -> dict:
|
||||
url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
|
||||
def __post_init__(self):
|
||||
self.base_url = self.base_url.rstrip("/")
|
||||
|
||||
def _request(self, method: str, endpoint: str, data: dict | None = None) -> dict:
|
||||
url = f"{self.base_url}/api/{endpoint.lstrip('/')}"
|
||||
headers = {"x-api-key": self.api_key}
|
||||
body = None
|
||||
if data is not None:
|
||||
|
|
@ -121,30 +87,17 @@ class ImmichClient:
|
|||
body = json.dumps(data).encode()
|
||||
|
||||
req = Request(url, data=body, headers=headers, method=method)
|
||||
with _urlopen_with_retry(req, timeout=30) as resp:
|
||||
total_size = resp.headers.get('Content-Length')
|
||||
if total_size and show_progress:
|
||||
total_size = int(total_size)
|
||||
progress = ProgressBar(total_size, desc=progress_desc)
|
||||
chunks = bytearray()
|
||||
while chunk := resp.read(8192):
|
||||
chunks.extend(chunk)
|
||||
progress.update(len(chunk))
|
||||
progress.finish()
|
||||
return json.loads(chunks.decode())
|
||||
with urlopen_with_retry(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
def get_people(self) -> list[dict]:
|
||||
return self._request("GET", "/people")["people"]
|
||||
|
||||
def get_person_id(self, name: str) -> str | None:
|
||||
for person in self.get_people():
|
||||
for person in self._request("GET", "/people")["people"]:
|
||||
if person["name"].lower() == name.lower():
|
||||
return person["id"]
|
||||
return None
|
||||
|
||||
def search_assets_by_people(self, person_ids: list[str]) -> list[dict]:
|
||||
key = "people_" + hashlib.md5("_".join(sorted(person_ids)).encode()).hexdigest()
|
||||
key = "people_" + "_".join(sorted(person_ids))
|
||||
cached = _cache_get(key)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
|
@ -167,22 +120,11 @@ class ImmichClient:
|
|||
_cache_set(key, items)
|
||||
return items
|
||||
|
||||
def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path:
|
||||
url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview"
|
||||
def download_asset(self, asset_id: str, dest: Path) -> Path:
|
||||
url = f"{self.base_url}/api/assets/{asset_id}/thumbnail?size=preview"
|
||||
req = Request(url, headers={"x-api-key": self.api_key})
|
||||
with _urlopen_with_retry(req, timeout=30) as resp:
|
||||
total_size = resp.headers.get('Content-Length')
|
||||
if total_size and show_progress:
|
||||
total_size = int(total_size)
|
||||
progress = ProgressBar(total_size, desc="Downloading")
|
||||
data = bytearray()
|
||||
while chunk := resp.read(8192):
|
||||
data.extend(chunk)
|
||||
progress.update(len(chunk))
|
||||
progress.finish()
|
||||
dest.write_bytes(bytes(data))
|
||||
else:
|
||||
dest.write_bytes(resp.read())
|
||||
with urlopen_with_retry(req, timeout=30) as resp:
|
||||
dest.write_bytes(resp.read())
|
||||
return dest
|
||||
|
||||
def get_album_id(self, name: str) -> str | None:
|
||||
|
|
@ -191,107 +133,85 @@ class ImmichClient:
|
|||
return album["id"]
|
||||
return None
|
||||
|
||||
def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]:
|
||||
def get_album_assets(self, album_id: str) -> list[dict]:
|
||||
key = f"album_{album_id}"
|
||||
cached = _cache_get(key)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
album = self._request("GET", f"/albums/{album_id}",
|
||||
show_progress=show_progress, progress_desc="Fetching album")
|
||||
assets = album.get("assets", [])
|
||||
assets = self._request("GET", f"/albums/{album_id}").get("assets", [])
|
||||
_cache_set(key, assets)
|
||||
return assets
|
||||
|
||||
|
||||
def _is_portrait(asset: dict) -> bool | None:
|
||||
"""Check if asset displays as portrait, accounting for EXIF orientation."""
|
||||
exif = asset.get("exifInfo") or {}
|
||||
width = exif.get("exifImageWidth") or 0
|
||||
height = exif.get("exifImageHeight") or 0
|
||||
if not (width and height):
|
||||
return None
|
||||
# EXIF orientation 6 and 8 mean 90° rotation (swap dimensions)
|
||||
orientation = str(exif.get("orientation") or "1")
|
||||
if orientation in ("6", "8"):
|
||||
width, height = height, width
|
||||
return height > width
|
||||
|
||||
|
||||
def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
|
||||
"""Filter assets by orientation, accounting for EXIF rotation."""
|
||||
filtered = []
|
||||
no_dimensions = 0
|
||||
for asset in assets:
|
||||
is_portrait = _is_portrait(asset)
|
||||
if is_portrait is not None:
|
||||
if is_portrait == portrait:
|
||||
filtered.append(asset)
|
||||
else:
|
||||
no_dimensions += 1
|
||||
if no_dimensions:
|
||||
print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data")
|
||||
return filtered
|
||||
"""Keep assets matching the requested orientation. Skips assets without EXIF dimensions."""
|
||||
out = []
|
||||
for a in assets:
|
||||
exif = a.get("exifInfo") or {}
|
||||
w = exif.get("exifImageWidth") or 0
|
||||
h = exif.get("exifImageHeight") or 0
|
||||
if not (w and h):
|
||||
continue
|
||||
if exif.get("orientation") in (6, 8, "6", "8"):
|
||||
w, h = h, w
|
||||
if (h > w) == portrait:
|
||||
out.append(a)
|
||||
return out
|
||||
|
||||
|
||||
def _pick_weighted_random(assets: list[dict]) -> dict:
|
||||
"""Pick random asset, biased towards favorites (20%) and recently added photos (50%)."""
|
||||
if not assets:
|
||||
raise ValueError("No assets to choose from")
|
||||
|
||||
"""Pick random asset, biased towards favorites and recently added photos."""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
|
||||
favorites = [a for a in assets if a.get("isFavorite")]
|
||||
recent = []
|
||||
for asset in assets:
|
||||
date_str = asset.get("createdAt", "")
|
||||
for a in assets:
|
||||
try:
|
||||
if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= cutoff:
|
||||
recent.append(asset)
|
||||
if datetime.fromisoformat(a.get("createdAt", "").replace("Z", "+00:00")) >= cutoff:
|
||||
recent.append(a)
|
||||
except (ValueError, AttributeError):
|
||||
pass
|
||||
|
||||
if favorites and random.random() < 0.2:
|
||||
return random.choice(favorites)
|
||||
if recent and random.random() < 0.5:
|
||||
return random.choice(recent)
|
||||
return random.choice(assets)
|
||||
candidates = [(favorites, 0.2), (recent, 0.4), (assets, 0.4)]
|
||||
pools, weights = zip(*[(p, w) for p, w in candidates if p])
|
||||
pool = random.choices(pools, weights=weights)[0]
|
||||
return random.choice(pool)
|
||||
|
||||
|
||||
def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path:
|
||||
history = get_history()
|
||||
new_assets = history.filter_new(assets)
|
||||
def _pick_and_download(client: ImmichClient, assets: list[dict],
|
||||
orientation: int, source_label: str) -> tuple[Path, dict]:
|
||||
portrait = orientation in (90, 270)
|
||||
filtered = _filter_by_orientation(assets, portrait)
|
||||
if not filtered:
|
||||
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in {source_label}")
|
||||
|
||||
if new_assets:
|
||||
print(f"Photos: {len(new_assets)} new / {len(assets)} total")
|
||||
asset = _pick_weighted_random(new_assets)
|
||||
history = PhotoHistory()
|
||||
candidates = history.filter_new(filtered)
|
||||
if not candidates:
|
||||
print(f"All {len(filtered)} photos shown, picking from full list")
|
||||
candidates = filtered
|
||||
else:
|
||||
print(f"All {len(assets)} photos shown, picking from full list")
|
||||
asset = _pick_weighted_random(assets)
|
||||
print(f"Photos: {len(candidates)} new / {len(filtered)} total")
|
||||
|
||||
asset = _pick_weighted_random(candidates)
|
||||
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
|
||||
path = client.download_asset(asset["id"], dest)
|
||||
history.mark_displayed(asset["id"])
|
||||
return path
|
||||
return path, asset
|
||||
|
||||
|
||||
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path:
|
||||
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> tuple[Path, dict]:
|
||||
person_ids = [pid for name in names if (pid := client.get_person_id(name))]
|
||||
if not person_ids:
|
||||
raise ValueError(f"No people found: {names}")
|
||||
|
||||
assets = client.search_assets_by_people(person_ids)
|
||||
|
||||
if not assets:
|
||||
raise ValueError(f"No photos found for: {names}")
|
||||
|
||||
portrait = orientation in (90, 270)
|
||||
filtered = _filter_by_orientation(assets, portrait)
|
||||
if not filtered:
|
||||
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos available")
|
||||
return _download_random_asset(client, filtered)
|
||||
return _pick_and_download(client, assets, orientation, f"photos for {', '.join(names)}")
|
||||
|
||||
|
||||
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path:
|
||||
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> tuple[Path, dict]:
|
||||
album_id = client.get_album_id(album_name)
|
||||
if not album_id:
|
||||
raise ValueError(f"Album not found: {album_name}")
|
||||
|
|
@ -300,8 +220,4 @@ def get_random_photo_from_album(client: ImmichClient, album_name: str, orientati
|
|||
if not assets:
|
||||
raise ValueError(f"No photos in album: {album_name}")
|
||||
|
||||
portrait = orientation in (90, 270)
|
||||
filtered = _filter_by_orientation(assets, portrait)
|
||||
if not filtered:
|
||||
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in album: {album_name}")
|
||||
return _download_random_asset(client, filtered)
|
||||
return _pick_and_download(client, assets, orientation, f"album: {album_name}")
|
||||
|
|
|
|||
19
src/lib/net.py
Normal file
19
src/lib/net.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
#!/usr/bin/env python3
|
||||
import time
|
||||
from urllib.error import URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
RETRY_DELAYS = (3, 10)
|
||||
|
||||
|
||||
def urlopen_with_retry(req: Request, timeout: int = 30):
|
||||
"""urlopen wrapper that retries transient network failures."""
|
||||
last_err: Exception | None = None
|
||||
for attempt in range(len(RETRY_DELAYS) + 1):
|
||||
try:
|
||||
return urlopen(req, timeout=timeout)
|
||||
except (URLError, TimeoutError) as e:
|
||||
last_err = e
|
||||
if attempt < len(RETRY_DELAYS):
|
||||
time.sleep(RETRY_DELAYS[attempt])
|
||||
raise last_err
|
||||
106
src/lib/overlay.py
Normal file
106
src/lib/overlay.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
"""Text overlay rendering for the e-ink frame.
|
||||
|
||||
Paints aliased white-on-black-stroke text into the dithered palette index
|
||||
array; black/white survive Atkinson dithering so edges stay crisp on e-ink.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
FONT_CANDIDATES = (
|
||||
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
|
||||
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||
"/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
|
||||
"/System/Library/Fonts/Helvetica.ttc",
|
||||
)
|
||||
|
||||
PALETTE_BLACK = 0
|
||||
PALETTE_WHITE = 1
|
||||
|
||||
|
||||
def _load_font(size: int) -> ImageFont.ImageFont:
|
||||
for path in FONT_CANDIDATES:
|
||||
if os.path.exists(path):
|
||||
return ImageFont.truetype(path, size)
|
||||
return ImageFont.load_default()
|
||||
|
||||
|
||||
def format_age(asset: dict) -> str | None:
|
||||
"""Photo capture age as 'N days/weeks/months/years ago'."""
|
||||
exif = asset.get("exifInfo") or {}
|
||||
date_str = exif.get("dateTimeOriginal") or asset.get("fileCreatedAt")
|
||||
if not date_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
||||
except (ValueError, AttributeError):
|
||||
return None
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
days = (datetime.now(timezone.utc) - dt).days
|
||||
if days < 0:
|
||||
return None
|
||||
if days == 0:
|
||||
return "Today"
|
||||
if days == 1:
|
||||
return "Yesterday"
|
||||
if days < 7:
|
||||
return f"{days} days ago"
|
||||
for n, unit in ((365, "year"), (30, "month"), (7, "week")):
|
||||
if days >= n:
|
||||
count = max(1, days // n)
|
||||
return f"{count} {unit}{'s' if count > 1 else ''} ago"
|
||||
|
||||
|
||||
def format_location(asset: dict) -> str | None:
|
||||
"""Most specific location available from EXIF."""
|
||||
exif = asset.get("exifInfo") or {}
|
||||
return exif.get("city") or exif.get("state") or exif.get("country") or None
|
||||
|
||||
|
||||
def render_text_into_indices(indices: np.ndarray,
|
||||
left_text: str | None,
|
||||
right_text: str | None,
|
||||
orientation: int = 0) -> None:
|
||||
"""Paint white-on-black-stroke text into a (height, width) palette-index array.
|
||||
|
||||
Text is laid out viewer-bottom-left/right, then rotated by `orientation`
|
||||
so labels land at the viewer's bottom regardless of frame mounting.
|
||||
"""
|
||||
font_size, margin, stroke_width = 20, 18, 2
|
||||
buffer_h, buffer_w = indices.shape
|
||||
if orientation in (90, 270):
|
||||
view_w, view_h = buffer_h, buffer_w
|
||||
else:
|
||||
view_w, view_h = buffer_w, buffer_h
|
||||
|
||||
fill_layer = Image.new("L", (view_w, view_h), 0)
|
||||
full_layer = Image.new("L", (view_w, view_h), 0)
|
||||
fill_draw = ImageDraw.Draw(fill_layer)
|
||||
full_draw = ImageDraw.Draw(full_layer)
|
||||
font = _load_font(font_size)
|
||||
baseline = view_h - margin
|
||||
|
||||
if left_text:
|
||||
pos = (margin, baseline)
|
||||
fill_draw.text(pos, left_text, font=font, fill=255, anchor="lb")
|
||||
full_draw.text(pos, left_text, font=font, fill=255, anchor="lb",
|
||||
stroke_width=stroke_width, stroke_fill=255)
|
||||
|
||||
if right_text:
|
||||
pos = (view_w - margin, baseline)
|
||||
fill_draw.text(pos, right_text, font=font, fill=255, anchor="rb")
|
||||
full_draw.text(pos, right_text, font=font, fill=255, anchor="rb",
|
||||
stroke_width=stroke_width, stroke_fill=255)
|
||||
|
||||
if orientation:
|
||||
fill_layer = fill_layer.rotate(orientation, expand=True)
|
||||
full_layer = full_layer.rotate(orientation, expand=True)
|
||||
|
||||
fill_mask = np.asarray(fill_layer) >= 128
|
||||
stroke_mask = (np.asarray(full_layer) >= 128) & ~fill_mask
|
||||
indices[stroke_mask] = PALETTE_BLACK
|
||||
indices[fill_mask] = PALETTE_WHITE
|
||||
|
|
@ -1,49 +1,23 @@
|
|||
"""Simple terminal progress bar for e-ink frame."""
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
class ProgressBar:
|
||||
"""Simple text-based progress bar."""
|
||||
|
||||
def __init__(self, total: int, desc: str = "", width: int = 30):
|
||||
self.total = total
|
||||
self.current = 0
|
||||
self.desc = desc
|
||||
self.width = width
|
||||
self._last_percent = -1
|
||||
|
||||
def update(self, n: int = 1) -> None:
|
||||
"""Update progress by n steps."""
|
||||
self.current = min(self.current + n, self.total)
|
||||
self._render()
|
||||
|
||||
def set(self, value: int) -> None:
|
||||
"""Set progress to specific value."""
|
||||
self.current = min(value, self.total)
|
||||
self._render()
|
||||
|
||||
def _render(self) -> None:
|
||||
if self.total == 0:
|
||||
return
|
||||
|
||||
percent = int(100 * self.current / self.total)
|
||||
if percent == self._last_percent:
|
||||
return
|
||||
self._last_percent = percent
|
||||
|
||||
filled = int(self.width * self.current / self.total)
|
||||
bar = "█" * filled + "░" * (self.width - filled)
|
||||
|
||||
desc = f"{self.desc}: " if self.desc else ""
|
||||
sys.stdout.write(f"\r{desc}|{bar}| {percent:3d}%")
|
||||
sys.stdout.flush()
|
||||
|
||||
if self.current >= self.total:
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
def finish(self) -> None:
|
||||
"""Complete the progress bar."""
|
||||
self.current = self.total
|
||||
self._render()
|
||||
"""Simple terminal progress bar for e-ink frame."""
|
||||
|
||||
|
||||
class ProgressBar:
|
||||
def __init__(self, total: int, desc: str = ""):
|
||||
self.total = total
|
||||
self.desc = desc
|
||||
self._last_percent = -1
|
||||
|
||||
def set(self, value: int) -> None:
|
||||
if self.total == 0:
|
||||
return
|
||||
value = min(value, self.total)
|
||||
percent = int(100 * value / self.total)
|
||||
if percent == self._last_percent:
|
||||
return
|
||||
self._last_percent = percent
|
||||
|
||||
filled = int(30 * value / self.total)
|
||||
bar = "█" * filled + "░" * (30 - filled)
|
||||
end = "\n" if value >= self.total else ""
|
||||
prefix = f"{self.desc}: " if self.desc else ""
|
||||
print(f"\r{prefix}|{bar}| {percent:3d}%", end=end, flush=True)
|
||||
|
|
|
|||
|
|
@ -1,282 +1,238 @@
|
|||
#!/usr/bin/env python3
|
||||
# Waveshare 7.3" 6-color e-Paper driver (modified)
|
||||
# Original: Waveshare team, 2022-10-20
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
from PIL import Image, ImageEnhance
|
||||
from numba import jit
|
||||
from progress import ProgressBar
|
||||
from . import epdconfig
|
||||
|
||||
EPD_WIDTH = 800
|
||||
EPD_HEIGHT = 480
|
||||
|
||||
# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors;
|
||||
# 4 is reserved/unused (filled with BLACK so nearest-color never picks it).
|
||||
PALETTE_RGB = np.array([
|
||||
[0, 0, 0], # 0: BLACK
|
||||
[255, 255, 255], # 1: WHITE
|
||||
[255, 255, 0], # 2: YELLOW
|
||||
[255, 0, 0], # 3: RED
|
||||
[0, 0, 0], # 4: unused
|
||||
[0, 0, 255], # 5: BLUE
|
||||
[0, 255, 0], # 6: GREEN
|
||||
], dtype=np.float64)
|
||||
|
||||
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
|
||||
|
||||
|
||||
def _enhance_for_eink(image: Image.Image, saturation: float,
|
||||
contrast: float, gamma: float) -> Image.Image:
|
||||
img = image.convert('RGB')
|
||||
if saturation != 1.0:
|
||||
img = ImageEnhance.Color(img).enhance(saturation)
|
||||
if contrast != 1.0:
|
||||
img = ImageEnhance.Contrast(img).enhance(contrast)
|
||||
if gamma != 1.0:
|
||||
lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3
|
||||
img = img.point(lut)
|
||||
return img
|
||||
|
||||
|
||||
def _crop_center(image: Image.Image, target_w: int, target_h: int,
|
||||
show_progress: bool = True) -> Image.Image:
|
||||
if show_progress:
|
||||
print("Center cropping...")
|
||||
|
||||
img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
|
||||
img_h, img_w = img_cv.shape[:2]
|
||||
img_aspect, target_aspect = img_w / img_h, target_w / target_h
|
||||
|
||||
if img_aspect < target_aspect:
|
||||
new_w, new_h = target_w, int(target_w / img_aspect)
|
||||
else:
|
||||
new_w, new_h = int(target_h * img_aspect), target_h
|
||||
|
||||
img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
|
||||
x_off = (new_w - target_w) // 2
|
||||
y_off = (new_h - target_h) // 2
|
||||
cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w]
|
||||
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
|
||||
|
||||
|
||||
@jit(nopython=True, cache=True)
|
||||
def _find_nearest_color(r, g, b, palette, weights):
|
||||
best_idx, best_dist = 0, 1e10
|
||||
for i in range(palette.shape[0]):
|
||||
dr = (palette[i, 0] - r) * weights[0]
|
||||
dg = (palette[i, 1] - g) * weights[1]
|
||||
db = (palette[i, 2] - b) * weights[2]
|
||||
dist = dr * dr + dg * dg + db * db
|
||||
if dist < best_dist:
|
||||
best_dist, best_idx = dist, i
|
||||
return best_idx
|
||||
|
||||
|
||||
@jit(nopython=True, cache=True)
|
||||
def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row):
|
||||
height, width = img.shape[:2]
|
||||
for y in range(start_row, end_row):
|
||||
for x in range(width):
|
||||
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
|
||||
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
|
||||
indices[y, x] = idx
|
||||
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
|
||||
|
||||
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
|
||||
|
||||
if x + 1 < width:
|
||||
img[y, x + 1, 0] += err_r
|
||||
img[y, x + 1, 1] += err_g
|
||||
img[y, x + 1, 2] += err_b
|
||||
if x + 2 < width:
|
||||
img[y, x + 2, 0] += err_r
|
||||
img[y, x + 2, 1] += err_g
|
||||
img[y, x + 2, 2] += err_b
|
||||
if y + 1 < height:
|
||||
if x > 0:
|
||||
img[y + 1, x - 1, 0] += err_r
|
||||
img[y + 1, x - 1, 1] += err_g
|
||||
img[y + 1, x - 1, 2] += err_b
|
||||
img[y + 1, x, 0] += err_r
|
||||
img[y + 1, x, 1] += err_g
|
||||
img[y + 1, x, 2] += err_b
|
||||
if x + 1 < width:
|
||||
img[y + 1, x + 1, 0] += err_r
|
||||
img[y + 1, x + 1, 1] += err_g
|
||||
img[y + 1, x + 1, 2] += err_b
|
||||
if y + 2 < height:
|
||||
img[y + 2, x, 0] += err_r
|
||||
img[y + 2, x, 1] += err_g
|
||||
img[y + 2, x, 2] += err_b
|
||||
|
||||
|
||||
def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> np.ndarray:
|
||||
"""Atkinson-dither to the e-ink palette and return a uint8 array of palette indices."""
|
||||
img = np.array(image.convert('RGB'), dtype=np.float64)
|
||||
height, width = img.shape[:2]
|
||||
indices = np.zeros((height, width), dtype=np.uint8)
|
||||
if show_progress:
|
||||
print("Dithering...")
|
||||
progress = ProgressBar(height, desc="Dithering")
|
||||
|
||||
chunk_size = 48
|
||||
for i in range((height + chunk_size - 1) // chunk_size):
|
||||
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
|
||||
_atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end)
|
||||
if show_progress:
|
||||
progress.set(end)
|
||||
|
||||
return indices
|
||||
|
||||
|
||||
class EPD:
|
||||
def __init__(self):
|
||||
self.reset_pin = epdconfig.RST_PIN
|
||||
self.dc_pin = epdconfig.DC_PIN
|
||||
self.busy_pin = epdconfig.BUSY_PIN
|
||||
self.cs_pin = epdconfig.CS_PIN
|
||||
self.width = EPD_WIDTH
|
||||
self.height = EPD_HEIGHT
|
||||
|
||||
def reset(self):
|
||||
epdconfig.digital_write(self.reset_pin, 1)
|
||||
epdconfig.delay_ms(20)
|
||||
epdconfig.digital_write(self.reset_pin, 0)
|
||||
epdconfig.delay_ms(2)
|
||||
epdconfig.digital_write(self.reset_pin, 1)
|
||||
epdconfig.delay_ms(20)
|
||||
|
||||
def send_command(self, command):
|
||||
epdconfig.digital_write(self.dc_pin, 0)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte([command])
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def send_data(self, data):
|
||||
epdconfig.digital_write(self.dc_pin, 1)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte([data])
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def send_data2(self, data):
|
||||
epdconfig.digital_write(self.dc_pin, 1)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte2(data)
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def wait_busy(self):
|
||||
while epdconfig.digital_read(self.busy_pin) == 0:
|
||||
epdconfig.delay_ms(5)
|
||||
|
||||
def turn_on_display(self):
|
||||
self.send_command(0x04) # POWER_ON
|
||||
self.wait_busy()
|
||||
self.send_command(0x12) # DISPLAY_REFRESH
|
||||
self.send_data(0x00)
|
||||
self.wait_busy()
|
||||
self.send_command(0x02) # POWER_OFF
|
||||
self.send_data(0x00)
|
||||
self.wait_busy()
|
||||
|
||||
def init(self):
|
||||
if epdconfig.module_init() != 0:
|
||||
return -1
|
||||
self.reset()
|
||||
self.wait_busy()
|
||||
epdconfig.delay_ms(30)
|
||||
|
||||
self.send_command(0xAA)
|
||||
for v in [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x01)
|
||||
self.send_data(0x3F)
|
||||
|
||||
self.send_command(0x00)
|
||||
self.send_data(0x5F)
|
||||
self.send_data(0x69)
|
||||
|
||||
self.send_command(0x03)
|
||||
for v in [0x00, 0x54, 0x00, 0x44]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x05)
|
||||
for v in [0x40, 0x1F, 0x1F, 0x2C]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x06)
|
||||
for v in [0x6F, 0x1F, 0x17, 0x49]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x08)
|
||||
for v in [0x6F, 0x1F, 0x1F, 0x22]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x30)
|
||||
self.send_data(0x03)
|
||||
|
||||
self.send_command(0x50)
|
||||
self.send_data(0x3F)
|
||||
|
||||
self.send_command(0x60)
|
||||
self.send_data(0x02)
|
||||
self.send_data(0x00)
|
||||
|
||||
self.send_command(0x61)
|
||||
for v in [0x03, 0x20, 0x01, 0xE0]:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x84)
|
||||
self.send_data(0x01)
|
||||
|
||||
self.send_command(0xE3)
|
||||
self.send_data(0x2F)
|
||||
|
||||
self.send_command(0x04)
|
||||
self.wait_busy()
|
||||
return 0
|
||||
|
||||
def getbuffer(self, image, saturation: float, contrast: float, gamma: float,
|
||||
enhance: bool = True, show_progress: bool = True):
|
||||
image = image.convert('RGB')
|
||||
imwidth, imheight = image.size
|
||||
|
||||
if imwidth != self.width or imheight != self.height:
|
||||
if show_progress:
|
||||
print(f"Input: {imwidth}x{imheight} → {self.width}x{self.height}")
|
||||
image = _crop_center(image, self.width, self.height, show_progress)
|
||||
|
||||
if enhance:
|
||||
if show_progress:
|
||||
print("Enhancing...")
|
||||
image = _enhance_for_eink(image, saturation, contrast, gamma)
|
||||
|
||||
indices = _dither_atkinson(image, show_progress)
|
||||
|
||||
if show_progress:
|
||||
print("Packing buffer...")
|
||||
flat = indices.reshape(-1)
|
||||
packed = (flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)
|
||||
buf = packed.tolist()
|
||||
|
||||
if show_progress:
|
||||
print("Ready")
|
||||
return buf
|
||||
|
||||
def display(self, image):
|
||||
self.send_command(0x10)
|
||||
self.send_data2(image)
|
||||
self.turn_on_display()
|
||||
|
||||
def Clear(self, color=0x11):
|
||||
self.send_command(0x10)
|
||||
self.send_data2([color] * (self.height * self.width // 2))
|
||||
self.turn_on_display()
|
||||
|
||||
def sleep(self):
|
||||
self.send_command(0x07) # DEEP_SLEEP
|
||||
self.send_data(0xA5)
|
||||
epdconfig.delay_ms(2000)
|
||||
epdconfig.module_exit()
|
||||
#!/usr/bin/env python3
|
||||
# Waveshare 7.3" 6-color e-Paper driver (modified)
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
from PIL import Image, ImageEnhance
|
||||
from numba import jit
|
||||
from progress import ProgressBar
|
||||
from overlay import render_text_into_indices
|
||||
from . import epdconfig
|
||||
|
||||
EPD_WIDTH = 800
|
||||
EPD_HEIGHT = 480
|
||||
|
||||
# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors;
|
||||
# 4 is reserved/unused (filled with BLACK so nearest-color never picks it).
|
||||
PALETTE_RGB = np.array([
|
||||
[0, 0, 0], # 0: BLACK
|
||||
[255, 255, 255], # 1: WHITE
|
||||
[255, 255, 0], # 2: YELLOW
|
||||
[255, 0, 0], # 3: RED
|
||||
[0, 0, 0], # 4: unused
|
||||
[0, 0, 255], # 5: BLUE
|
||||
[0, 255, 0], # 6: GREEN
|
||||
], dtype=np.float64)
|
||||
|
||||
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
|
||||
|
||||
INIT_SEQUENCE = (
|
||||
(0xAA, [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]),
|
||||
(0x01, [0x3F]),
|
||||
(0x00, [0x5F, 0x69]),
|
||||
(0x03, [0x00, 0x54, 0x00, 0x44]),
|
||||
(0x05, [0x40, 0x1F, 0x1F, 0x2C]),
|
||||
(0x06, [0x6F, 0x1F, 0x17, 0x49]),
|
||||
(0x08, [0x6F, 0x1F, 0x1F, 0x22]),
|
||||
(0x30, [0x03]),
|
||||
(0x50, [0x3F]),
|
||||
(0x60, [0x02, 0x00]),
|
||||
(0x61, [0x03, 0x20, 0x01, 0xE0]),
|
||||
(0x84, [0x01]),
|
||||
(0xE3, [0x2F]),
|
||||
)
|
||||
|
||||
|
||||
def _enhance_for_eink(image: Image.Image, saturation: float,
|
||||
contrast: float, gamma: float) -> Image.Image:
|
||||
img = image.convert('RGB')
|
||||
if saturation != 1.0:
|
||||
img = ImageEnhance.Color(img).enhance(saturation)
|
||||
if contrast != 1.0:
|
||||
img = ImageEnhance.Contrast(img).enhance(contrast)
|
||||
if gamma != 1.0:
|
||||
lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3
|
||||
img = img.point(lut)
|
||||
return img
|
||||
|
||||
|
||||
def _crop_center(image: Image.Image, target_w: int, target_h: int) -> Image.Image:
|
||||
print("Center cropping...")
|
||||
img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
|
||||
img_h, img_w = img_cv.shape[:2]
|
||||
img_aspect, target_aspect = img_w / img_h, target_w / target_h
|
||||
|
||||
if img_aspect < target_aspect:
|
||||
new_w, new_h = target_w, int(target_w / img_aspect)
|
||||
else:
|
||||
new_w, new_h = int(target_h * img_aspect), target_h
|
||||
|
||||
img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
|
||||
x_off = (new_w - target_w) // 2
|
||||
y_off = (new_h - target_h) // 2
|
||||
cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w]
|
||||
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
|
||||
|
||||
|
||||
@jit(nopython=True, cache=True)
|
||||
def _find_nearest_color(r, g, b, palette, weights):
|
||||
best_idx, best_dist = 0, 1e10
|
||||
for i in range(palette.shape[0]):
|
||||
dr = (palette[i, 0] - r) * weights[0]
|
||||
dg = (palette[i, 1] - g) * weights[1]
|
||||
db = (palette[i, 2] - b) * weights[2]
|
||||
dist = dr * dr + dg * dg + db * db
|
||||
if dist < best_dist:
|
||||
best_dist, best_idx = dist, i
|
||||
return best_idx
|
||||
|
||||
|
||||
@jit(nopython=True, cache=True)
|
||||
def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row):
|
||||
height, width = img.shape[:2]
|
||||
for y in range(start_row, end_row):
|
||||
for x in range(width):
|
||||
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
|
||||
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
|
||||
indices[y, x] = idx
|
||||
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
|
||||
|
||||
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
|
||||
|
||||
if x + 1 < width:
|
||||
img[y, x + 1, 0] += err_r
|
||||
img[y, x + 1, 1] += err_g
|
||||
img[y, x + 1, 2] += err_b
|
||||
if x + 2 < width:
|
||||
img[y, x + 2, 0] += err_r
|
||||
img[y, x + 2, 1] += err_g
|
||||
img[y, x + 2, 2] += err_b
|
||||
if y + 1 < height:
|
||||
if x > 0:
|
||||
img[y + 1, x - 1, 0] += err_r
|
||||
img[y + 1, x - 1, 1] += err_g
|
||||
img[y + 1, x - 1, 2] += err_b
|
||||
img[y + 1, x, 0] += err_r
|
||||
img[y + 1, x, 1] += err_g
|
||||
img[y + 1, x, 2] += err_b
|
||||
if x + 1 < width:
|
||||
img[y + 1, x + 1, 0] += err_r
|
||||
img[y + 1, x + 1, 1] += err_g
|
||||
img[y + 1, x + 1, 2] += err_b
|
||||
if y + 2 < height:
|
||||
img[y + 2, x, 0] += err_r
|
||||
img[y + 2, x, 1] += err_g
|
||||
img[y + 2, x, 2] += err_b
|
||||
|
||||
|
||||
def _dither_atkinson(image: Image.Image) -> np.ndarray:
|
||||
"""Atkinson-dither to the e-ink palette and return a uint8 array of palette indices."""
|
||||
img = np.array(image.convert('RGB'), dtype=np.float64)
|
||||
height, width = img.shape[:2]
|
||||
indices = np.zeros((height, width), dtype=np.uint8)
|
||||
print("Dithering...")
|
||||
progress = ProgressBar(height, desc="Dithering")
|
||||
|
||||
chunk_size = 48
|
||||
for i in range((height + chunk_size - 1) // chunk_size):
|
||||
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
|
||||
_atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end)
|
||||
progress.set(end)
|
||||
|
||||
return indices
|
||||
|
||||
|
||||
class EPD:
|
||||
def __init__(self):
|
||||
self.reset_pin = epdconfig.RST_PIN
|
||||
self.dc_pin = epdconfig.DC_PIN
|
||||
self.busy_pin = epdconfig.BUSY_PIN
|
||||
self.cs_pin = epdconfig.CS_PIN
|
||||
self.width = EPD_WIDTH
|
||||
self.height = EPD_HEIGHT
|
||||
|
||||
def reset(self):
|
||||
epdconfig.digital_write(self.reset_pin, 1)
|
||||
epdconfig.delay_ms(20)
|
||||
epdconfig.digital_write(self.reset_pin, 0)
|
||||
epdconfig.delay_ms(2)
|
||||
epdconfig.digital_write(self.reset_pin, 1)
|
||||
epdconfig.delay_ms(20)
|
||||
|
||||
def send_command(self, command):
|
||||
epdconfig.digital_write(self.dc_pin, 0)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte([command])
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def send_data(self, data):
|
||||
epdconfig.digital_write(self.dc_pin, 1)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte([data])
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def send_data2(self, data):
|
||||
epdconfig.digital_write(self.dc_pin, 1)
|
||||
epdconfig.digital_write(self.cs_pin, 0)
|
||||
epdconfig.spi_writebyte2(data)
|
||||
epdconfig.digital_write(self.cs_pin, 1)
|
||||
|
||||
def wait_busy(self):
|
||||
while epdconfig.digital_read(self.busy_pin) == 0:
|
||||
epdconfig.delay_ms(5)
|
||||
|
||||
def turn_on_display(self):
|
||||
self.send_command(0x04) # POWER_ON
|
||||
self.wait_busy()
|
||||
self.send_command(0x12) # DISPLAY_REFRESH
|
||||
self.send_data(0x00)
|
||||
self.wait_busy()
|
||||
self.send_command(0x02) # POWER_OFF
|
||||
self.send_data(0x00)
|
||||
self.wait_busy()
|
||||
|
||||
def init(self):
|
||||
epdconfig.module_init()
|
||||
self.reset()
|
||||
self.wait_busy()
|
||||
epdconfig.delay_ms(30)
|
||||
|
||||
for cmd, data in INIT_SEQUENCE:
|
||||
self.send_command(cmd)
|
||||
for v in data:
|
||||
self.send_data(v)
|
||||
|
||||
self.send_command(0x04)
|
||||
self.wait_busy()
|
||||
|
||||
def getbuffer(self, image, saturation: float, contrast: float, gamma: float,
|
||||
left_text: str | None = None, right_text: str | None = None,
|
||||
orientation: int = 0):
|
||||
image = image.convert('RGB')
|
||||
if image.size != (self.width, self.height):
|
||||
print(f"Input: {image.size[0]}x{image.size[1]} → {self.width}x{self.height}")
|
||||
image = _crop_center(image, self.width, self.height)
|
||||
|
||||
print("Enhancing...")
|
||||
image = _enhance_for_eink(image, saturation, contrast, gamma)
|
||||
|
||||
indices = _dither_atkinson(image)
|
||||
|
||||
if left_text or right_text:
|
||||
print("Rendering overlay...")
|
||||
render_text_into_indices(indices, left_text, right_text, orientation)
|
||||
|
||||
print("Packing buffer...")
|
||||
flat = indices.reshape(-1)
|
||||
return ((flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)).tolist()
|
||||
|
||||
def display(self, image):
|
||||
self.send_command(0x10)
|
||||
self.send_data2(image)
|
||||
self.turn_on_display()
|
||||
|
||||
def sleep(self):
|
||||
self.send_command(0x07) # DEEP_SLEEP
|
||||
self.send_data(0xA5)
|
||||
epdconfig.delay_ms(2000)
|
||||
epdconfig.module_exit()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue