"""Tests for EPC certificate scanning and the EPC / price-paid join."""

import csv
import io
import zipfile
from datetime import date
from pathlib import Path

import polars as pl

from pipeline.transform.join_epc_pp import (
    EPC_SOURCE_COLUMNS,
    _run,
    _scan_epc_certificates,
)


def _write_csv(path: Path, fieldnames: list[str], rows: list[dict[str, str]]) -> None:
    """Write *rows* to *path* as a CSV file with a header row.

    Args:
        path: Destination file; created or truncated.
        fieldnames: Column order for the header and every row.
        rows: One mapping per CSV row, keyed by the names in *fieldnames*.
    """
    # newline="" leaves line endings to the csv module; the explicit encoding
    # keeps the fixture independent of the machine's locale.
    with path.open("w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def _csv_text(rows: list[dict[str, str]], *, lineterminator: str = "\r\n") -> str:
    """Serialise *rows* as CSV text using the ``EPC_SOURCE_COLUMNS`` header.

    Going through the ``csv`` module (rather than joining on ",") means values
    containing commas, quotes or newlines are quoted instead of silently
    shifting columns.

    Args:
        rows: One mapping per CSV row, keyed by the EPC source column names.
        lineterminator: Line ending to emit; defaults to the csv module's
            own default of CRLF.

    Returns:
        The header line followed by one line per row, each terminated by
        *lineterminator*.
    """
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=EPC_SOURCE_COLUMNS,
        lineterminator=lineterminator,
    )
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def _row(**overrides: str) -> dict[str, str]:
    """Return a baseline EPC source row with *overrides* applied on top.

    The baseline deliberately carries an untrimmed lower-case postcode and
    lower-case energy ratings so the tests can check how they come out of the
    scan.
    """
    row = {
        "address": "1 Example Street",
        "postcode": " aa1 1aa ",
        "current_energy_rating": "c",
        "potential_energy_rating": "b",
        "property_type": "House",
        "built_form": "Mid-Terrace",
        "inspection_date": "2024-01-02",
        "total_floor_area": "84.5",
        "number_habitable_rooms": "5",
        "floor_height": "2.4",
        "construction_age_band": "England and Wales: 1950-1966",
        "tenure": "owner-occupied",
    }
    row.update(overrides)
    return row


def test_scan_epc_certificates_supports_legacy_uppercase_csv(tmp_path: Path) -> None:
    """A plain CSV with upper-case headers scans into the expected record.

    Also pins that a habitable-room count of "0" is read back as ``None``.
    """
    csv_path = tmp_path / "certificates.csv"
    fieldnames = [column.upper() for column in EPC_SOURCE_COLUMNS]
    row = {column.upper(): value for column, value in _row().items()}
    row["NUMBER_HABITABLE_ROOMS"] = "0"
    _write_csv(csv_path, fieldnames, [row])

    # NOTE(review): the second argument is presumably a scratch/work
    # directory - confirm against _scan_epc_certificates.
    df = _scan_epc_certificates(csv_path, tmp_path).collect()

    assert df.to_dicts() == [
        {
            "epc_address": "1 Example Street",
            "epc_postcode": "AA1 1AA",
            "current_energy_rating": "C",
            "potential_energy_rating": "B",
            "epc_property_type": "House",
            "built_form": "Mid-Terrace",
            "inspection_date": "2024-01-02",
            "total_floor_area": 84.5,
            "number_habitable_rooms": None,
            "floor_height": 2.4,
            "construction_age_band": "England and Wales: 1950-1966",
            "tenure": "owner-occupied",
        }
    ]


def test_scan_epc_certificates_supports_domestic_zip(tmp_path: Path) -> None:
    """Certificate members of a zip archive are scanned; other members are not.

    The archive holds one top-level and one nested ``certificates-*.csv``
    member plus a ``recommendations-*.csv`` member whose rows must not show up
    in the result.
    """
    zip_path = tmp_path / "domestic-csv.zip"
    rows_2023 = [_row(address="2 Example Street", inspection_date="2023-03-04")]
    rows_2024 = [
        _row(
            address="3 Example Street",
            postcode="BB2 2BB",
            inspection_date="2024-05-06",
            total_floor_area="",
            tenure="Rented (social)",
        )
    ]
    members = [
        ("certificates-2023.csv", rows_2023),
        ("nested/certificates-2024.csv", rows_2024),
    ]

    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for member_name, rows in members:
            # LF-terminated members here; the join test below writes a
            # CRLF-terminated member, so both line-ending styles are covered.
            archive.writestr(member_name, _csv_text(rows, lineterminator="\n"))
        archive.writestr("recommendations-2024.csv", "address,postcode\nignored,X\n")

    df = _scan_epc_certificates(zip_path, tmp_path).sort("inspection_date").collect()

    assert df.select("epc_address", "epc_postcode", "total_floor_area").to_dicts() == [
        {
            "epc_address": "2 Example Street",
            "epc_postcode": "AA1 1AA",
            "total_floor_area": 84.5,
        },
        {
            "epc_address": "3 Example Street",
            "epc_postcode": "BB2 2BB",
            # A blank floor area in the source is expected to come back null.
            "total_floor_area": None,
        },
    ]
    assert df.get_column("tenure").to_list() == ["owner-occupied", "Rented (social)"]
    assert df.schema["number_habitable_rooms"] == pl.Int16


def test_run_joins_domestic_zip_with_price_paid(tmp_path: Path) -> None:
    """``_run`` joins a zipped EPC export with a price-paid parquet file.

    Two certificates for the same address (2023 and 2024) are matched against
    a single price-paid transaction; the output is expected to contain one row
    carrying the 2024 certificate's rating and floor area.
    """
    zip_path = tmp_path / "domestic-csv.zip"
    certificates = [
        _row(
            current_energy_rating="d",
            inspection_date="2023-01-01",
            total_floor_area="80",
            tenure="Rented (social)",
        ),
        _row(
            current_energy_rating="c",
            inspection_date="2024-01-01",
            total_floor_area="85",
            tenure="owner-occupied",
        ),
    ]
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("certificates-2024.csv", _csv_text(certificates))

    price_paid_path = tmp_path / "price-paid.parquet"
    pl.DataFrame(
        {
            "price": [250_000],
            "date_of_transfer": [date(2024, 2, 3)],
            "property_type": ["T"],
            "postcode": ["AA1 1AA"],
            "paon": ["1"],
            "saon": [None],
            "street": ["Example Street"],
            "locality": [None],
            "town_city": ["Exampletown"],
            "duration": ["F"],
            "old_new": ["N"],
        }
    ).write_parquet(price_paid_path)
    output_path = tmp_path / "epc-pp.parquet"

    _run(zip_path, price_paid_path, output_path, tmp_path)

    df = pl.read_parquet(output_path)
    assert df.height == 1
    assert df.select(
        "epc_address",
        "current_energy_rating",
        "total_floor_area",
        "construction_age_band",
        "was_council_house",
    ).to_dicts() == [
        {
            "epc_address": "1 Example Street",
            "current_energy_rating": "C",
            "total_floor_area": 85.0,
            "construction_age_band": 1950,
            # NOTE(review): presumably "Yes" because the 2023 certificate
            # records a social-rent tenure - confirm against _run.
            "was_council_house": "Yes",
        }
    ]
    assert df.get_column("renovation_history").list.len().to_list() == [1]