import { fail, handleOptions, json } from "../_shared/http.ts";
import { requireUser } from "../_shared/supabase.ts";

/** One stop of a trip as supplied by the client. */
type StopRow = {
  stop_sequence: number;
  stop_name: string;
  scheduled_time?: string | null;
};

/** One trip as supplied by the client, including its ordered stops. */
type TripRow = {
  trip_number: string;
  duty_number: string;
  bus_work_number: string;
  direction: string;
  service_code: string;
  sort_order: number;
  stops: StopRow[];
};

/** JSON payload accepted by this endpoint (optionally gzip-compressed). */
type RequestBody = {
  channel_id?: string;
  file_name?: string;
  source_mime?: string;
  parser?: string;
  trips?: TripRow[];
};

/** Row shape written to `operations_trip_stops`. */
type StopInsert = {
  trip_id: string;
  stop_sequence: number;
  stop_name: string;
  scheduled_time: string | null;
};

/** Maximum number of stop rows sent to the database per insert call. */
const BATCH_SIZE = 500;

/**
 * Inflates a gzip-compressed buffer.
 *
 * The write side and the read side of the stream are driven concurrently and
 * awaited together, so a corrupt payload surfaces as a single rejection of
 * this function instead of an unhandled rejection on the writer.
 *
 * @param compressed gzip-encoded bytes
 * @returns the decompressed bytes
 * @throws if the input is not valid gzip data
 */
async function decompressGzip(compressed: Uint8Array): Promise<Uint8Array> {
  const ds = new DecompressionStream("gzip");
  const writer = ds.writable.getWriter();

  const feed = async (): Promise<void> => {
    await writer.write(compressed);
    await writer.close();
  };

  const [, buffer] = await Promise.all([
    feed(),
    new Response(ds.readable).arrayBuffer(),
  ]);
  return new Uint8Array(buffer);
}

/** Returns the trimmed string, or "" when the value is not a string. */
function readString(value: unknown): string {
  return typeof value === "string" ? value.trim() : "";
}

/** Composite key used to match request trips to inserted trip rows. */
function tripKey(tripNumber: unknown, dutyNumber: unknown): string {
  // trip_number alone might not be unique across duties
  return `${tripNumber}__${dutyNumber}`;
}

/**
 * POST handler: stores a parsed operations schedule for a channel.
 *
 * Steps: authenticate the caller, parse the (optionally gzip-encoded) JSON
 * body, verify the channel is an "operations" channel visible to the caller,
 * compute the next schedule version, deactivate the currently active
 * schedules, insert the new schedule row, then insert its trips and stops.
 * Responds with `{ schedule_id, version }` on success.
 *
 * NOTE(review): the writes below are separate requests and are not atomic.
 * If a later step fails, earlier steps (e.g. deactivation of the previous
 * schedule) are not rolled back — confirm whether this should move into a
 * single database transaction/RPC.
 */
Deno.serve(async (req) => {
  const preflight = handleOptions(req);
  if (preflight) return preflight;

  if (req.method !== "POST") return fail("Method not allowed", 405);

  const { client, user, error: userError } = await requireUser(req);
  if (!user) return fail(userError ?? "Unauthorized", 401);

  // ---- Parse body -------------------------------------------------------
  const contentEncoding = req.headers.get("content-encoding") ?? "";
  const isGzip = contentEncoding.trim().toLowerCase() === "gzip";

  let parsed: unknown;
  try {
    if (isGzip) {
      const raw = new Uint8Array(await req.arrayBuffer());
      const decompressed = await decompressGzip(raw);
      parsed = JSON.parse(new TextDecoder().decode(decompressed));
    } else {
      parsed = await req.json();
    }
  } catch {
    return fail("Failed to parse request body");
  }

  // Valid JSON may still be `null` or a primitive; reject it here so that the
  // property reads below cannot throw.
  if (typeof parsed !== "object" || parsed === null) {
    return fail("Failed to parse request body");
  }
  const body = parsed as RequestBody;

  // Non-string values are treated as missing rather than crashing on .trim().
  const channelId = readString(body.channel_id);
  const fileName = readString(body.file_name);
  const sourceMime = readString(body.source_mime);
  const parser = readString(body.parser);
  const trips: TripRow[] = Array.isArray(body.trips) ? body.trips : [];

  if (!channelId) return fail("channel_id is required");
  if (!fileName) return fail("file_name is required");
  if (!sourceMime) return fail("source_mime is required");
  if (!parser) return fail("parser is required");
  if (trips.length === 0) return fail("trips is empty");

  // ---- Verify channel access -------------------------------------------
  const { data: channel, error: channelError } = await client
    .from("channels")
    .select("id, type")
    .eq("id", channelId)
    .eq("type", "operations")
    .maybeSingle();
  if (channelError) return fail(channelError.message, 400);
  if (!channel) return fail("forbidden", 403);

  // ---- Get next version -------------------------------------------------
  const { data: existing, error: versionError } = await client
    .from("operations_schedules")
    .select("version")
    .eq("channel_id", channelId)
    .order("version", { ascending: false })
    .limit(1);
  // A failed lookup must not silently fall back to version 1.
  if (versionError) return fail(versionError.message, 500);

  const latestVersion = Array.isArray(existing) && existing.length > 0
    ? ((existing[0].version as number | null) ?? 0)
    : 0;
  const nextVersion = latestVersion + 1;

  // ---- Deactivate old schedules ----------------------------------------
  const { error: deactivateError } = await client
    .from("operations_schedules")
    .update({ is_active: false })
    .eq("channel_id", channelId)
    .eq("is_active", true);
  if (deactivateError) return fail(deactivateError.message, 500);

  // ---- Insert schedule row ---------------------------------------------
  const { data: scheduleRow, error: scheduleError } = await client
    .from("operations_schedules")
    .insert({
      channel_id: channelId,
      version: nextVersion,
      source_file_name: fileName,
      source_mime: sourceMime,
      parser,
      parse_status: "parsed",
      uploaded_by: user.id,
      is_active: true,
      parsed_at: new Date().toISOString(),
    })
    .select("id")
    .single();
  if (scheduleError || !scheduleRow) {
    return fail(scheduleError?.message ?? "Failed to create schedule", 500);
  }
  const scheduleId = scheduleRow.id as string;

  // ---- Insert trips -----------------------------------------------------
  const tripInserts = trips.map((t, i) => ({
    schedule_id: scheduleId,
    trip_number: t.trip_number,
    duty_number: t.duty_number,
    bus_work_number: t.bus_work_number,
    direction: t.direction,
    service_code: t.service_code,
    sort_order: t.sort_order ?? i,
  }));

  const { data: insertedTrips, error: tripError } = await client
    .from("operations_trips")
    .insert(tripInserts)
    .select("id, trip_number, duty_number");
  if (tripError || !insertedTrips) {
    return fail(tripError?.message ?? "Failed to insert trips", 500);
  }

  // Map trip_number+duty_number -> queue of inserted ids. A queue (rather
  // than a single id) keeps trips that share the same key from all being
  // attached to whichever row happened to be seen last.
  const tripIdsByKey = new Map<string, string[]>();
  for (const row of insertedTrips) {
    const key = tripKey(row.trip_number, row.duty_number);
    const ids = tripIdsByKey.get(key);
    if (ids) {
      ids.push(row.id as string);
    } else {
      tripIdsByKey.set(key, [row.id as string]);
    }
  }

  // ---- Build all stop rows ---------------------------------------------
  const allStopRows: StopInsert[] = [];
  for (const trip of trips) {
    const tripId = tripIdsByKey
      .get(tripKey(trip.trip_number, trip.duty_number))
      ?.shift();
    if (!tripId) continue; // no inserted row matched this trip

    for (const stop of trip.stops ?? []) {
      allStopRows.push({
        trip_id: tripId,
        stop_sequence: stop.stop_sequence,
        stop_name: stop.stop_name,
        scheduled_time: stop.scheduled_time ?? null,
      });
    }
  }

  // ---- Batch insert stops in chunks ------------------------------------
  for (let i = 0; i < allStopRows.length; i += BATCH_SIZE) {
    const chunk = allStopRows.slice(i, i + BATCH_SIZE);
    const { error: stopError } = await client
      .from("operations_trip_stops")
      .insert(chunk);
    if (stopError) return fail(stopError.message, 500);
  }

  return json({ schedule_id: scheduleId, version: nextVersion });
});