This commit is contained in:
Andras Schmelczer 2026-03-30 08:09:47 +01:00
commit 36d975545b
38 changed files with 2837 additions and 0 deletions

85
src/display.py Normal file
View file

@ -0,0 +1,85 @@
#!/usr/bin/env python3
import argparse
import os
import sys
from datetime import datetime
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
sys.path.append(str(Path(__file__).parent / "lib"))
from waveshare_epd import epd7in3e
from immich import ImmichClient, get_random_photo_of_people, get_random_photo_from_album
from homeassistant import HomeAssistantClient
IMMICH_URL = os.environ.get("IMMICH_URL", "https://immich.example.com")
IMMICH_API_KEY = os.environ.get("IMMICH_API_KEY", "REDACTED_IMMICH_API_KEY")
HA_URL = os.environ.get("HA_URL", "https://homeassistant.example.com")
HA_TOKEN = os.environ.get("HA_TOKEN", "REDACTED_HA_TOKEN")
HA_PRESENCE_ENTITIES = ["person.andras", "person.ruby"]
DEFAULT_SATURATION = 1.3
DEFAULT_CONTRAST = 1.05
DEFAULT_GAMMA = 0.90
def display_image(image_path: Path, orientation: int, saturation: float,
contrast: float, gamma: float, enhance: bool) -> None:
epd = epd7in3e.EPD()
try:
epd.init()
img = Image.open(image_path).convert("RGB")
if orientation:
img = img.rotate(orientation, expand=True)
buf = epd.getbuffer(img, saturation=saturation, contrast=contrast,
gamma=gamma, enhance=enhance)
epd.display(buf)
finally:
epd.sleep()
def main() -> None:
parser = argparse.ArgumentParser(description="Display image on e-ink frame")
parser.add_argument("--people", default="Me,Ruby",
help="Comma-separated names for Immich search")
parser.add_argument("--album", help="Fetch from album (overrides --people)")
parser.add_argument("-o", "--orientation", type=int, choices=[0, 90, 180, 270],
default=0, help="Rotation in degrees")
parser.add_argument("--saturation", type=float, default=DEFAULT_SATURATION)
parser.add_argument("--contrast", type=float, default=DEFAULT_CONTRAST)
parser.add_argument("--gamma", type=float, default=DEFAULT_GAMMA)
parser.add_argument("--no-enhance", action="store_true")
args = parser.parse_args()
now = datetime.now()
print(f"Time: {now.strftime('%H:%M')}")
if 0 <= now.hour < 7:
print("Night time, skipping")
sys.exit(0)
ha = HomeAssistantClient(HA_URL, HA_TOKEN)
home = [e.split(".")[-1].title() for e in HA_PRESENCE_ENTITIES if ha.is_person_home(e)]
if not home:
print("No one home, skipping")
sys.exit(0)
print(f"Home: {', '.join(home)}")
client = ImmichClient(IMMICH_URL, IMMICH_API_KEY)
if args.album:
image_path = get_random_photo_from_album(client, args.album, args.orientation)
print(f"Album: {args.album}")
else:
names = [n.strip() for n in args.people.split(",")]
image_path = get_random_photo_of_people(client, names, args.orientation)
print(f"People: {', '.join(names)}")
try:
display_image(image_path, args.orientation, args.saturation,
args.contrast, args.gamma, not args.no_enhance)
finally:
image_path.unlink(missing_ok=True)
if __name__ == "__main__":
main()

25
src/lib/homeassistant.py Normal file
View file

@ -0,0 +1,25 @@
#!/usr/bin/env python3
import json
from urllib.request import Request, urlopen
class HomeAssistantClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.token = token
def get_state(self, entity_id: str) -> dict:
url = f"{self.base_url}/api/states/{entity_id}"
req = Request(url, headers={
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
})
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def is_person_home(self, entity_id: str) -> bool:
try:
return self.get_state(entity_id).get("state") == "home"
except Exception as e:
print(f"Failed to check {entity_id}: {e}")
return False

260
src/lib/immich.py Normal file
View file

@ -0,0 +1,260 @@
#!/usr/bin/env python3
import json
import random
import tempfile
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from urllib.request import Request, urlopen
from progress import ProgressBar
HISTORY_FILE = Path(__file__).parent.parent / "photo_history.json"
HISTORY_MAX_AGE_DAYS = 7
class PhotoHistory:
"""Track displayed photos to avoid repeats. Clears after 7 days."""
def __init__(self, path: Path = HISTORY_FILE):
self.path = path
self.displayed: set[str] = set()
self.created_at: datetime | None = None
self._load()
def _load(self) -> None:
if not self.path.exists():
self._reset()
return
try:
data = json.loads(self.path.read_text())
self.created_at = datetime.fromisoformat(data.get("created_at", ""))
if self.created_at.tzinfo is None:
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - self.created_at > timedelta(days=HISTORY_MAX_AGE_DAYS):
print(f"Photo history expired (>{HISTORY_MAX_AGE_DAYS} days), clearing...")
self._reset()
else:
self.displayed = set(data.get("displayed", []))
except (json.JSONDecodeError, ValueError, KeyError):
self._reset()
def _reset(self) -> None:
self.displayed = set()
self.created_at = datetime.now(timezone.utc)
self._save()
def _save(self) -> None:
self.path.write_text(json.dumps({
"created_at": self.created_at.isoformat(),
"displayed": list(self.displayed),
}, indent=2))
def mark_displayed(self, asset_id: str) -> None:
self.displayed.add(asset_id)
self._save()
def filter_new(self, assets: list[dict]) -> list[dict]:
return [a for a in assets if a.get("id") not in self.displayed]
_history: PhotoHistory | None = None
_people_cache: dict[str, str] = {} # name -> id cache
def get_history() -> PhotoHistory:
global _history
if _history is None:
_history = PhotoHistory()
return _history
@dataclass
class ImmichClient:
base_url: str
api_key: str
def _request(self, method: str, endpoint: str, data: dict | None = None,
show_progress: bool = False, progress_desc: str = "Fetching") -> dict:
url = f"{self.base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
headers = {"x-api-key": self.api_key}
body = None
if data is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(data).encode()
req = Request(url, data=body, headers=headers, method=method)
with urlopen(req, timeout=30) as resp:
total_size = resp.headers.get('Content-Length')
if total_size and show_progress:
total_size = int(total_size)
progress = ProgressBar(total_size, desc=progress_desc)
chunks = bytearray()
while chunk := resp.read(8192):
chunks.extend(chunk)
progress.update(len(chunk))
progress.finish()
return json.loads(chunks.decode())
return json.loads(resp.read().decode())
def get_people(self) -> list[dict]:
return self._request("GET", "/people")["people"]
def get_person_id(self, name: str) -> str | None:
for person in self.get_people():
if person["name"].lower() == name.lower():
return person["id"]
return None
def search_assets_by_people(self, person_ids: list[str]) -> list[dict]:
items = []
page = 1
while True:
result = self._request("POST", "/search/metadata", {
"personIds": person_ids,
"size": 250,
"page": page,
"type": "IMAGE",
"withExif": True,
})
batch = result.get("assets", {}).get("items", [])
items.extend(batch)
if not batch or not result.get("assets", {}).get("nextPage"):
break
page += 1
return items
def download_asset(self, asset_id: str, dest: Path, show_progress: bool = True) -> Path:
url = f"{self.base_url.rstrip('/')}/api/assets/{asset_id}/thumbnail?size=preview"
req = Request(url, headers={"x-api-key": self.api_key})
with urlopen(req, timeout=30) as resp:
total_size = resp.headers.get('Content-Length')
if total_size and show_progress:
total_size = int(total_size)
progress = ProgressBar(total_size, desc="Downloading")
data = bytearray()
while chunk := resp.read(8192):
data.extend(chunk)
progress.update(len(chunk))
progress.finish()
dest.write_bytes(bytes(data))
else:
dest.write_bytes(resp.read())
return dest
def get_album_id(self, name: str) -> str | None:
for album in self._request("GET", "/albums"):
if album["albumName"].lower() == name.lower():
return album["id"]
return None
def get_album_assets(self, album_id: str, show_progress: bool = False) -> list[dict]:
album = self._request("GET", f"/albums/{album_id}",
show_progress=show_progress, progress_desc="Fetching album")
return album.get("assets", [])
def _is_portrait(asset: dict) -> bool | None:
"""Check if asset displays as portrait, accounting for EXIF orientation."""
exif = asset.get("exifInfo") or {}
width = exif.get("exifImageWidth") or 0
height = exif.get("exifImageHeight") or 0
if not (width and height):
return None
# EXIF orientation 6 and 8 mean 90° rotation (swap dimensions)
orientation = str(exif.get("orientation") or "1")
if orientation in ("6", "8"):
width, height = height, width
return height > width
def _filter_by_orientation(assets: list[dict], portrait: bool) -> list[dict]:
"""Filter assets by orientation, accounting for EXIF rotation."""
filtered = []
no_dimensions = 0
for asset in assets:
is_portrait = _is_portrait(asset)
if is_portrait is not None:
if is_portrait == portrait:
filtered.append(asset)
else:
no_dimensions += 1
if no_dimensions:
print(f"Note: {no_dimensions}/{len(assets)} photos missing dimension data")
return filtered
def _pick_weighted_random(assets: list[dict]) -> dict:
"""Pick random asset, slightly biased towards favorites (20%) and recent photos (20%)."""
if not assets:
raise ValueError("No assets to choose from")
one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
favorites = [a for a in assets if a.get("isFavorite")]
recent = []
for asset in assets:
date_str = asset.get("fileCreatedAt") or asset.get("createdAt", "")
try:
if datetime.fromisoformat(date_str.replace("Z", "+00:00")) >= one_week_ago:
recent.append(asset)
except (ValueError, AttributeError):
pass
if favorites and random.random() < 0.2:
return random.choice(favorites)
if recent and random.random() < 0.25:
return random.choice(recent)
return random.choice(assets)
def _download_random_asset(client: ImmichClient, assets: list[dict]) -> Path:
history = get_history()
new_assets = history.filter_new(assets)
if new_assets:
print(f"Photos: {len(new_assets)} new / {len(assets)} total")
asset = _pick_weighted_random(new_assets)
else:
print(f"All {len(assets)} photos shown, picking from full list")
asset = _pick_weighted_random(assets)
history.mark_displayed(asset["id"])
dest = Path(tempfile.gettempdir()) / "immich_photo.jpg"
return client.download_asset(asset["id"], dest)
def get_random_photo_of_people(client: ImmichClient, names: list[str], orientation: int = 0) -> Path:
person_ids = [pid for name in names if (pid := client.get_person_id(name))]
if not person_ids:
raise ValueError(f"No people found: {names}")
assets = client.search_assets_by_people(person_ids)
if not assets:
raise ValueError(f"No photos found for: {names}")
portrait = orientation in (90, 270)
filtered = _filter_by_orientation(assets, portrait)
if filtered:
assets = filtered
else:
print(f"No {'portrait' if portrait else 'landscape'} photos, using any orientation")
return _download_random_asset(client, assets)
def get_random_photo_from_album(client: ImmichClient, album_name: str, orientation: int = 0) -> Path:
album_id = client.get_album_id(album_name)
if not album_id:
raise ValueError(f"Album not found: {album_name}")
assets = [a for a in client.get_album_assets(album_id) if a.get("type") == "IMAGE"]
if not assets:
raise ValueError(f"No photos in album: {album_name}")
portrait = orientation in (90, 270)
filtered = _filter_by_orientation(assets, portrait)
if filtered:
assets = filtered
else:
print(f"No {'portrait' if portrait else 'landscape'} photos, using any orientation")
return _download_random_asset(client, assets)

54
src/lib/progress.py Normal file
View file

@ -0,0 +1,54 @@
"""Simple terminal progress bar for e-ink frame."""
import sys
class ProgressBar:
"""Simple text-based progress bar."""
def __init__(self, total: int, desc: str = "", width: int = 30):
self.total = total
self.current = 0
self.desc = desc
self.width = width
self._last_percent = -1
def update(self, n: int = 1) -> None:
"""Update progress by n steps."""
self.current = min(self.current + n, self.total)
self._render()
def set(self, value: int) -> None:
"""Set progress to specific value."""
self.current = min(value, self.total)
self._render()
def _render(self) -> None:
if self.total == 0:
return
percent = int(100 * self.current / self.total)
if percent == self._last_percent:
return
self._last_percent = percent
filled = int(self.width * self.current / self.total)
bar = "" * filled + "" * (self.width - filled)
desc = f"{self.desc}: " if self.desc else ""
sys.stdout.write(f"\r{desc}|{bar}| {percent:3d}%")
sys.stdout.flush()
if self.current >= self.total:
sys.stdout.write("\n")
sys.stdout.flush()
def finish(self) -> None:
"""Complete the progress bar."""
self.current = self.total
self._render()
def print_status(msg: str) -> None:
"""Print a status message."""
print(f" {msg}")

Binary file not shown.

Binary file not shown.

View file

View file

@ -0,0 +1,303 @@
#!/usr/bin/env python3
# Waveshare 7.3" 6-color e-Paper driver (modified)
# Original: Waveshare team, 2022-10-20
import sys
import numpy as np
import cv2
from PIL import Image, ImageEnhance
from numba import jit
from . import epdconfig
EPD_WIDTH = 800
EPD_HEIGHT = 480
DEFAULT_SATURATION = 1.4
DEFAULT_CONTRAST = 1.2
DEFAULT_GAMMA = 0.9
PALETTE_RGB = np.array([
[0, 0, 0], # BLACK
[255, 255, 255], # WHITE
[255, 255, 0], # YELLOW
[255, 0, 0], # RED
[0, 0, 255], # BLUE
[0, 255, 0], # GREEN
], dtype=np.float64)
PERCEPTUAL_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float64)
def _enhance_for_eink(image: Image.Image, saturation: float = None,
contrast: float = None, gamma: float = None) -> Image.Image:
saturation = saturation or DEFAULT_SATURATION
contrast = contrast or DEFAULT_CONTRAST
gamma = gamma or DEFAULT_GAMMA
img = image.convert('RGB')
if saturation != 1.0:
img = ImageEnhance.Color(img).enhance(saturation)
if contrast != 1.0:
img = ImageEnhance.Contrast(img).enhance(contrast)
if gamma != 1.0:
lut = [int((i / 255.0) ** (1.0 / gamma) * 255) for i in range(256)] * 3
img = img.point(lut)
return img
def _crop_center(image: Image.Image, target_w: int, target_h: int,
show_progress: bool = True) -> Image.Image:
if show_progress:
print("Center cropping...")
img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
img_h, img_w = img_cv.shape[:2]
img_aspect, target_aspect = img_w / img_h, target_w / target_h
if img_aspect < target_aspect:
new_w, new_h = target_w, int(target_w / img_aspect)
else:
new_w, new_h = int(target_h * img_aspect), target_h
img_cv = cv2.resize(img_cv, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
x_off = (new_w - target_w) // 2
y_off = (new_h - target_h) // 2
cropped = img_cv[y_off:y_off + target_h, x_off:x_off + target_w]
return Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
def _render_progress(desc: str, current: int, total: int, width: int = 30) -> None:
if total == 0:
return
percent = int(100 * current / total)
filled = int(width * current / total)
bar = "" * filled + "" * (width - filled)
sys.stdout.write(f"\r{desc}: |{bar}| {percent:3d}%")
sys.stdout.flush()
if current >= total:
print()
@jit(nopython=True, cache=True)
def _find_nearest_color(r, g, b, palette, weights):
best_idx, best_dist = 0, 1e10
for i in range(palette.shape[0]):
dr = (palette[i, 0] - r) * weights[0]
dg = (palette[i, 1] - g) * weights[1]
db = (palette[i, 2] - b) * weights[2]
dist = dr * dr + dg * dg + db * db
if dist < best_dist:
best_dist, best_idx = dist, i
return best_idx
@jit(nopython=True, cache=True)
def _atkinson_dither_rows(img, palette, weights, start_row, end_row):
height, width = img.shape[:2]
for y in range(start_row, end_row):
for x in range(width):
old_r, old_g, old_b = img[y, x, 0], img[y, x, 1], img[y, x, 2]
idx = _find_nearest_color(old_r, old_g, old_b, palette, weights)
new_r, new_g, new_b = palette[idx, 0], palette[idx, 1], palette[idx, 2]
img[y, x, 0], img[y, x, 1], img[y, x, 2] = new_r, new_g, new_b
err_r, err_g, err_b = (old_r - new_r) / 8.0, (old_g - new_g) / 8.0, (old_b - new_b) / 8.0
if x + 1 < width:
img[y, x + 1, 0] += err_r
img[y, x + 1, 1] += err_g
img[y, x + 1, 2] += err_b
if x + 2 < width:
img[y, x + 2, 0] += err_r
img[y, x + 2, 1] += err_g
img[y, x + 2, 2] += err_b
if y + 1 < height:
if x > 0:
img[y + 1, x - 1, 0] += err_r
img[y + 1, x - 1, 1] += err_g
img[y + 1, x - 1, 2] += err_b
img[y + 1, x, 0] += err_r
img[y + 1, x, 1] += err_g
img[y + 1, x, 2] += err_b
if x + 1 < width:
img[y + 1, x + 1, 0] += err_r
img[y + 1, x + 1, 1] += err_g
img[y + 1, x + 1, 2] += err_b
if y + 2 < height:
img[y + 2, x, 0] += err_r
img[y + 2, x, 1] += err_g
img[y + 2, x, 2] += err_b
return img
def _dither_atkinson(image: Image.Image, show_progress: bool = True) -> Image.Image:
img = np.array(image.convert('RGB'), dtype=np.float64)
height = img.shape[0]
if show_progress:
print("Dithering...")
chunk_size = 48
for i in range((height + chunk_size - 1) // chunk_size):
start, end = i * chunk_size, min((i + 1) * chunk_size, height)
img = _atkinson_dither_rows(img, PALETTE_RGB, PERCEPTUAL_WEIGHTS, start, end)
if show_progress:
_render_progress("Dithering", end, height)
return Image.fromarray(np.clip(img, 0, 255).astype(np.uint8), 'RGB')
class EPD:
def __init__(self):
self.reset_pin = epdconfig.RST_PIN
self.dc_pin = epdconfig.DC_PIN
self.busy_pin = epdconfig.BUSY_PIN
self.cs_pin = epdconfig.CS_PIN
self.width = EPD_WIDTH
self.height = EPD_HEIGHT
def reset(self):
epdconfig.digital_write(self.reset_pin, 1)
epdconfig.delay_ms(20)
epdconfig.digital_write(self.reset_pin, 0)
epdconfig.delay_ms(2)
epdconfig.digital_write(self.reset_pin, 1)
epdconfig.delay_ms(20)
def send_command(self, command):
epdconfig.digital_write(self.dc_pin, 0)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte([command])
epdconfig.digital_write(self.cs_pin, 1)
def send_data(self, data):
epdconfig.digital_write(self.dc_pin, 1)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte([data])
epdconfig.digital_write(self.cs_pin, 1)
def send_data2(self, data):
epdconfig.digital_write(self.dc_pin, 1)
epdconfig.digital_write(self.cs_pin, 0)
epdconfig.spi_writebyte2(data)
epdconfig.digital_write(self.cs_pin, 1)
def wait_busy(self):
while epdconfig.digital_read(self.busy_pin) == 0:
epdconfig.delay_ms(5)
def turn_on_display(self):
self.send_command(0x04) # POWER_ON
self.wait_busy()
self.send_command(0x12) # DISPLAY_REFRESH
self.send_data(0x00)
self.wait_busy()
self.send_command(0x02) # POWER_OFF
self.send_data(0x00)
self.wait_busy()
def init(self):
if epdconfig.module_init() != 0:
return -1
self.reset()
self.wait_busy()
epdconfig.delay_ms(30)
self.send_command(0xAA)
for v in [0x49, 0x55, 0x20, 0x08, 0x09, 0x18]:
self.send_data(v)
self.send_command(0x01)
self.send_data(0x3F)
self.send_command(0x00)
self.send_data(0x5F)
self.send_data(0x69)
self.send_command(0x03)
for v in [0x00, 0x54, 0x00, 0x44]:
self.send_data(v)
self.send_command(0x05)
for v in [0x40, 0x1F, 0x1F, 0x2C]:
self.send_data(v)
self.send_command(0x06)
for v in [0x6F, 0x1F, 0x17, 0x49]:
self.send_data(v)
self.send_command(0x08)
for v in [0x6F, 0x1F, 0x1F, 0x22]:
self.send_data(v)
self.send_command(0x30)
self.send_data(0x03)
self.send_command(0x50)
self.send_data(0x3F)
self.send_command(0x60)
self.send_data(0x02)
self.send_data(0x00)
self.send_command(0x61)
for v in [0x03, 0x20, 0x01, 0xE0]:
self.send_data(v)
self.send_command(0x84)
self.send_data(0x01)
self.send_command(0xE3)
self.send_data(0x2F)
self.send_command(0x04)
self.wait_busy()
return 0
def getbuffer(self, image, saturation=None, contrast=None, gamma=None,
enhance=True, show_progress=True):
pal_image = Image.new("P", (1, 1))
pal_image.putpalette((0,0,0, 255,255,255, 255,255,0, 255,0,0, 0,0,0, 0,0,255, 0,255,0) + (0,0,0)*249)
image = image.convert('RGB')
imwidth, imheight = image.size
if imwidth != self.width or imheight != self.height:
if show_progress:
print(f"Input: {imwidth}x{imheight}{self.width}x{self.height}")
image = _crop_center(image, self.width, self.height, show_progress)
if enhance:
if show_progress:
print("Enhancing...")
image = _enhance_for_eink(image, saturation, contrast, gamma)
image = _dither_atkinson(image, show_progress)
if show_progress:
print("Packing buffer...")
image_6color = image.quantize(palette=pal_image, dither=Image.Dither.NONE)
buf_6color = bytearray(image_6color.tobytes('raw'))
buf = [0x00] * (self.width * self.height // 2)
for i in range(0, len(buf_6color), 2):
buf[i // 2] = (buf_6color[i] << 4) + buf_6color[i + 1]
if show_progress:
print("Ready")
return buf
def display(self, image):
self.send_command(0x10)
self.send_data2(image)
self.turn_on_display()
def Clear(self, color=0x11):
self.send_command(0x10)
self.send_data2([color] * (self.height * self.width // 2))
self.turn_on_display()
def sleep(self):
self.send_command(0x07) # DEEP_SLEEP
self.send_data(0xA5)
epdconfig.delay_ms(2000)
epdconfig.module_exit()

View file

@ -0,0 +1,167 @@
# /*****************************************************************************
# * | File : epdconfig.py
# * | Author : Waveshare team
# * | Function : Hardware underlying interface
# * | Info :
# *----------------
# * | This version: V1.2
# * | Date : 2022-10-29
# * | Info :
# ******************************************************************************
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documnetation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS OR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
import os
import logging
import sys
import time
import subprocess
from ctypes import *
logger = logging.getLogger(__name__)
class RaspberryPi:
# Pin definition
RST_PIN = 17
DC_PIN = 25
CS_PIN = 8
BUSY_PIN = 24
PWR_PIN = 27
MOSI_PIN = 10
SCLK_PIN = 11
def __init__(self):
import spidev
import gpiozero
self.SPI = spidev.SpiDev()
self.GPIO_RST_PIN = gpiozero.LED(self.RST_PIN)
self.GPIO_DC_PIN = gpiozero.LED(self.DC_PIN)
# self.GPIO_CS_PIN = gpiozero.LED(self.CS_PIN)
self.GPIO_PWR_PIN = gpiozero.LED(self.PWR_PIN)
self.GPIO_BUSY_PIN = gpiozero.Button(self.BUSY_PIN, pull_up = False)
def digital_write(self, pin, value):
if pin == self.RST_PIN:
if value:
self.GPIO_RST_PIN.on()
else:
self.GPIO_RST_PIN.off()
elif pin == self.DC_PIN:
if value:
self.GPIO_DC_PIN.on()
else:
self.GPIO_DC_PIN.off()
# elif pin == self.CS_PIN:
# if value:
# self.GPIO_CS_PIN.on()
# else:
# self.GPIO_CS_PIN.off()
elif pin == self.PWR_PIN:
if value:
self.GPIO_PWR_PIN.on()
else:
self.GPIO_PWR_PIN.off()
def digital_read(self, pin):
if pin == self.BUSY_PIN:
return self.GPIO_BUSY_PIN.value
elif pin == self.RST_PIN:
return self.RST_PIN.value
elif pin == self.DC_PIN:
return self.DC_PIN.value
# elif pin == self.CS_PIN:
# return self.CS_PIN.value
elif pin == self.PWR_PIN:
return self.PWR_PIN.value
def delay_ms(self, delaytime):
time.sleep(delaytime / 1000.0)
def spi_writebyte(self, data):
self.SPI.writebytes(data)
def spi_writebyte2(self, data):
self.SPI.writebytes2(data)
def DEV_SPI_write(self, data):
self.DEV_SPI.DEV_SPI_SendData(data)
def DEV_SPI_nwrite(self, data):
self.DEV_SPI.DEV_SPI_SendnData(data)
def DEV_SPI_read(self):
return self.DEV_SPI.DEV_SPI_ReadData()
def module_init(self, cleanup=False):
self.GPIO_PWR_PIN.on()
if cleanup:
find_dirs = [
os.path.dirname(os.path.realpath(__file__)),
'/usr/local/lib',
'/usr/lib',
]
self.DEV_SPI = None
for find_dir in find_dirs:
val = int(os.popen('getconf LONG_BIT').read())
logging.debug("System is %d bit"%val)
if val == 64:
so_filename = os.path.join(find_dir, 'DEV_Config_64.so')
else:
so_filename = os.path.join(find_dir, 'DEV_Config_32.so')
if os.path.exists(so_filename):
self.DEV_SPI = CDLL(so_filename)
break
if self.DEV_SPI is None:
RuntimeError('Cannot find DEV_Config.so')
self.DEV_SPI.DEV_Module_Init()
else:
# SPI device, bus = 0, device = 0
self.SPI.open(0, 0)
self.SPI.max_speed_hz = 4000000
self.SPI.mode = 0b00
return 0
def module_exit(self, cleanup=False):
logger.debug("spi end")
self.SPI.close()
self.GPIO_RST_PIN.off()
self.GPIO_DC_PIN.off()
self.GPIO_PWR_PIN.off()
logger.debug("close 5V, Module enters 0 power consumption ...")
if cleanup:
self.GPIO_RST_PIN.close()
self.GPIO_DC_PIN.close()
# self.GPIO_CS_PIN.close()
self.GPIO_PWR_PIN.close()
self.GPIO_BUSY_PIN.close()
implementation = RaspberryPi()
for func in [x for x in dir(implementation) if not x.startswith('_')]:
setattr(sys.modules[__name__], func, getattr(implementation, func))

Binary file not shown.

Binary file not shown.