This commit is contained in:
Andras Schmelczer 2026-02-15 09:48:30 +00:00
parent 128b3191e7
commit 03445188ea
54 changed files with 596953 additions and 3577 deletions

View file

@ -1,223 +1,208 @@
package propertymap;
import com.conveyal.r5.OneOriginResult;
import com.conveyal.r5.analyst.FreeFormPointSet;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.TravelTimeComputer;
import com.conveyal.r5.analyst.WebMercatorExtents;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.TravelTimeResult;
import com.conveyal.r5.api.util.LegMode;
import com.conveyal.r5.api.util.TransitModes;
import com.conveyal.r5.kryo.KryoNetworkSerializer;
import com.conveyal.r5.transit.TransportNetwork;
import com.google.gson.Gson;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.locationtech.jts.geom.Coordinate;
import org.duckdb.DuckDBConnection;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.EnumSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Batch-compute travel times from each origin (place) to all destinations (postcodes)
* for all transport modes (car, bicycle, walking, transit).
*
* Output per mode: one parquet file per origin in {output-dir}/{mode}/{index}.parquet, where {index} is the origin index zero-padded to six digits (e.g. 000042.parquet),
* with columns (pcds VARCHAR, travel_minutes SMALLINT). -1 = unreachable within 120 min.
*/
public class App {
// Transport network handed to TravelTimeComputer in handleTravelTimes.
private static TransportNetwork network;
// Shared Gson instance: parses request bodies and serialises responses in handleTravelTimes.
private static final Gson gson = new Gson();
/** JSON request body that handleTravelTimes parses with Gson. */
static class TravelTimeRequest {
double[] origin; // [lat, lon]; handleTravelTimes rejects anything that is not exactly two elements
double[][] destinations; // [[lat, lon], ...]; must be non-empty
String mode; // "transit", "car", "bicycle", "walking"; handleTravelTimes uses "transit" when null
}
/** JSON response body serialised with Gson; holds one entry per requested destination. */
static class TravelTimeResponse {
double[] travel_times; // minutes, -1 = unreachable
}
// Transport modes that main hands to processMode, in this order.
private static final String[] MODES = {"car", "bicycle", "walking", "transit"};
// Retries after the first failed attempt in processOrigin, i.e. up to MAX_RETRIES + 1 attempts per origin.
private static final int MAX_RETRIES = 2;
/**
* Program entry point.
*
* Reads the DATA_DIR and NETWORK_CACHE_DIR environment variables and the
* --postcodes, --places and --output-dir arguments (--threads is optional and
* defaults to "4"). Prints an error and exits with status 1 when either
* environment variable is unset.
*
* NOTE(review): the statements below appear to interleave two revisions of this
* method: an HTTP service (HttpServer on port 8003 with /health and
* /travel-times contexts) and a batch job (postcode/place loading, one shared
* thread pool and one processMode call per entry in MODES). The lambdas opened
* by the two createContext calls are not closed inside this span, and
* `network` is both declared as a local and assigned in the Kryo cache branch.
* Confirm against version control which revision is intended.
*
* @param args command-line arguments
* @throws Exception declared by the signature; nothing is caught at this level
*/
public static void main(String[] args) throws Exception {
String dataDir = System.getenv("DATA_DIR");
String postcodesPath = requiredArg(args, "--postcodes");
String placesPath = requiredArg(args, "--places");
String outputDirStr = requiredArg(args, "--output-dir");
int threads = Integer.parseInt(optionalArg(args, "--threads", "4"));
if (dataDir == null) {
System.err.println("Error: DATA_DIR environment variable not set");
System.exit(1);
}
Path outDir = Paths.get(outputDirStr);
Files.createDirectories(outDir);
String networkCacheDir = System.getenv("NETWORK_CACHE_DIR");
if (networkCacheDir == null) {
System.err.println("Error: NETWORK_CACHE_DIR environment variable not set");
System.exit(1);
}
LocalDate today = LocalDate.now();
TransportNetwork network = Router.loadNetwork(requiredEnv("DATA_DIR"), requiredEnv("NETWORK_CACHE_DIR"));
System.out.println("Loading transport network from " + dataDir);
System.out.println("Network cache dir: " + networkCacheDir);
System.err.println("Loading postcodes (England only)...");
Parquet.Postcodes postcodes = Parquet.loadEnglandPostcodes(
postcodesPath, outDir.resolve("postcodes_ref.parquet"));
int nDest = postcodes.lats().length;
System.err.printf(" %,d postcodes%n", nDest);
File cacheFile = new File(networkCacheDir, "network.dat");
if (cacheFile.exists()) {
System.out.println("Loading cached network from " + cacheFile);
network = KryoNetworkSerializer.read(cacheFile);
} else {
System.out.println("Building network (first run, this takes a few minutes)...");
network = TransportNetwork.fromDirectory(new File(dataDir));
new File(networkCacheDir).mkdirs();
KryoNetworkSerializer.write(network, cacheFile);
System.out.println("Network cached to " + cacheFile);
}
List<Router.DestinationChunk> chunks = Router.buildDestinationChunks(postcodes.lats(), postcodes.lons());
// Build stop-to-vertex distance tables (needed for egress routing in transit mode).
// Not built by fromDirectory() and too large to fit in the Kryo cache with 4GB heap.
System.out.println("Building stop-to-vertex distance tables...");
network.transitLayer.buildDistanceTables(null);
System.out.println("Distance tables built");
System.err.println("Loading places (deduplicated)...");
double[][] placesLatLon = Parquet.loadPlaces(placesPath, outDir.resolve("places_ref.parquet"));
double[] originLats = placesLatLon[0], originLons = placesLatLon[1];
int nOrigins = originLats.length;
System.err.printf(" %,d places%n", nOrigins);
System.err.printf(" Estimated output: %.1f GB (%,d x %,d x 2B)%n",
(double) nOrigins * nDest * 2 / 1e9, nOrigins, nDest);
System.out.println("Transport network loaded successfully");
HttpServer server = HttpServer.create(new InetSocketAddress(8003), 0);
server.createContext("/health", exchange -> {
sendResponse(exchange, 200, "ok");
// One thread pool shared across all modes
ExecutorService pool = Executors.newFixedThreadPool(threads);
// One DuckDB connection per thread, reused across all writes
ThreadLocal<DuckDBConnection> threadConn = ThreadLocal.withInitial(() -> {
try { return Parquet.connect(); }
catch (Exception e) { throw new RuntimeException(e); }
});
server.createContext("/travel-times", exchange -> {
if (!"POST".equals(exchange.getRequestMethod())) {
sendResponse(exchange, 405, "Method not allowed");
return;
try {
for (String mode : MODES) {
processMode(network, chunks, postcodes.codes(), originLats, originLons,
nDest, outDir, mode, today, pool, threadConn);
}
} finally {
// Stop accepting work, then block until every queued task has finished.
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
/**
 * Computes and writes travel times for every origin that does not yet have an
 * output file under {@code outDir/mode}.
 *
 * <p>Origins still missing output are found with {@code findRemaining}, one task per
 * origin is submitted to {@code pool}, and the method blocks until all of them
 * have finished. A timer thread prints progress to stderr every two seconds.
 * A failing origin is counted and logged; it does not abort the mode.
 *
 * @param network    transport network forwarded to {@code processOrigin}
 * @param chunks     destination chunks forwarded to {@code processOrigin}
 * @param postcodes  destination codes forwarded to {@code processOrigin}
 * @param originLats origin latitudes, indexed by origin
 * @param originLons origin longitudes, indexed by origin
 * @param nDest      number of destinations
 * @param outDir     root output directory; a sub-directory named after {@code mode} is created
 * @param mode       transport mode name
 * @param date       date forwarded to {@code processOrigin}
 * @param pool       executor the per-origin tasks are submitted to; not shut down here
 * @param threadConn per-thread DuckDB connection supplier
 * @throws Exception if the mode directory cannot be created or scanned, or the
 *                   calling thread is interrupted while waiting
 */
private static void processMode(
        TransportNetwork network, List<Router.DestinationChunk> chunks,
        String[] postcodes, double[] originLats, double[] originLons, int nDest,
        Path outDir, String mode, LocalDate date,
        ExecutorService pool, ThreadLocal<DuckDBConnection> threadConn) throws Exception {
    int nOrigins = originLats.length;
    // Locale.ROOT keeps the banner identical whatever the JVM's default locale is.
    System.err.printf("%n=== %s ===%n", mode.toUpperCase(java.util.Locale.ROOT));
    Path modeDir = outDir.resolve(mode);
    Files.createDirectories(modeDir);
    List<Integer> remaining = findRemaining(modeDir, nOrigins);
    int alreadyDone = nOrigins - remaining.size();
    System.err.printf(" %,d done, %,d remaining%n", alreadyDone, remaining.size());
    if (remaining.isEmpty()) {
        System.err.println(" All origins completed for this mode!");
        return;
    }
    long startMs = System.currentTimeMillis();
    int total = remaining.size();
    AtomicInteger completed = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    // Progress reporter on a timer instead of per-task stderr writes
    ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "progress");
        t.setDaemon(true);
        return t;
    });
    try {
        reporter.scheduleAtFixedRate(() -> {
            int c = completed.get();
            if (c == 0) return; // no rate to report yet; also avoids dividing by zero below
            double secs = (System.currentTimeMillis() - startMs) / 1000.0;
            double rate = c / secs;
            double etaH = (total - c) / rate / 3600;
            System.err.printf("\r [%,d/%,d] %.1f/s | ETA %.1fh | fail %d",
                    c, total, rate, etaH, failed.get());
        }, 2, 2, TimeUnit.SECONDS);
        // Submit all work; the latch reaches zero once every task has finished, success or not.
        java.util.concurrent.CountDownLatch latch =
                new java.util.concurrent.CountDownLatch(remaining.size());
        for (int idx : remaining) {
            pool.submit(() -> {
                try {
                    processOrigin(network, chunks, postcodes, originLats[idx], originLons[idx],
                            nDest, modeDir, mode, date, idx, threadConn.get());
                    completed.incrementAndGet();
                } catch (Throwable t) {
                    // Throwable, not Exception: the Future returned by submit is never read,
                    // so an Error would otherwise vanish without being counted or logged.
                    failed.incrementAndGet();
                    // %s on the throwable prints class and message, so a null message is still useful.
                    System.err.printf("%n [FAIL] origin %d: %s%n", idx, t);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
    } finally {
        // Always stop the timer thread, including when await() is interrupted or submit() throws.
        reporter.shutdown();
    }
    double elapsedH = (System.currentTimeMillis() - startMs) / 3_600_000.0;
    int n = completed.get();
    System.err.printf("\r [%,d/%,d] %.1f/s | %.1fh | fail %d%n",
            n, total, n / Math.max(elapsedH * 3600, 1), elapsedH, failed.get());
}
/**
* Compute and write travel times for a single origin, with retry on failure.
*
* Visible steps: resolves the output path as idx zero-padded to six digits plus
* ".parquet" under modeDir, then loops over attempt 0..MAX_RETRIES calling
* Router.computeTravelTimes and Parquet.writeTravelTimes, returning on the
* first success.
*
* NOTE(review): the handleTravelTimes(exchange) call, the catch block that sends
* a 500 response and the HttpServer start-up lines reference names (exchange,
* server) that are not declared in this method. This span appears to interleave
* two revisions of the file. Confirm against version control.
*/
private static void processOrigin(
TransportNetwork network, List<Router.DestinationChunk> chunks,
String[] postcodes, double lat, double lon, int nDest,
Path modeDir, String mode, LocalDate date, int idx,
DuckDBConnection conn) throws Exception {
Path outPath = modeDir.resolve(String.format("%06d.parquet", idx));
Exception lastError = null;
for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
handleTravelTimes(exchange);
short[] times = Router.computeTravelTimes(network, chunks, lat, lon, mode, nDest, date);
Parquet.writeTravelTimes(conn, outPath, postcodes, times);
return;
} catch (Exception e) {
System.err.println("Error handling travel-times: " + e.getMessage());
e.printStackTrace();
sendResponse(exchange, 500, "Internal server error: " + e.getMessage());
}
});
server.setExecutor(java.util.concurrent.Executors.newFixedThreadPool(4));
server.start();
System.out.println("R5 service listening on port 8003");
}
/**
 * Sends {@code body} as the complete response for {@code exchange}.
 *
 * <p>The body is encoded as UTF-8, the {@code Content-Type} header is set to
 * {@code application/json}, and the response stream is closed once the bytes
 * have been written.
 *
 * @param exchange HTTP exchange to answer
 * @param status   HTTP status code
 * @param body     response text
 * @throws IOException if the headers or the body cannot be written
 */
private static void sendResponse(HttpExchange exchange, int status, String body) throws IOException {
    final byte[] payload = body.getBytes(StandardCharsets.UTF_8);
    final long payloadLength = payload.length;

    exchange.getResponseHeaders().set("Content-Type", "application/json");
    exchange.sendResponseHeaders(status, payloadLength);

    try (OutputStream responseStream = exchange.getResponseBody()) {
        responseStream.write(payload);
    }
}
/**
* Handles one HTTP exchange carrying a travel-time request.
*
* Parses the request body as a TravelTimeRequest with Gson, replies 400 when
* origin is not a two-element array or destinations is missing or empty, builds
* a RegionalTask for the requested mode (falling back to "transit" when mode is
* null) and runs TravelTimeComputer against the static network field.
*
* NOTE(review): from `lastError = e;` to `throw lastError;` the statements look
* like the tail of processOrigin's retry loop (e, attempt, idx and lastError are
* not declared in this method). This span appears to interleave two revisions
* of the file. Confirm against version control.
*/
private static void handleTravelTimes(HttpExchange exchange) throws IOException {
long t0 = System.currentTimeMillis();
String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
TravelTimeRequest req = gson.fromJson(body, TravelTimeRequest.class);
if (req.origin == null || req.origin.length != 2) {
sendResponse(exchange, 400, "{\"error\":\"origin must be [lat, lon]\"}");
return;
}
if (req.destinations == null || req.destinations.length == 0) {
sendResponse(exchange, 400, "{\"error\":\"destinations must be non-empty\"}");
return;
}
String mode = req.mode != null ? req.mode : "transit";
// Build destination point set (Coordinate takes x=lon, y=lat)
Coordinate[] coords = new Coordinate[req.destinations.length];
for (int i = 0; i < req.destinations.length; i++) {
coords[i] = new Coordinate(req.destinations[i][1], req.destinations[i][0]); // lon, lat
}
FreeFormPointSet destinations = new FreeFormPointSet(coords);
// Build the regional task
RegionalTask task = new RegionalTask();
task.fromLat = req.origin[0];
task.fromLon = req.origin[1];
task.date = LocalDate.now();
task.percentiles = new int[]{50};
task.recordTimes = true;
task.destinationPointSets = new PointSet[]{ destinations };
// Set grid extents from destination point set (required by TravelTimeComputer)
WebMercatorExtents extents = destinations.getWebMercatorExtents();
task.zoom = extents.zoom;
task.west = extents.west;
task.north = extents.north;
task.width = extents.width;
task.height = extents.height;
// Every mode uses the same 08:00-08:01 departure window and a 120-minute cap;
// only the leg modes, transit modes and (for transit) maxRides differ.
switch (mode) {
case "car":
task.fromTime = 8 * 3600;
task.toTime = 8 * 3600 + 60;
task.maxTripDurationMinutes = 120;
task.accessModes = EnumSet.of(LegMode.CAR);
task.egressModes = EnumSet.of(LegMode.CAR);
task.directModes = EnumSet.of(LegMode.CAR);
task.transitModes = EnumSet.noneOf(TransitModes.class);
break;
case "bicycle":
task.fromTime = 8 * 3600;
task.toTime = 8 * 3600 + 60;
task.maxTripDurationMinutes = 120;
task.accessModes = EnumSet.of(LegMode.BICYCLE);
task.egressModes = EnumSet.of(LegMode.BICYCLE);
task.directModes = EnumSet.of(LegMode.BICYCLE);
task.transitModes = EnumSet.noneOf(TransitModes.class);
break;
case "walking":
task.fromTime = 8 * 3600;
task.toTime = 8 * 3600 + 60;
task.maxTripDurationMinutes = 120;
task.accessModes = EnumSet.of(LegMode.WALK);
task.egressModes = EnumSet.of(LegMode.WALK);
task.directModes = EnumSet.of(LegMode.WALK);
task.transitModes = EnumSet.noneOf(TransitModes.class);
break;
default: // transit
task.fromTime = 8 * 3600;
task.toTime = 8 * 3600 + 60; // single RAPTOR sweep
task.maxTripDurationMinutes = 120;
task.maxRides = 4;
task.accessModes = EnumSet.of(LegMode.WALK);
task.egressModes = EnumSet.of(LegMode.WALK);
task.directModes = EnumSet.of(LegMode.WALK);
task.transitModes = EnumSet.of(TransitModes.TRANSIT);
break;
}
// Compute travel times
TravelTimeComputer computer = new TravelTimeComputer(task, network);
OneOriginResult result = computer.computeTravelTimes();
TravelTimeResponse response = new TravelTimeResponse();
response.travel_times = new double[req.destinations.length];
TravelTimeResult tt = result.travelTimes;
if (tt != null) {
int[][] values = tt.getValues();
// values[percentileIndex][destinationIndex]
for (int i = 0; i < req.destinations.length; i++) {
if (i < values[0].length && values[0][i] != Integer.MAX_VALUE) {
response.travel_times[i] = values[0][i]; // already in minutes
} else {
response.travel_times[i] = -1; // unreachable
lastError = e;
if (attempt < MAX_RETRIES) {
System.err.printf("%n [RETRY %d/%d] origin %d: %s%n",
attempt + 1, MAX_RETRIES, idx, e.getMessage());
}
}
} else {
for (int i = 0; i < req.destinations.length; i++) {
response.travel_times[i] = -1;
}
throw lastError;
}
/**
 * Lists the origin indices whose output file is still missing or empty.
 *
 * <p>For each index in {@code [0, nOrigins)} the expected file is
 * {@code modeDir/<index zero-padded to six digits>.parquet}. An index is
 * reported when that file does not exist or has a size of zero bytes.
 *
 * @param modeDir  directory holding the per-origin parquet files
 * @param nOrigins number of origins to check
 * @return indices without usable output, in ascending order
 * @throws Exception if a file size cannot be read
 */
private static List<Integer> findRemaining(Path modeDir, int nOrigins) throws Exception {
    List<Integer> pending = new ArrayList<>();
    int origin = 0;
    while (origin < nOrigins) {
        Path candidate = modeDir.resolve(String.format("%06d.parquet", origin));
        // Size is only queried for files that exist, as in a short-circuit "missing or empty" check.
        boolean finished = Files.exists(candidate) && Files.size(candidate) != 0;
        if (!finished) {
            pending.add(origin);
        }
        origin++;
    }
    return pending;
}
long elapsed = System.currentTimeMillis() - t0;
System.out.println("Travel times (" + mode + ") computed for " + req.destinations.length +
" destinations in " + elapsed + "ms");
/**
 * Returns the value that follows the flag {@code name} in {@code args}.
 *
 * <p>Only flags that have a following element are considered, and the first
 * match wins. When the flag is absent, an error and a usage line are printed
 * to stderr and the JVM exits with status 1.
 *
 * @param args command-line arguments
 * @param name flag to look for, e.g. {@code --postcodes}
 * @return the element immediately after the first occurrence of {@code name}
 */
private static String requiredArg(String[] args, String name) {
    int flagAt = 0;
    while (flagAt + 1 < args.length) {
        if (args[flagAt].equals(name)) {
            return args[flagAt + 1];
        }
        flagAt++;
    }
    System.err.println("Missing required argument: " + name);
    System.err.println("Usage: App --postcodes FILE --places FILE --output-dir DIR [--threads N]");
    System.exit(1);
    return null; // unreachable
}
sendResponse(exchange, 200, gson.toJson(response));
/**
 * Returns the value that follows the flag {@code name} in {@code args}, or
 * {@code defaultValue} when the flag is not present.
 *
 * <p>A flag in the last position has no value and is treated as absent. The
 * first match wins.
 *
 * @param args         command-line arguments
 * @param name         flag to look for, e.g. {@code --threads}
 * @param defaultValue value returned when the flag is absent
 * @return the element after the first occurrence of {@code name}, else {@code defaultValue}
 */
private static String optionalArg(String[] args, String name, String defaultValue) {
    // Walk the candidate value positions; the flag sits one slot before each.
    for (int valueAt = 1; valueAt < args.length; valueAt++) {
        String flag = args[valueAt - 1];
        if (flag.equals(name)) {
            return args[valueAt];
        }
    }
    return defaultValue;
}
/**
 * Returns the value of the environment variable {@code name}.
 *
 * <p>When the variable is not set, an error is printed to stderr and the JVM
 * exits with status 1.
 *
 * @param name environment variable to read
 * @return the variable's value
 */
private static String requiredEnv(String name) {
    final String value = System.getenv(name);
    if (value != null) {
        return value;
    }
    System.err.println("Missing required environment variable: " + name);
    System.exit(1);
    return value;
}
}