perfect-postcode/r5-java/src/main/java/propertymap/App.java

249 lines
10 KiB
Java

package propertymap;
import com.conveyal.r5.transit.TransportNetwork;
import org.duckdb.DuckDBConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Batch-compute travel times from each origin (place) to nearby postcodes
 * for all transport modes (car, bicycle, walking, transit).
 *
 * <p>Each origin is spatially pre-filtered to only route to postcodes within a
 * plausible travel radius for the mode. Output is sparse: only reachable
 * postcodes are written (unreachable = absent from file).
 *
 * <p>Output per mode: one parquet file per origin in
 * {@code {output-dir}/{mode}/{index}-{name}.parquet}, where {@code index} is the
 * zero-padded position of the origin in the places list and {@code name} is the
 * sanitized place name. Columns are (pcds VARCHAR, travel_minutes SMALLINT).
 * Transit mode additionally includes a best_minutes SMALLINT column
 * (5th percentile = best-case departure timing).
 *
 * <p>The run is resumable: an origin whose output file already exists and is
 * non-empty is skipped on the next invocation.
 */
public class App {
    /** Transport modes, processed sequentially in this order. */
    private static final String[] MODES = {"bicycle", "transit", "walking", "car"};

    /** Number of extra attempts made for an origin after its first failure. */
    private static final int MAX_RETRIES = 2;

    /** Command-line synopsis printed alongside argument errors. */
    private static final String USAGE =
            "Usage: App --postcodes FILE --places FILE --output-dir DIR [--threads N]";

    /**
     * Entry point: loads the network, postcodes and places, then processes every mode.
     *
     * <p>Requires the {@code DATA_DIR} and {@code NETWORK_CACHE_DIR} environment variables.
     *
     * @param args {@code --postcodes FILE --places FILE --output-dir DIR [--threads N]}
     * @throws Exception if loading fails or the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        String postcodesPath = requiredArg(args, "--postcodes");
        String placesPath = requiredArg(args, "--places");
        String outputDirStr = requiredArg(args, "--output-dir");
        // Validate before the expensive loads below rather than failing at pool creation.
        int threads = parseThreads(optionalArg(args, "--threads", "4"));

        Path outDir = Paths.get(outputDirStr);
        Files.createDirectories(outDir);
        LocalDate today = LocalDate.now();

        TransportNetwork network =
                Router.loadNetwork(requiredEnv("DATA_DIR"), requiredEnv("NETWORK_CACHE_DIR"));

        System.err.println("Loading postcodes (England only)...");
        Parquet.Postcodes postcodes = Parquet.loadEnglandPostcodes(
                postcodesPath, outDir.resolve("postcodes_ref.parquet"));
        System.err.printf(" %,d postcodes%n", postcodes.lats().length);

        System.err.println("Loading places (deduplicated)...");
        Parquet.Places places = Parquet.loadPlaces(placesPath, outDir.resolve("places_ref.parquet"));
        String[] originNames = places.names();
        double[] originLats = places.lats(), originLons = places.lons();
        int nOrigins = originLats.length;
        System.err.printf(" %,d places%n", nOrigins);

        // One thread pool shared across all modes
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        // One DuckDB connection per thread, reused across all writes. Every connection
        // handed out is also recorded here so it can be closed once the pool has drained.
        Queue<DuckDBConnection> openConnections = new ConcurrentLinkedQueue<>();
        ThreadLocal<DuckDBConnection> threadConn = ThreadLocal.withInitial(() -> {
            try {
                DuckDBConnection conn = Parquet.connect();
                openConnections.add(conn);
                return conn;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        try {
            for (String mode : MODES) {
                processMode(network, postcodes.codes(), postcodes.lats(), postcodes.lons(),
                        originNames, originLats, originLons, outDir, mode, today, pool, threadConn);
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            // Workers are finished at this point, so no connection is still in use.
            for (DuckDBConnection conn : openConnections) {
                try {
                    conn.close();
                } catch (Exception e) {
                    System.err.printf("%n [WARN] failed to close DuckDB connection: %s%n",
                            e.getMessage());
                }
            }
        }
    }

    /**
     * Process every not-yet-completed origin for one mode on the shared pool and block
     * until all of them have finished (successfully or not).
     *
     * <p>Progress is printed to stderr every two seconds by a separate daemon thread;
     * a final summary line is printed when the mode completes.
     *
     * @throws Exception if the mode directory cannot be created or inspected, or the
     *         calling thread is interrupted while waiting
     */
    private static void processMode(
            TransportNetwork network,
            String[] postcodes, double[] postcodeLats, double[] postcodeLons,
            String[] originNames, double[] originLats, double[] originLons,
            Path outDir, String mode, LocalDate date,
            ExecutorService pool, ThreadLocal<DuckDBConnection> threadConn) throws Exception {
        int nOrigins = originLats.length;
        // Locale.ROOT: the banner must not change with the JVM's default locale.
        System.err.printf("%n=== %s ===%n", mode.toUpperCase(Locale.ROOT));
        System.err.printf(" Radius: %.0f km%n", Router.maxRadiusKm(mode));

        Path modeDir = outDir.resolve(mode);
        Files.createDirectories(modeDir);

        List<Integer> remaining = findRemaining(modeDir, originNames);
        int alreadyDone = nOrigins - remaining.size();
        System.err.printf(" %,d done, %,d remaining%n", alreadyDone, remaining.size());
        if (remaining.isEmpty()) {
            System.err.println(" All origins completed for this mode!");
            return;
        }

        long startMs = System.currentTimeMillis();
        int total = remaining.size();
        AtomicInteger completed = new AtomicInteger(0);
        AtomicInteger failed = new AtomicInteger(0);

        // Progress reporter on a timer instead of per-task stderr writes
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "progress");
            t.setDaemon(true);
            return t;
        });
        reporter.scheduleAtFixedRate(() -> {
            int c = completed.get();
            if (c == 0) return; // no rate to extrapolate from yet
            double secs = (System.currentTimeMillis() - startMs) / 1000.0;
            double rate = c / secs;
            double etaH = (total - c) / rate / 3600;
            System.err.printf("\r [%,d/%,d] %.1f/s | ETA %.1fh | fail %d",
                    c, total, rate, etaH, failed.get());
        }, 2, 2, TimeUnit.SECONDS);

        CountDownLatch latch = new CountDownLatch(total);
        try {
            for (int idx : remaining) {
                pool.submit(() -> {
                    try {
                        processOrigin(network, postcodes, postcodeLats, postcodeLons,
                                originLats[idx], originLons[idx],
                                modeDir, mode, date, idx, originNames[idx], threadConn.get());
                        completed.incrementAndGet();
                    } catch (Throwable t) {
                        // Throwable rather than Exception: submit() stores anything thrown in
                        // the returned Future, which is discarded here, so an Error would
                        // otherwise vanish without being counted or reported.
                        failed.incrementAndGet();
                        String reason = t.getMessage() != null ? t.getMessage() : t.toString();
                        System.err.printf("%n [FAIL] origin %s: %s%n", originNames[idx], reason);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            // Stop the reporter even when the wait is interrupted.
            reporter.shutdown();
        }
        reporter.awaitTermination(5, TimeUnit.SECONDS);

        double elapsedH = (System.currentTimeMillis() - startMs) / 3_600_000.0;
        int n = completed.get();
        // Clamp the divisor to one second so a very short run does not report an absurd rate.
        System.err.printf("\r [%,d/%,d] %.1f/s | %.1fh | fail %d%n",
                n, total, n / Math.max(elapsedH * 3600, 1), elapsedH, failed.get());
    }

    /**
     * Compute and write travel times for a single origin, with retry on failure.
     *
     * <p>Up to {@code MAX_RETRIES + 1} attempts are made. After each failed attempt any
     * file at the output path is removed so that a later resume does not mistake a
     * leftover for a completed origin.
     *
     * @throws Exception the error from the last attempt, once all attempts have failed
     */
    private static void processOrigin(
            TransportNetwork network,
            String[] postcodes, double[] postcodeLats, double[] postcodeLons,
            double originLat, double originLon,
            Path modeDir, String mode, LocalDate date, int index, String name,
            DuckDBConnection conn) throws Exception {
        Path outPath = modeDir.resolve(originFilename(index, name));
        Exception lastError = null;
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            try {
                Router.FilteredResult result = Router.computeForOrigin(
                        network, postcodeLats, postcodeLons,
                        originLat, originLon, mode, date);

                // Write only reachable postcodes (sparse output); a negative time marks
                // an entry that is skipped.
                int reachable = 0;
                for (short t : result.times()) if (t >= 0) reachable++;

                String[] codes = new String[reachable];
                short[] times = new short[reachable];
                // bestTimes() is null for results that carry no best-case column.
                short[] bestTimes = result.bestTimes() != null ? new short[reachable] : null;
                int j = 0;
                for (int i = 0; i < result.times().length; i++) {
                    if (result.times()[i] >= 0) {
                        // Results are indexed over the filtered subset; map back to the
                        // full postcode array.
                        codes[j] = postcodes[result.originalIndices()[i]];
                        times[j] = result.times()[i];
                        if (bestTimes != null) bestTimes[j] = result.bestTimes()[i];
                        j++;
                    }
                }

                if (bestTimes != null) {
                    Parquet.writeTransitTravelTimes(conn, outPath, codes, times, bestTimes);
                } else {
                    Parquet.writeTravelTimes(conn, outPath, codes, times);
                }
                return;
            } catch (Exception e) {
                lastError = e;
                // NOTE(review): whether the Parquet writers can leave a partial file behind
                // is not visible from here; removing it is harmless either way because this
                // method only runs for origins without a usable output file.
                discardPartialOutput(outPath);
                if (attempt < MAX_RETRIES) {
                    System.err.printf("%n [RETRY %d/%d] %s: %s%n",
                            attempt + 1, MAX_RETRIES, name, e.getMessage());
                } else {
                    System.err.printf("%n [FAIL TRACE] %s:%n", name);
                    e.printStackTrace(System.err);
                }
            }
        }
        throw lastError;
    }

    /** Best-effort removal of an output file left behind by a failed attempt. */
    private static void discardPartialOutput(Path outPath) {
        try {
            Files.deleteIfExists(outPath);
        } catch (Exception cleanupError) {
            System.err.printf("%n [WARN] could not remove %s: %s%n",
                    outPath, cleanupError.getMessage());
        }
    }

    /** Find origin indices that don't yet have a non-empty output parquet file. */
    private static List<Integer> findRemaining(Path modeDir, String[] names) throws Exception {
        List<Integer> remaining = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            Path f = modeDir.resolve(originFilename(i, names[i]));
            // A zero-byte file counts as not done.
            if (!Files.exists(f) || Files.size(f) == 0) {
                remaining.add(i);
            }
        }
        return remaining;
    }

    /**
     * Build a filename from index + place name (index prefix prevents collisions after
     * sanitization).
     *
     * <p>The name is lower-cased with {@link Locale#ROOT} so the result is identical on
     * every machine; with the default locale a Turkish JVM would map {@code I} to a
     * dotless i, which the filter below strips, and resuming would no longer find
     * files written elsewhere.
     */
    private static String originFilename(int index, String name) {
        String safe = name.toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9 -]", "")
                .replaceAll("\\s+", "-");
        return String.format("%06d-%s.parquet", index, safe);
    }

    /**
     * Parse the worker-thread count, exiting with status 1 and a usage message when the
     * value is not a positive integer.
     */
    private static int parseThreads(String raw) {
        int threads = 0;
        try {
            threads = Integer.parseInt(raw);
        } catch (NumberFormatException ignored) {
            // Leave threads at 0 so the range check below reports the problem.
        }
        if (threads < 1) {
            System.err.println("Invalid --threads value: " + raw + " (expected a positive integer)");
            System.err.println(USAGE);
            System.exit(1);
        }
        return threads;
    }

    /**
     * Return the value following {@code name} in {@code args}; print an error plus usage
     * and exit with status 1 when the flag is absent or has no value after it.
     */
    private static String requiredArg(String[] args, String name) {
        String value = optionalArg(args, name, null);
        if (value == null) {
            System.err.println("Missing required argument: " + name);
            System.err.println(USAGE);
            System.exit(1);
        }
        return value;
    }

    /**
     * Return the value following the first occurrence of {@code name} in {@code args},
     * or {@code defaultValue} when the flag is absent or is the last argument.
     */
    private static String optionalArg(String[] args, String name, String defaultValue) {
        // Stop one short of the end: a flag needs a value after it.
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals(name)) return args[i + 1];
        }
        return defaultValue;
    }

    /** Return the named environment variable, or print an error and exit with status 1 if unset. */
    private static String requiredEnv(String name) {
        String val = System.getenv(name);
        if (val == null) {
            System.err.println("Missing required environment variable: " + name);
            System.exit(1);
        }
        return val;
    }
}