The-Agency/lib/src/daemon/daemon_manager.dart

288 lines
9 KiB
Dart

import "dart:async";
import "dart:convert";
import "dart:io";
import "daemon_types.dart";
// DaemonManager: manages background Claude sessions.
//
// Session records are stored as JSON under ~/.claude/sessions/<id>.json
// Each record includes pid, workingDir, status, log path, etc.
//
// The manager can start new background sessions, list them, stream their
// logs, attach to them (tail log), and kill them.
class DaemonManager {
DaemonManager({String? sessionsDir})
: sessionsDir = sessionsDir ?? _defaultSessionsDir();
final String sessionsDir;
static String _defaultSessionsDir() {
final home =
Platform.environment["HOME"] ??
Platform.environment["USERPROFILE"] ??
"/tmp";
return "$home/.claude/sessions";
}
Directory get _dir => Directory(sessionsDir);
// ─── registry I/O ───────────────────────────────────────────────────────
Future<void> _ensureDir() async {
await _dir.create(recursive: true);
}
String _recordPath(String id) =>
"$sessionsDir/${safeFilenameId(id)}.json";
Future<void> saveRecord(SessionRecord rec) async {
await _ensureDir();
final f = File(_recordPath(rec.id));
await f.writeAsString(jsonEncode(rec.toJson()));
}
Future<SessionRecord?> loadRecord(String id) async {
final f = File(_recordPath(id));
if (!f.existsSync()) return null;
try {
final raw = await f.readAsString();
return SessionRecord.fromJson(
jsonDecode(raw) as Map<String, dynamic>,
);
} catch (_) {
return null;
}
}
Future<void> deleteRecord(String id) async {
final f = File(_recordPath(id));
if (f.existsSync()) await f.delete();
}
/// List all session records. Stale (process-dead) running sessions
/// are updated to status=failed automatically.
Future<List<SessionRecord>> listSessions({bool refreshStatus = true}) async {
await _ensureDir();
final files = _dir
.listSync()
.whereType<File>()
.where((f) => f.path.endsWith(".json"))
.toList();
final records = <SessionRecord>[];
for (final f in files) {
try {
final raw = await f.readAsString();
final rec = SessionRecord.fromJson(
jsonDecode(raw) as Map<String, dynamic>,
);
records.add(rec);
} catch (_) {
// skip corrupt files
}
}
if (refreshStatus) {
for (final rec in records) {
if (rec.status == SessionStatus.running) {
final alive = _isPidAlive(rec.pid);
if (!alive) {
rec.status = SessionStatus.failed;
rec.endedAt = DateTime.now().toUtc().toIso8601String();
await saveRecord(rec);
}
}
}
}
records.sort((a, b) => a.startedAt.compareTo(b.startedAt));
return records;
}
// ─── process helpers ─────────────────────────────────────────────────────
bool _isPidAlive(int pid) {
// On Unix, sending signal 0 tests process existence
try {
return Process.killPid(pid, ProcessSignal.sigusr2) ||
// fallback: check /proc on linux
File("/proc/$pid").existsSync();
} catch (_) {
// ESRCH = no such process
try {
return File("/proc/$pid").existsSync();
} catch (_) {
return false;
}
}
}
// ─── start a background session ─────────────────────────────────────────
/// Spawn a new background Claude session.
///
/// [executable] is the claude binary path (defaults to "claude").
/// [promptArgs] are forwarded as-is to the child process.
/// Returns the session record.
Future<SessionRecord> startSession({
String executable = "claude",
List<String> promptArgs = const [],
String? workingDirectory,
String? model,
String? title,
}) async {
await _ensureDir();
final id = generateSessionId();
final logDir = "$sessionsDir/logs";
await Directory(logDir).create(recursive: true);
final logFile = "$logDir/${safeFilenameId(id)}.log";
final cwd = workingDirectory ?? Directory.current.path;
// args: --bg tells the legacy claude CLI to run headlessly (non-interactive)
final args = [
"--bg",
if (model != null) ...["--model", model],
...promptArgs,
];
final logSink = File(logFile).openWrite();
final proc = await Process.start(
executable,
args,
workingDirectory: cwd,
environment: {
...Platform.environment,
"CLAWD_SESSION_ID": id,
},
mode: ProcessStartMode.detachedWithStdio,
);
// pipe stdout/stderr into the log file
proc.stdout.listen((d) => logSink.add(d));
proc.stderr.listen((d) => logSink.add(d));
unawaited(proc.exitCode.then((code) async {
await logSink.close();
final rec = await loadRecord(id);
if (rec != null) {
rec.status = code == 0 ? SessionStatus.completed : SessionStatus.failed;
rec.endedAt = DateTime.now().toUtc().toIso8601String();
rec.exitCode = code;
await saveRecord(rec);
}
}));
final rec = SessionRecord(
id: id,
pid: proc.pid,
workingDirectory: cwd,
startedAt: DateTime.now().toUtc().toIso8601String(),
status: SessionStatus.running,
logFile: logFile,
title: title,
model: model,
);
await saveRecord(rec);
return rec;
}
// ─── kill a session ──────────────────────────────────────────────────────
/// Kill the session by id. force=true sends SIGKILL, otherwise SIGTERM.
Future<bool> killSession(String id, {bool force = false}) async {
final rec = await loadRecord(id);
if (rec == null) return false;
if (rec.status != SessionStatus.running) return false;
try {
final sig = force ? ProcessSignal.sigkill : ProcessSignal.sigterm;
final sent = Process.killPid(rec.pid, sig);
if (sent) {
rec.status = SessionStatus.killed;
rec.endedAt = DateTime.now().toUtc().toIso8601String();
await saveRecord(rec);
}
return sent;
} catch (_) {
return false;
}
}
// ─── logs ─────────────────────────────────────────────────────────────────
/// Read the log file for a session. Returns null if not found.
Future<String?> readLogs(String id, {int? tail}) async {
final rec = await loadRecord(id);
if (rec == null || rec.logFile == null) return null;
final f = File(rec.logFile!);
if (!f.existsSync()) return null;
final contents = await f.readAsString();
if (tail == null) return contents;
final lines = contents.split("\n");
final start = lines.length > tail ? lines.length - tail : 0;
return lines.sublist(start).join("\n");
}
/// Stream log output from a session (tail -f style).
/// The stream ends when the session process exits.
Stream<String> streamLogs(String id) async* {
final rec = await loadRecord(id);
if (rec == null || rec.logFile == null) return;
final f = File(rec.logFile!);
if (!f.existsSync()) return;
// first emit existing content
final existing = await f.readAsString();
if (existing.isNotEmpty) yield existing;
// then watch for changes
if (rec.status != SessionStatus.running) return;
var offset = existing.length;
while (true) {
await Future<void>.delayed(const Duration(milliseconds: 250));
final current = await loadRecord(id);
final content = await f.readAsString();
if (content.length > offset) {
yield content.substring(offset);
offset = content.length;
}
if (current == null || current.status != SessionStatus.running) break;
if (!_isPidAlive(current.pid)) break;
}
}
// ─── attach ──────────────────────────────────────────────────────────────
/// Print session info suitable for "attach" display.
Future<String?> describeSession(String id) async {
final rec = await loadRecord(id);
if (rec == null) return null;
final buf = StringBuffer();
buf.writeln("Session: ${rec.id}");
buf.writeln(" PID: ${rec.pid}");
buf.writeln(" Status: ${rec.status.name}");
buf.writeln(" Dir: ${rec.workingDirectory}");
buf.writeln(" Started: ${rec.startedAt}");
if (rec.endedAt != null) buf.writeln(" Ended: ${rec.endedAt}");
if (rec.title != null) buf.writeln(" Title: ${rec.title}");
if (rec.logFile != null) buf.writeln(" Log: ${rec.logFile}");
return buf.toString();
}
}