Retries and simplification
This commit is contained in:
parent
39a7d9546e
commit
cd70c4cdca
6 changed files with 391 additions and 345 deletions
15
CLAUDE.md
15
CLAUDE.md
|
|
@ -30,18 +30,20 @@ python3 display.py --saturation 1.5 --contrast 1.1 --gamma 0.85
|
||||||
4. Sends to e-ink display driver
|
4. Sends to e-ink display driver
|
||||||
|
|
||||||
**`src/lib/immich.py`** — Immich API client. Key behaviors:
|
**`src/lib/immich.py`** — Immich API client. Key behaviors:
|
||||||
- `PhotoHistory` tracks displayed photos in `photo_history.json` to avoid repeats (resets after 7 days)
|
- `PhotoHistory` tracks displayed photos in `photo_history.json` to avoid repeats (resets after 7 days). Asset is only marked displayed after a successful download.
|
||||||
- `_pick_weighted_random()` biases selection: 50% chance favorites, 50% chance recent (last 7 days), otherwise random
|
- `_pick_weighted_random()` biases selection: 20% favorites, 50% recently-added (last 30 days, by Immich `createdAt`), otherwise uniform random
|
||||||
- Filters photos by orientation (portrait/landscape) based on EXIF data including rotation tags
|
- Filters photos by orientation (portrait/landscape) based on EXIF data including rotation tags. Raises if nothing matches the requested orientation.
|
||||||
- Downloads preview-size thumbnails, not originals
|
- Downloads preview-size thumbnails, not originals
|
||||||
|
- Asset lists (people-search and album) are cached on disk in `/tmp/frame_cache/` for 1 hour
|
||||||
|
- `urlopen` calls retry transient failures twice (3s, 10s backoff)
|
||||||
|
|
||||||
**`src/lib/homeassistant.py`** — Simple Home Assistant REST client for presence detection.
|
**`src/lib/homeassistant.py`** — Simple Home Assistant REST client for presence detection.
|
||||||
|
|
||||||
**`src/lib/waveshare_epd/epd7in3e.py`** — Modified Waveshare driver. The `getbuffer()` method handles the full image pipeline:
|
**`src/lib/waveshare_epd/epd7in3e.py`** — Modified Waveshare driver. The `getbuffer()` method handles the full image pipeline:
|
||||||
- Center-crops to 800x480 (or 480x800)
|
- Center-crops to 800x480 (or 480x800)
|
||||||
- Enhances saturation/contrast/gamma for e-ink (defaults: saturation=1.4, contrast=1.2, gamma=0.9)
|
- Enhances saturation/contrast/gamma for e-ink (caller passes values; CLI defaults live in `display.py`: saturation=1.3, contrast=1.05, gamma=0.90)
|
||||||
- Atkinson dithering to 6-color palette using numba JIT
|
- Atkinson dithering to 6-color palette using numba JIT; produces palette indices directly (no Pillow quantize round-trip)
|
||||||
- Packs into 4-bit-per-pixel buffer (two pixels per byte)
|
- Packs into 4-bit-per-pixel buffer (two pixels per byte) via numpy
|
||||||
|
|
||||||
**`src/lib/waveshare_epd/epdconfig.py`** — GPIO/SPI hardware config. **Critical: PWR pin is BCM 27** (not default 18).
|
**`src/lib/waveshare_epd/epdconfig.py`** — GPIO/SPI hardware config. **Critical: PWR pin is BCM 27** (not default 18).
|
||||||
|
|
||||||
|
|
@ -55,4 +57,5 @@ python3 display.py --saturation 1.5 --contrast 1.1 --gamma 0.85
|
||||||
- **Dependencies on Pi**: `python3-pil python3-opencv python3-numba python3-smbus spidev gpiozero`
|
- **Dependencies on Pi**: `python3-pil python3-opencv python3-numba python3-smbus spidev gpiozero`
|
||||||
- **Config via environment variables**: `IMMICH_URL`, `IMMICH_API_KEY`, `HA_URL`, `HA_TOKEN` (with hardcoded defaults in display.py)
|
- **Config via environment variables**: `IMMICH_URL`, `IMMICH_API_KEY`, `HA_URL`, `HA_TOKEN` (with hardcoded defaults in display.py)
|
||||||
- **Uses only stdlib `urllib`** — no requests library; the Immich client uses `urllib.request` directly
|
- **Uses only stdlib `urllib`** — no requests library; the Immich client uses `urllib.request` directly
|
||||||
|
- **Single-instance lock** at `/tmp/frame.lock` (fcntl) — overlapping cron runs exit cleanly
|
||||||
- `sys.path.append` is used to add `lib/` to the path from display.py
|
- `sys.path.append` is used to add `lib/` to the path from display.py
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import argparse
|
import argparse
|
||||||
|
import fcntl
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
@ -12,6 +13,8 @@ from waveshare_epd import epd7in3e
|
||||||
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
|
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
|
||||||
from homeassistant import HomeAssistantClient
|
from homeassistant import HomeAssistantClient
|
||||||
|
|
||||||
|
LOCK_FILE = "/tmp/frame.lock"
|
||||||
|
|
||||||
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev")
|
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.schmelczer.dev")
|
||||||
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE")
|
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "6crxVS1JLTJxsfGlzVhN2kefdL4EP7HPkkoMk9L6ZOE")
|
||||||
|
|
||||||
|
|
@ -51,6 +54,13 @@ def main() -> None:
|
||||||
parser.add_argument("--no-enhance", action="store_true")
|
parser.add_argument("--no-enhance", action="store_true")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
lock_fd = open(LOCK_FILE, "w")
|
||||||
|
try:
|
||||||
|
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
except BlockingIOError:
|
||||||
|
print("Another instance running, skipping")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
print(f"Time: {now.strftime('%H:%M')}")
|
print(f"Time: {now.strftime('%H:%M')}")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,25 +1,37 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
import json
|
||||||
from urllib.request import Request, urlopen
|
import time
|
||||||
|
from urllib.error import URLError
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
class HomeAssistantClient:
|
|
||||||
def __init__(self, base_url: str, token: str):
|
RETRY_DELAYS = (3, 10)
|
||||||
self.base_url = base_url.rstrip("/")
|
|
||||||
self.token = token
|
|
||||||
|
class HomeAssistantClient:
|
||||||
def get_state(self, entity_id: str) -> dict:
|
def __init__(self, base_url: str, token: str):
|
||||||
url = f"{self.base_url}/api/states/{entity_id}"
|
self.base_url = base_url.rstrip("/")
|
||||||
req = Request(url, headers={
|
self.token = token
|
||||||
"Authorization": f"Bearer {self.token}",
|
|
||||||
"Content-Type": "application/json",
|
def get_state(self, entity_id: str) -> dict:
|
||||||
})
|
url = f"{self.base_url}/api/states/{entity_id}"
|
||||||
with urlopen(req, timeout=30) as resp:
|
req = Request(url, headers={
|
||||||
return json.loads(resp.read().decode())
|
"Authorization": f"Bearer {self.token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
def is_person_home(self, entity_id: str) -> bool:
|
})
|
||||||
try:
|
last_err: Exception | None = None
|
||||||
return self.get_state(entity_id).get("state") == "home"
|
for attempt in range(len(RETRY_DELAYS) + 1):
|
||||||
except Exception as e:
|
try:
|
||||||
print(f"Failed to check {entity_id}: {e}")
|
with urlopen(req, timeout=30) as resp:
|
||||||
return False
|
return json.loads(resp.read().decode())
|
||||||
|
except (URLError, TimeoutError) as e:
|
||||||
|
last_err = e
|
||||||
|
if attempt < len(RETRY_DELAYS):
|
||||||
|
time.sleep(RETRY_DELAYS[attempt])
|
||||||
|
raise last_err
|
||||||
|
|
||||||
|
def is_person_home(self, entity_id: str) -> bool:
|
||||||
|
try:
|
||||||
|
return self.get_state(entity_id).get("state") == "home"
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to check {entity_id}: {e}")
|
||||||
|
return False
|
||||||
|
|
|
||||||
|
|
@ -1,260 +1,307 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
import hashlib
|
||||||
import random
|
import json
|
||||||
import tempfile
|
import random
|
||||||
from dataclasses import dataclass
|
import tempfile
|
||||||
from datetime import datetime, timedelta, timezone
|
import time
|
||||||
from pathlib import Path
|
from dataclasses import dataclass
|
||||||
from urllib.request import Request, urlopen
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from pathlib import Path
|
||||||
from progress import ProgressBar
|
from urllib.error import URLError
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json"
|
|
||||||
HISTORY_MAX_AGE_DAYS = 7
|
from progress import ProgressBar
|
||||||
|
|
||||||
|
HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json"
|
||||||
class PhotoHistory:
|
HISTORY_MAX_AGE_DAYS = 7
|
||||||
"""Track displayed photos to avoid repeats. Clears after 7 days."""
|
|
||||||
|
CACHE_DIR = Path(tempfile.gettempdir()) / "frame_cache"
|
||||||
def __init__(self, path: Path = HISTORY_FILE):
|
CACHE_TTL_SECONDS = 3600
|
||||||
self.path = path
|
|
||||||
self.displayed: set[str] = set()
|
RETRY_DELAYS = (3, 10)
|
||||||
self.created_at: datetime | None = None
|
|
||||||
self._load()
|
|
||||||
|
def _urlopen_with_retry(req: Request, timeout: int = 30):
|
||||||
def _load(self) -> None:
|
"""urlopen wrapper that retries transient network failures."""
|
||||||
if not self.path.exists():
|
last_err: Exception | None = None
|
||||||
self._reset()
|
for attempt in range(len(RETRY_DELAYS) + 1):
|
||||||
return
|
try:
|
||||||
try:
|
return urlopen(req, timeout=timeout)
|
||||||
data = json.loads(self.path.read_text())
|
except (URLError, TimeoutError) as e:
|
||||||
self.created_at = datetime.fromisoformat(data.get("created_at", ""))
|
last_err = e
|
||||||
if self.created_at.tzinfo is None:
|
if attempt < len(RETRY_DELAYS):
|
||||||
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
|
time.sleep(RETRY_DELAYS[attempt])
|
||||||
if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS):
|
raise last_err
|
||||||
print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...")
|
|
||||||
self._reset()
|
|
||||||
else:
|
def _cache_get(key: str) -> list[dict] | None:
|
||||||
self.displayed = set(data.get("displayed", []))
|
path = CACHE_DIR / f"{key}.json"
|
||||||
except (json.JSONDecodeError, ValueError, KeyError):
|
if not path.exists():
|
||||||
self._reset()
|
return None
|
||||||
|
if time.time() - path.stat().st_mtime > CACHE_TTL_SECONDS:
|
||||||
def _reset(self) -> None:
|
return None
|
||||||
self.displayed = set()
|
try:
|
||||||
self.created_at = datetime.now(timezone.utc)
|
return json.loads(path.read_text())
|
||||||
self._save()
|
except (json.JSONDecodeError, OSError):
|
||||||
|
return None
|
||||||
def _save(self) -> None:
|
|
||||||
self.path.write_text(json.dumps({
|
|
||||||
"created_at": self.created_at.isoformat(),
|
def _cache_set(key: str, value: list[dict]) -> None:
|
||||||
"displayed": list(self.displayed),
|
CACHE_DIR.mkdir(exist_ok=True)
|
||||||
}, indent=2))
|
(CACHE_DIR / f"{key}.json").write_text(json.dumps(value))
|
||||||
|
|
||||||
def mark_displayed(self, asset_id: str) -> None:
|
|
||||||
self.displayed.add(asset_id)
|
class PhotoHistory:
|
||||||
self._save()
|
"""Track displayed photos to avoid repeats. Clears after 7 days."""
|
||||||
|
|
||||||
def filter_new(self, assets: list[dict]) -> list[dict]:
|
def __init__(self, path: Path = HISTORY_FILE):
|
||||||
return [a for a in assets if a.get("id") not in self.displayed]
|
self.path = path
|
||||||
|
self.displayed: set[str] = set()
|
||||||
|
self.created_at: datetime | None = None
|
||||||
_history: PhotoHistory | None = None
|
self._load()
|
||||||
_people_cache: dict[str, str] = {} # name -> id cache
|
|
||||||
|
def _load(self) -> None:
|
||||||
|
if not self.path.exists():
|
||||||
def get_history() -> PhotoHistory:
|
self._reset()
|
||||||
global _history
|
return
|
||||||
if _history is None:
|
try:
|
||||||
_history = PhotoHistory()
|
data = json.loads(self.path.read_text())
|
||||||
return _history
|
self.created_at = datetime.fromisoformat(data.get("created_at", ""))
|
||||||
|
if self.created_at.tzinfo is None:
|
||||||
|
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
|
||||||
@dataclass
|
if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS):
|
||||||
class ImmichClient:
|
print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...")
|
||||||
base_url: str
|
self._reset()
|
||||||
api_key: str
|
else:
|
||||||
|
self.displayed = set(data.get("displayed", []))
|
||||||
def _request(self, method: str, endpoint: str, data: dict | None = None,
|
except (json.JSONDecodeError, ValueError, KeyError):
|
||||||
show_progress: bool = False, progress_desc: str = "Fetching") -> dict:
|
self._reset()
|
||||||
url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
|
|
||||||
headers = {"x-api-key": self.api_key}
|
def _reset(self) -> None:
|
||||||
body = None
|
self.displayed = set()
|
||||||
if data is not None:
|
self.created_at = datetime.now(timezone.utc)
|
||||||
headers["Content-Type"] = "application/json"
|
self._save()
|
||||||
body = json.dumps(data).encode()
|
|
||||||
|
def _save(self) -> None:
|
||||||
req = Request(url, data=body, headers=headers, method=method)
|
self.path.write_text(json.dumps({
|
||||||
with urlopen(req, timeout=30) as resp:
|
"created_at": self.created_at.isoformat(),
|
||||||
total_size = resp.headers.get('Content-Length')
|
"displayed": list(self.displayed),
|
||||||
if total_size and show_progress:
|
}, indent=2))
|
||||||
total_size = int(total_size)
|
|
||||||
progress = ProgressBar(total_size, desc=progress_desc)
|
def mark_displayed(self, asset_id: str) -> None:
|
||||||
chunks = bytearray()
|
self.displayed.add(asset_id)
|
||||||
while chunk := resp.read(8192):
|
self._save()
|
||||||
chunks.extend(chunk)
|
|
||||||
progress.update(len(chunk))
|
def filter_new(self, assets: list[dict]) -> list[dict]:
|
||||||
progress.finish()
|
return [a for a in assets if a.get("id") not in self.displayed]
|
||||||
return json.loads(chunks.decode())
|
|
||||||
return json.loads(resp.read().decode())
|
|
||||||
|
_history: PhotoHistory | None = None
|
||||||
def get_people(self) -> list[dict]:
|
|
||||||
return self._request("GET", "/people")["people"]
|
|
||||||
|
def get_history() -> PhotoHistory:
|
||||||
def get_person_id(self, name: str) -> str | None:
|
global _history
|
||||||
for person in self.get_people():
|
if _history is None:
|
||||||
if person["name"].lower() == name.lower():
|
_history = PhotoHistory()
|
||||||
return person["id"]
|
return _history
|
||||||
return None
|
|
||||||
|
|
||||||
def search_assets_by_people(self, person_ids: list[str]) -> list[dict]:
|
@dataclass
|
||||||
items = []
|
class ImmichClient:
|
||||||
page = 1
|
base_url: str
|
||||||
while True:
|
api_key: str
|
||||||
result = self._request("POST", "/search/metadata", {
|
|
||||||
"personIds": person_ids,
|
def _request(self, method: str, endpoint: str, data: dict | None = None,
|
||||||
"size": 250,
|
show_progress: bool = False, progress_desc: str = "Fetching") -> dict:
|
||||||
"page": page,
|
url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
|
||||||
"type": "IMAGE",
|
headers = {"x-api-key": self.api_key}
|
||||||
"withExif": True,
|
body = None
|
||||||
})
|
if data is not None:
|
||||||
batch = result.get("assets", {}).get("items", [])
|
headers["Content-Type"] = "application/json"
|
||||||
items.extend(batch)
|
body = json.dumps(data).encode()
|
||||||
if not batch or not result.get("assets", {}).get("nextPage"):
|
|
||||||
break
|
req = Request(url, data=body, headers=headers, method=method)
|
||||||
page += 1
|
with _urlopen_with_retry(req, timeout=30) as resp:
|
||||||
return items
|
total_size = resp.headers.get('Content-Length')
|
||||||
|
if total_size and show_progress:
|
||||||
def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path:
|
total_size = int(total_size)
|
||||||
url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview"
|
progress = ProgressBar(total_size, desc=progress_desc)
|
||||||
req = Request(url, headers={"x-api-key": self.api_key})
|
chunks = bytearray()
|
||||||
with urlopen(req, timeout=30) as resp:
|
while chunk := resp.read(8192):
|
||||||
total_size = resp.headers.get('Content-Length')
|
chunks.extend(chunk)
|
||||||
if total_size and show_progress:
|
progress.update(len(chunk))
|
||||||
total_size = int(total_size)
|
progress.finish()
|
||||||
progress = ProgressBar(total_size, desc="Downloading")
|
return json.loads(chunks.decode())
|
||||||
data = bytearray()
|
return json.loads(resp.read().decode())
|
||||||
while chunk := resp.read(8192):
|
|
||||||
data.extend(chunk)
|
def get_people(self) -> list[dict]:
|
||||||
progress.update(len(chunk))
|
return self._request("GET", "/people")["people"]
|
||||||
progress.finish()
|
|
||||||
dest.write_bytes(bytes(data))
|
def get_person_id(self, name: str) -> str | None:
|
||||||
else:
|
for person in self.get_people():
|
||||||
dest.write_bytes(resp.read())
|
if person["name"].lower() == name.lower():
|
||||||
return dest
|
return person["id"]
|
||||||
|
return None
|
||||||
def get_album_id(self, name: str) -> str | None:
|
|
||||||
for album in self._request("GET", "/albums"):
|
def search_assets_by_people(self, person_ids: list[str]) -> list[dict]:
|
||||||
if album["albumName"].lower() == name.lower():
|
key = "people_" + hashlib.md5("_".join(sorted(person_ids)).encode()).hexdigest()
|
||||||
return album["id"]
|
cached = _cache_get(key)
|
||||||
return None
|
if cached is not None:
|
||||||
|
return cached
|
||||||
def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]:
|
|
||||||
album = self._request("GET", f"/albums/{album_id}",
|
items = []
|
||||||
show_progress=show_progress, progress_desc="Fetching album")
|
page = 1
|
||||||
return album.get("assets", [])
|
while True:
|
||||||
|
result = self._request("POST", "/search/metadata", {
|
||||||
|
"personIds": person_ids,
|
||||||
def _is_portrait(asset: dict) -> bool | None:
|
"size": 250,
|
||||||
"""Check if asset displays as portrait, accounting for EXIF orientation."""
|
"page": page,
|
||||||
exif = asset.get("exifInfo") or {}
|
"type": "IMAGE",
|
||||||
width = exif.get("exifImageWidth") or 0
|
"withExif": True,
|
||||||
height = exif.get("exifImageHeight") or 0
|
})
|
||||||
if not (width and height):
|
batch = result.get("assets", {}).get("items", [])
|
||||||
return None
|
items.extend(batch)
|
||||||
# EXIF orientation 6 and 8 mean 90° rotation (swap dimensions)
|
if not batch or not result.get("assets", {}).get("nextPage"):
|
||||||
orientation = str(exif.get("orientation") or "1")
|
break
|
||||||
if orientation in ("6", "8"):
|
page += 1
|
||||||
width, height = height, width
|
_cache_set(key, items)
|
||||||
return height > width
|
return items
|
||||||
|
|
||||||
|
def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path:
|
||||||
def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
|
url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview"
|
||||||
"""Filter assets by orientation, accounting for EXIF rotation."""
|
req = Request(url, headers={"x-api-key": self.api_key})
|
||||||
filtered = []
|
with _urlopen_with_retry(req, timeout=30) as resp:
|
||||||
no_dimensions = 0
|
total_size = resp.headers.get('Content-Length')
|
||||||
for asset in assets:
|
if total_size and show_progress:
|
||||||
is_portrait = _is_portrait(asset)
|
total_size = int(total_size)
|
||||||
if is_portrait is not None:
|
progress = ProgressBar(total_size, desc="Downloading")
|
||||||
if is_portrait == portrait:
|
data = bytearray()
|
||||||
filtered.append(asset)
|
while chunk := resp.read(8192):
|
||||||
else:
|
data.extend(chunk)
|
||||||
no_dimensions += 1
|
progress.update(len(chunk))
|
||||||
if no_dimensions:
|
progress.finish()
|
||||||
print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data")
|
dest.write_bytes(bytes(data))
|
||||||
return filtered
|
else:
|
||||||
|
dest.write_bytes(resp.read())
|
||||||
|
return dest
|
||||||
def _pick_weighted_random(assets: list[dict]) -> dict:
|
|
||||||
"""Pick random asset, biased towards favorites (20%) and recently added photos (50%)."""
|
def get_album_id(self, name: str) -> str | None:
|
||||||
if not assets:
|
for album in self._request("GET", "/albums"):
|
||||||
raise ValueError("No assets to choose from")
|
if album["albumName"].lower() == name.lower():
|
||||||
|
return album["id"]
|
||||||
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
|
return None
|
||||||
favorites = [a for a in assets if a.get("isFavorite")]
|
|
||||||
recent = []
|
def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]:
|
||||||
for asset in assets:
|
key = f"album_{album_id}"
|
||||||
date_str = asset.get("createdAt", "")
|
cached = _cache_get(key)
|
||||||
try:
|
if cached is not None:
|
||||||
if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= cutoff:
|
return cached
|
||||||
recent.append(asset)
|
|
||||||
except (ValueError, AttributeError):
|
album = self._request("GET", f"/albums/{album_id}",
|
||||||
pass
|
show_progress=show_progress, progress_desc="Fetching album")
|
||||||
|
assets = album.get("assets", [])
|
||||||
if favorites and random.random() < 0.2:
|
_cache_set(key, assets)
|
||||||
return random.choice(favorites)
|
return assets
|
||||||
if recent and random.random() < 0.5:
|
|
||||||
return random.choice(recent)
|
|
||||||
return random.choice(assets)
|
def _is_portrait(asset: dict) -> bool | None:
|
||||||
|
"""Check if asset displays as portrait, accounting for EXIF orientation."""
|
||||||
|
exif = asset.get("exifInfo") or {}
|
||||||
def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path:
|
width = exif.get("exifImageWidth") or 0
|
||||||
history = get_history()
|
height = exif.get("exifImageHeight") or 0
|
||||||
new_assets = history.filter_new(assets)
|
if not (width and height):
|
||||||
|
return None
|
||||||
if new_assets:
|
# EXIF orientation 6 and 8 mean 90° rotation (swap dimensions)
|
||||||
print(f"Photos: {len(new_assets)} new / {len(assets)} total")
|
orientation = str(exif.get("orientation") or "1")
|
||||||
asset = _pick_weighted_random(new_assets)
|
if orientation in ("6", "8"):
|
||||||
else:
|
width, height = height, width
|
||||||
print(f"All {len(assets)} photos shown, picking from full list")
|
return height > width
|
||||||
asset = _pick_weighted_random(assets)
|
|
||||||
|
|
||||||
history.mark_displayed(asset["id"])
|
def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
|
||||||
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
|
"""Filter assets by orientation, accounting for EXIF rotation."""
|
||||||
return client.download_asset(asset["id"], dest)
|
filtered = []
|
||||||
|
no_dimensions = 0
|
||||||
|
for asset in assets:
|
||||||
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path:
|
is_portrait = _is_portrait(asset)
|
||||||
person_ids = [pid for name in names if (pid := client.get_person_id(name))]
|
if is_portrait is not None:
|
||||||
if not person_ids:
|
if is_portrait == portrait:
|
||||||
raise ValueError(f"No people found: {names}")
|
filtered.append(asset)
|
||||||
|
else:
|
||||||
assets = client.search_assets_by_people(person_ids)
|
no_dimensions += 1
|
||||||
|
if no_dimensions:
|
||||||
if not assets:
|
print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data")
|
||||||
raise ValueError(f"No photos found for: {names}")
|
return filtered
|
||||||
|
|
||||||
portrait = orientation in (90, 270)
|
|
||||||
filtered = _filter_by_orientation(assets, portrait)
|
def _pick_weighted_random(assets: list[dict]) -> dict:
|
||||||
if filtered:
|
"""Pick random asset, biased towards favorites (20%) and recently added photos (50%)."""
|
||||||
assets = filtered
|
if not assets:
|
||||||
else:
|
raise ValueError("No assets to choose from")
|
||||||
print(f"No {'portrait' if portrait else 'landscape'} photos, using any orientation")
|
|
||||||
return _download_random_asset(client, assets)
|
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
|
||||||
|
favorites = [a for a in assets if a.get("isFavorite")]
|
||||||
|
recent = []
|
||||||
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path:
|
for asset in assets:
|
||||||
album_id = client.get_album_id(album_name)
|
date_str = asset.get("createdAt", "")
|
||||||
if not album_id:
|
try:
|
||||||
raise ValueError(f"Album not found: {album_name}")
|
if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= cutoff:
|
||||||
|
recent.append(asset)
|
||||||
assets = [a for a in client.get_album_assets(album_id) if a.get("type") == "IMAGE"]
|
except (ValueError, AttributeError):
|
||||||
if not assets:
|
pass
|
||||||
raise ValueError(f"No photos in album: {album_name}")
|
|
||||||
|
if favorites and random.random() < 0.2:
|
||||||
portrait = orientation in (90, 270)
|
return random.choice(favorites)
|
||||||
filtered = _filter_by_orientation(assets, portrait)
|
if recent and random.random() < 0.5:
|
||||||
if filtered:
|
return random.choice(recent)
|
||||||
assets = filtered
|
return random.choice(assets)
|
||||||
else:
|
|
||||||
print(f"No {'portrait' if portrait else 'landscape'} photos, using any orientation")
|
|
||||||
return _download_random_asset(client, assets)
|
def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path:
|
||||||
|
history = get_history()
|
||||||
|
new_assets = history.filter_new(assets)
|
||||||
|
|
||||||
|
if new_assets:
|
||||||
|
print(f"Photos: {len(new_assets)} new / {len(assets)} total")
|
||||||
|
asset = _pick_weighted_random(new_assets)
|
||||||
|
else:
|
||||||
|
print(f"All {len(assets)} photos shown, picking from full list")
|
||||||
|
asset = _pick_weighted_random(assets)
|
||||||
|
|
||||||
|
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
|
||||||
|
path = client.download_asset(asset["id"], dest)
|
||||||
|
history.mark_displayed(asset["id"])
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path:
|
||||||
|
person_ids = [pid for name in names if (pid := client.get_person_id(name))]
|
||||||
|
if not person_ids:
|
||||||
|
raise ValueError(f"No people found: {names}")
|
||||||
|
|
||||||
|
assets = client.search_assets_by_people(person_ids)
|
||||||
|
|
||||||
|
if not assets:
|
||||||
|
raise ValueError(f"No photos found for: {names}")
|
||||||
|
|
||||||
|
portrait = orientation in (90, 270)
|
||||||
|
filtered = _filter_by_orientation(assets, portrait)
|
||||||
|
if not filtered:
|
||||||
|
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos available")
|
||||||
|
return _download_random_asset(client, filtered)
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path:
|
||||||
|
album_id = client.get_album_id(album_name)
|
||||||
|
if not album_id:
|
||||||
|
raise ValueError(f"Album not found: {album_name}")
|
||||||
|
|
||||||
|
assets = [a for a in client.get_album_assets(album_id) if a.get("type") == "IMAGE"]
|
||||||
|
if not assets:
|
||||||
|
raise ValueError(f"No photos in album: {album_name}")
|
||||||
|
|
||||||
|
portrait = orientation in (90, 270)
|
||||||
|
filtered = _filter_by_orientation(assets, portrait)
|
||||||
|
if not filtered:
|
||||||
|
raise ValueError(f"No {'portrait' if portrait else 'landscape'} photos in album: {album_name}")
|
||||||
|
return _download_random_asset(client, filtered)
|
||||||
|
|
|
||||||
|
|
@ -47,8 +47,3 @@ class ProgressBar:
|
||||||
"""Complete the progress bar."""
|
"""Complete the progress bar."""
|
||||||
self.current = self.total
|
self.current = self.total
|
||||||
self._render()
|
self._render()
|
||||||
|
|
||||||
|
|
||||||
def print_status(msg: str) -> None:
|
|
||||||
"""Print a status message."""
|
|
||||||
print(f" {msg}")
|
|
||||||
|
|
|
||||||
|
|
@ -2,38 +2,33 @@
|
||||||
# Waveshare 7.3" 6-color e-Paper driver (modified)
|
# Waveshare 7.3" 6-color e-Paper driver (modified)
|
||||||
# Original: Waveshare team, 2022-10-20
|
# Original: Waveshare team, 2022-10-20
|
||||||
|
|
||||||
import sys
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import cv2
|
import cv2
|
||||||
from PIL import Image, ImageEnhance
|
from PIL import Image, ImageEnhance
|
||||||
from numba import jit
|
from numba import jit
|
||||||
|
from progress import ProgressBar
|
||||||
from . import epdconfig
|
from . import epdconfig
|
||||||
|
|
||||||
EPD_WIDTH = 800
|
EPD_WIDTH = 800
|
||||||
EPD_HEIGHT = 480
|
EPD_HEIGHT = 480
|
||||||
|
|
||||||
DEFAULT_SATURATION = 1.4
|
# 6-color e-ink encoding: indices 0,1,2,3,5,6 are wire-format colors;
|
||||||
DEFAULT_CONTRAST = 1.2
|
# 4 is reserved/unused (filled with BLACK so nearest-color never picks it).
|
||||||
DEFAULT_GAMMA = 0.9
|
|
||||||
|
|
||||||
PALETTE_RGB = np.array([
|
PALETTE_RGB = np.array([
|
||||||
[0, 0, 0], # BLACK
|
[0, 0, 0], # 0: BLACK
|
||||||
[255, 255, 255], # WHITE
|
[255, 255, 255], # 1: WHITE
|
||||||
[255, 255, 0], # YELLOW
|
[255, 255, 0], # 2: YELLOW
|
||||||
[255, 0, 0], # RED
|
[255, 0, 0], # 3: RED
|
||||||
[0, 0, 255], # BLUE
|
[0, 0, 0], # 4: unused
|
||||||
[0, 255, 0], # GREEN
|
[0, 0, 255], # 5: BLUE
|
||||||
|
[0, 255, 0], # 6: GREEN
|
||||||
], dtype=np.float64)
|
], dtype=np.float64)
|
||||||
|
|
||||||
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
|
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
|
||||||
|
|
||||||
|
|
||||||
def _enhance_for_eink(image: Image.Image, saturation: float = None,
|
def _enhance_for_eink(image: Image.Image, saturation: float,
|
||||||
contrast: float = None, gamma: float = None) -> Image.Image:
|
contrast: float, gamma: float) -> Image.Image:
|
||||||
saturation = saturation or DEFAULT_SATURATION
|
|
||||||
contrast = contrast or DEFAULT_CONTRAST
|
|
||||||
gamma = gamma or DEFAULT_GAMMA
|
|
||||||
|
|
||||||
img = image.convert('RGB')
|
img = image.convert('RGB')
|
||||||
if saturation != 1.0:
|
if saturation != 1.0:
|
||||||
img = ImageEnhance.Color(img).enhance(saturation)
|
img = ImageEnhance.Color(img).enhance(saturation)
|
||||||
|
|
@ -66,18 +61,6 @@ def _crop_center(image: Image.Image, target_w: int, target_h: int,
|
||||||
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
|
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
|
||||||
|
|
||||||
|
|
||||||
def _render_progress(desc: str, current: int, total: int, width: int = 30) -> None:
|
|
||||||
if total == 0:
|
|
||||||
return
|
|
||||||
percent = int(100 * current / total)
|
|
||||||
filled = int(width * current / total)
|
|
||||||
bar = "█" * filled + "░" * (width - filled)
|
|
||||||
sys.stdout.write(f"\r{desc}: |{bar}| {percent:3d}%")
|
|
||||||
sys.stdout.flush()
|
|
||||||
if current >= total:
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
@jit(nopython=True, cache=True)
|
@jit(nopython=True, cache=True)
|
||||||
def _find_nearest_color(r, g, b, palette, weights):
|
def _find_nearest_color(r, g, b, palette, weights):
|
||||||
best_idx, best_dist = 0, 1e10
|
best_idx, best_dist = 0, 1e10
|
||||||
|
|
@ -92,14 +75,14 @@ def _find_nearest_color(r, g, b, palette, weights):
|
||||||
|
|
||||||
|
|
||||||
@jit(nopython=True, cache=True)
|
@jit(nopython=True, cache=True)
|
||||||
def _atkinson_dither_rows(img, palette, weights, start_row, end_row):
|
def _atkinson_dither_rows(img, palette, weights, indices, start_row, end_row):
|
||||||
height, width = img.shape[:2]
|
height, width = img.shape[:2]
|
||||||
for y in range(start_row, end_row):
|
for y in range(start_row, end_row):
|
||||||
for x in range(width):
|
for x in range(width):
|
||||||
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
|
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
|
||||||
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
|
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
|
||||||
|
indices[y, x] = idx
|
||||||
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
|
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
|
||||||
img[y, x, 0], img[y, x, 1], img[y, x, 2] = new_r, new_g, new_b
|
|
||||||
|
|
||||||
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
|
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
|
||||||
|
|
||||||
|
|
@ -127,23 +110,25 @@ def _atkinson_dither_rows(img, palette, weights, start_row, end_row):
|
||||||
img[y + 2, x, 0] += err_r
|
img[y + 2, x, 0] += err_r
|
||||||
img[y + 2, x, 1] += err_g
|
img[y + 2, x, 1] += err_g
|
||||||
img[y + 2, x, 2] += err_b
|
img[y + 2, x, 2] += err_b
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> Image.Image:
|
def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> np.ndarray:
|
||||||
|
"""Atkinson-dither to the e-ink palette and return a uint8 array of palette indices."""
|
||||||
img = np.array(image.convert('RGB'), dtype=np.float64)
|
img = np.array(image.convert('RGB'), dtype=np.float64)
|
||||||
height = img.shape[0]
|
height, width = img.shape[:2]
|
||||||
|
indices = np.zeros((height, width), dtype=np.uint8)
|
||||||
if show_progress:
|
if show_progress:
|
||||||
print("Dithering...")
|
print("Dithering...")
|
||||||
|
progress = ProgressBar(height, desc="Dithering")
|
||||||
|
|
||||||
chunk_size = 48
|
chunk_size = 48
|
||||||
for i in range((height + chunk_size - 1) // chunk_size):
|
for i in range((height + chunk_size - 1) // chunk_size):
|
||||||
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
|
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
|
||||||
img = _atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, start, end)
|
_atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, indices, start, end)
|
||||||
if show_progress:
|
if show_progress:
|
||||||
_render_progress("Dithering", end, height)
|
progress.set(end)
|
||||||
|
|
||||||
return Image.fromarray(np.clip(img, 0, 255).astype(np.uint8), 'RGB')
|
return indices
|
||||||
|
|
||||||
|
|
||||||
class EPD:
|
class EPD:
|
||||||
|
|
@ -253,11 +238,8 @@ class EPD:
|
||||||
self.wait_busy()
|
self.wait_busy()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def getbuffer(self, image, saturation=None, contrast=None, gamma=None,
|
def getbuffer(self, image, saturation: float, contrast: float, gamma: float,
|
||||||
enhance=True, show_progress=True):
|
enhance: bool = True, show_progress: bool = True):
|
||||||
pal_image = Image.new("P", (1, 1))
|
|
||||||
pal_image.putpalette((0,0,0, 255,255,255, 255,255,0, 255,0,0, 0,0,0, 0,0,255, 0,255,0) + (0,0,0)*249)
|
|
||||||
|
|
||||||
image = image.convert('RGB')
|
image = image.convert('RGB')
|
||||||
imwidth, imheight = image.size
|
imwidth, imheight = image.size
|
||||||
|
|
||||||
|
|
@ -271,16 +253,13 @@ class EPD:
|
||||||
print("Enhancing...")
|
print("Enhancing...")
|
||||||
image = _enhance_for_eink(image, saturation, contrast, gamma)
|
image = _enhance_for_eink(image, saturation, contrast, gamma)
|
||||||
|
|
||||||
image = _dither_atkinson(image, show_progress)
|
indices = _dither_atkinson(image, show_progress)
|
||||||
|
|
||||||
if show_progress:
|
if show_progress:
|
||||||
print("Packing buffer...")
|
print("Packing buffer...")
|
||||||
image_6color = image.quantize(palette=pal_image, dither=Image.Dither.NONE)
|
flat = indices.reshape(-1)
|
||||||
buf_6color = bytearray(image_6color.tobytes('raw'))
|
packed = (flat[0::2].astype(np.uint8) << 4) | flat[1::2].astype(np.uint8)
|
||||||
|
buf = packed.tolist()
|
||||||
buf = [0x00] * (self.width * self.height // 2)
|
|
||||||
for i in range(0, len(buf_6color), 2):
|
|
||||||
buf[i // 2] = (buf_6color[i] << 4) + buf_6color[i + 1]
|
|
||||||
|
|
||||||
if show_progress:
|
if show_progress:
|
||||||
print("Ready")
|
print("Ready")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue