diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 6444153..5042ef1 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -44,7 +44,16 @@ def _build_wide( geosure_path: Path, ) -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" - wide = pl.scan_parquet(epc_pp_path) + wide = ( + pl.scan_parquet(epc_pp_path) + .filter( + pl.col("total_floor_area").is_null() + | (pl.col("total_floor_area") > MIN_FLOOR_AREA_M2) + ) + .filter( + pl.col("latest_price").is_null() | (pl.col("latest_price") >= MIN_PRICE) + ) + ) arcgis = pl.scan_parquet(arcgis_path).select( pl.col("pcds").alias("postcode"), @@ -53,7 +62,7 @@ def _build_wide( "lsoa21", "oa21", ) - wide = wide.join(arcgis, on="postcode", how="inner") + wide = wide.join(arcgis, on="postcode", how="full", coalesce=True) wide = _join_journey_times(wide, journey_times_bank_path, "Bank") wide = _join_journey_times(wide, journey_times_fitzrovia_path, "Fitzrovia") @@ -149,9 +158,7 @@ def _build_wide( ) wide = ( - wide.filter(pl.col("total_floor_area") > MIN_FLOOR_AREA_M2) - .filter(pl.col("latest_price") >= MIN_PRICE) - .with_columns( + wide.with_columns( pl.when(pl.col("duration") == "U") .then(None) .otherwise(pl.col("duration"))