Files
The-Agency/lib/src/session/session_runtime.dart
T

771 lines
26 KiB
Dart

import "dart:convert";
import "package:flutter/foundation.dart";
import "../api/openrouter_client.dart";
import "../api/response_parser.dart";
import "../compact/compact_service.dart";
import "../hooks/hook_runner.dart";
import "../hooks/hook_types.dart";
import "../local_state.dart";
import "../permissions/permission_types.dart";
import "../services/cost_tracker.dart" as cost_tracker;
import "../chat/tool_loop_service.dart";
import "conversation_history.dart";
import "session_store.dart";
import "session_types.dart";
/// All the mutable state that belongs to a single chat thread.
///
/// Previously this was all crammed onto ChatProvider, which meant switching
/// threads would clobber in-flight state from the previous thread.
///
/// A [SessionRuntime] is created when a session is activated and kept alive
/// as long as there might be work still running ([isLoading] ||
/// [isCompacting]). ChatProvider holds a `Map<sessionId, SessionRuntime>` and
/// switches which one is "active" when the user changes threads — the
/// background ones keep running and save themselves to disk when done.
class SessionRuntime {
  /// Creates the runtime for [session], rebuilding the API message list from
  /// the persisted messages and restoring the per-thread permission mode.
  ///
  /// [onChanged] is invoked after every observable state change. [isActive]
  /// reports whether the user is currently viewing this thread; when it
  /// returns false (or is omitted) a finished turn marks the thread unread.
  SessionRuntime({
    required ConversationSession session,
    required ToolLoopService toolLoopService,
    required HookRunner? hookRunner,
    required LocalSettings Function() getSettings,
    required String Function(String?) normalizeModelId,
    required VoidCallback onChanged,
    void Function(double costDelta)? onCostAdded,
    bool Function()? isActive,
    void Function(String sessionId, String name)? onNameGenerated,
    Future<void> Function(String rule)? onPersistAllowRule,
  })  : _toolLoopService = toolLoopService,
        _hookRunner = hookRunner,
        _getSettings = getSettings,
        _normalizeModelId = normalizeModelId,
        _onChanged = onChanged,
        _onCostAdded = onCostAdded,
        _isActive = isActive,
        _onNameGenerated = onNameGenerated,
        _onPersistAllowRule = onPersistAllowRule {
    _conversationHistory = ConversationHistory(session: session);
    _apiMessages = _buildApiMessages(session.messages);
    // restore persisted per-thread mode override
    _permissionModeOverride = session.permissionMode;
  }

  final ToolLoopService _toolLoopService;
  final HookRunner? _hookRunner;
  final VoidCallback _onChanged;
  final void Function(double costDelta)? _onCostAdded;
  final LocalSettings Function() _getSettings;
  final String Function(String?) _normalizeModelId;
  final bool Function()? _isActive;
  final void Function(String sessionId, String name)? _onNameGenerated;
  final Future<void> Function(String rule)? _onPersistAllowRule;

  /// `/compact` optionally followed by whitespace and custom instructions.
  /// Anchored so that e.g. "/compactify" is sent as a normal message.
  static final RegExp _compactCommand = RegExp(r"^/compact(?:\s+([\s\S]*))?$");

  bool _nameGenerated = false;
  late final ConversationHistory _conversationHistory;
  late List<Map<String, dynamic>> _apiMessages;
  OpenRouterClient? _client;
  bool _isLoading = false;
  bool _isCompacting = false;
  bool _stopRequested = false;
  PendingPermission? _pendingPermission;
  final List<QueuedMessage> _messageQueue = [];

  // per-thread permission mode override (null = use global setting)
  String? _permissionModeOverride;

  /// The permission mode in effect for this thread: the per-thread override
  /// if set, otherwise the global setting, otherwise "default".
  String get permissionModeOverride =>
      _permissionModeOverride ?? _getSettings().permissionMode ?? "default";

  /// Sets the per-thread permission mode, stores it on the session, saves
  /// the session, and notifies listeners.
  Future<void> setPermissionModeOverride(String mode) async {
    _permissionModeOverride = mode;
    final session = _conversationHistory.session;
    if (session != null) {
      session.permissionMode = mode;
      await SessionStore.instance.saveSession(session);
    }
    _onChanged();
  }

  // set when a turn finishes while the user is viewing a different thread
  bool _hasUnreadResult = false;
  // true while a streaming tool is actively pushing chunks — prevents onToolResult
  // from double-adding the content that was already appended chunk by chunk
  bool _streamingToolOutput = false;

  // compact state
  String? _lastCompactSummary;
  // true right after a compaction: the newest recorded context-token count is
  // the stale pre-compaction one, so warnings are hidden until a fresh count
  // arrives from the next turn
  bool _suppressCompactWarning = false;
  int _consecutiveCompactFailures = 0;
  static const int _maxConsecutiveCompactFailures = 3;

  // ─── read-only accessors ────────────────────────────────────────────────────

  String get sessionId => _conversationHistory.session?.id ?? "";
  List<Message> get messages => _conversationHistory.getMessages();
  int get messageCount => messages.length;
  String? get workingDirectory =>
      _conversationHistory.session?.workingDirectory;
  bool get isLoading => _isLoading;
  bool get isCompacting => _isCompacting;
  bool get isStopping => _stopRequested;
  PendingPermission? get pendingPermission => _pendingPermission;
  bool get hasUnreadResult => _hasUnreadResult;

  /// Sets the unread flag and notifies listeners.
  void setUnreadResult(bool value) {
    _hasUnreadResult = value;
    _onChanged();
  }

  /// Clears the unread flag; notifies listeners only if it was set.
  void markRead() {
    if (!_hasUnreadResult) return;
    _hasUnreadResult = false;
    _onChanged();
  }

  int get queuedMessageCount => _messageQueue.length;

  /// Texts of the queued messages, in queue order (unmodifiable).
  List<String> get queuedMessages =>
      List.unmodifiable(_messageQueue.map((m) => m.text));

  String? get lastCompactSummary => _lastCompactSummary;

  /// The most recent positive context-token count recorded on any message,
  /// searching from the newest message backwards; 0 if none is recorded.
  int get contextTokens {
    final msgs = messages;
    for (var i = msgs.length - 1; i >= 0; i--) {
      final ct = msgs[i].contextTokens;
      if (ct != null && ct > 0) return ct;
    }
    return 0;
  }

  /// Token warning state for [contextTokens] against the model turns are sent
  /// to, or null when no context size is known yet.
  ///
  /// While warnings are suppressed after a compaction, a non-clean state is
  /// reported with every threshold flag cleared (percentLeft is kept).
  TokenWarningState? get tokenWarningState {
    final ct = contextTokens;
    if (ct <= 0) return null;
    // Same normalization as sendMessage so the lookup uses the model id that
    // is actually sent to the API.
    final model = _normalizeModelId(_getSettings().model);
    final state = calculateTokenWarningState(ct, model);
    if (_suppressCompactWarning && !state.isClean) {
      return TokenWarningState(
        percentLeft: state.percentLeft,
        isAboveWarningThreshold: false,
        isAboveErrorThreshold: false,
        isAboveAutoCompactThreshold: false,
        isAtBlockingLimit: false,
      );
    }
    return state;
  }

  // ─── message queue ──────────────────────────────────────────────────────────

  /// Removes the queued message at [index]; out-of-range indices are ignored.
  void removeQueuedMessage(int index) {
    if (index < 0 || index >= _messageQueue.length) return;
    _messageQueue.removeAt(index);
    _onChanged();
  }

  /// Removes and returns the queued message with the highest priority (lowest
  /// [QueuePriority.order]); ties go to the oldest. Null when the queue is empty.
  QueuedMessage? _dequeue() {
    if (_messageQueue.isEmpty) return null;
    int bestIdx = 0;
    for (int i = 1; i < _messageQueue.length; i++) {
      if (_messageQueue[i].priority.order <
          _messageQueue[bestIdx].priority.order) {
        bestIdx = i;
      }
    }
    return _messageQueue.removeAt(bestIdx);
  }

  /// Sends the next queued message (with its attachments) if the runtime is
  /// idle. Safe to call redundantly: it does nothing while busy or when empty.
  Future<void> _processNextQueued() async {
    if (_isLoading || _isCompacting) return;
    final next = _dequeue();
    if (next == null) return;
    _onChanged();
    await sendMessage(
      next.text,
      priority: next.priority,
      attachments: next.attachments,
    );
  }

  // ─── send message ───────────────────────────────────────────────────────────

  /// Sends [text] (plus optional [attachments]) as a user turn.
  ///
  /// - Empty text with no attachments is ignored.
  /// - While a turn or a compaction is running, the message is queued
  ///   (attachments included) and sent once the runtime is idle again.
  /// - `/compact [instructions]` runs a manual compaction instead.
  ///
  /// Throws if no OpenRouter API key is configured. Errors from the turn are
  /// recorded as an assistant message, the session is saved, and the error is
  /// rethrown; a cancelled turn records "Generation stopped." and returns.
  /// After a successful turn the next queued message (if any) is sent.
  Future<void> sendMessage(
    String text, {
    QueuePriority priority = QueuePriority.next,
    List<AttachmentData>? attachments,
  }) async {
    _hasUnreadResult = false;
    // Snapshot the caller's list: it is read after several awaits (or much
    // later if queued), so a caller reusing/clearing its list can't drop files.
    final attachmentsSnapshot =
        attachments == null ? null : List<AttachmentData>.of(attachments);
    final hasAttachments =
        attachmentsSnapshot != null && attachmentsSnapshot.isNotEmpty;
    if (text.isEmpty && !hasAttachments) return;

    // Busy: queue instead of racing the running work over _apiMessages.
    if (_isLoading || _isCompacting) {
      _messageQueue.add(QueuedMessage(
        text: text,
        priority: priority,
        attachments: attachmentsSnapshot,
      ));
      _onChanged();
      return;
    }

    // intercept /compact [custom instructions]
    final compactMatch = _compactCommand.firstMatch(text.trim());
    if (compactMatch != null) {
      final custom = compactMatch.group(1)?.trim();
      await runCompact(
        customInstructions: (custom == null || custom.isEmpty) ? null : custom,
      );
      // runCompact can return early without draining; keep the queue moving.
      await _processNextQueued();
      return;
    }

    final settings = _getSettings();
    final apiKey = settings.openRouterApiKey;
    if (apiKey == null || apiKey.isEmpty) {
      throw Exception("OpenRouter API key not set.");
    }
    final model = _normalizeModelId(settings.model);

    // Go busy before the first await so a second sendMessage() issued while
    // the client is created or hooks run is queued rather than starting a
    // parallel turn (which would overwrite _client and race _apiMessages).
    _isLoading = true;
    _stopRequested = false;
    _streamingToolOutput = false;
    _onChanged();

    try {
      var hasStreamingAssistantMessage = false;
      final client = await OpenRouterClientFactory.create(apiKey: apiKey);
      _client = client;
      // detect first turn before the user message is added
      final isFirstTurn = _apiMessages.isEmpty && !_nameGenerated;
      final session = _conversationHistory.session;
      if (session != null) {
        session.model = model;
        if (session.name == "New Chat") {
          session.name = _buildSessionName(text);
        }
      }
      await _hookRunner?.runHooksForKind(
        HookKind.userPromptSubmit,
        input: {"message": text},
      );
      final attachmentBlocks = _buildAttachmentBlocks(attachmentsSnapshot);
      final msgAttachments = attachmentsSnapshot
          ?.map((a) => MessageAttachment(
                name: a.name,
                mimeType: a.mimeType,
                data: a.data,
              ))
          .toList();
      _conversationHistory.addMessage(
        "user",
        text,
        attachments: msgAttachments,
      );
      _onChanged();

      final toolLoopResult = await _toolLoopService.runTurn(
        client: client,
        model: model,
        apiKey: apiKey,
        getSettings: _effectiveTurnSettings,
        apiMessages: _apiMessages,
        userText: text,
        attachmentBlocks: attachmentBlocks.isEmpty ? null : attachmentBlocks,
        workingDirectory: workingDirectory,
        advisorModel: _getSettings().advisorModel,
        onToolCall: (toolName, input) {
          _conversationHistory.addMessage(
            "tool",
            _formatToolCall(toolName, input),
          );
          _onChanged();
        },
        onToolOutputChunk: (toolName, chunk) {
          // append live chunk to the last tool message (which onToolCall just added)
          _streamingToolOutput = true;
          _conversationHistory.appendToLastMessage(chunk);
          _onChanged();
        },
        onToolResult: (toolName, result) {
          if (_streamingToolOutput) {
            // content already in the message from live chunks — don't double-add
            _streamingToolOutput = false;
          } else {
            _conversationHistory.addMessage(
              "tool",
              _formatToolResult(toolName, result),
            );
          }
          _onChanged();
          // save after each tool result so progress isn't lost if the app dies mid-turn
          final s = _conversationHistory.session;
          if (s != null) SessionStore.instance.saveSession(s);
        },
        onAssistantTextDelta: (delta) {
          if (!hasStreamingAssistantMessage) {
            _conversationHistory.addMessage("assistant", "");
            hasStreamingAssistantMessage = true;
          }
          _conversationHistory.appendToLastMessage(delta);
          _onChanged();
        },
        onAssistantMessageComplete: () {
          hasStreamingAssistantMessage = false;
          _onChanged();
          // save after each complete assistant message (streaming done)
          final s = _conversationHistory.session;
          if (s != null) SessionStore.instance.saveSession(s);
        },
        onPermissionRequired: (toolName, input, {String? suggestionRule}) async {
          final pending = PendingPermission(
            toolName: toolName,
            input: input,
            suggestionRule: suggestionRule,
          );
          _pendingPermission = pending;
          _onChanged();
          final decision = await pending.future;
          _pendingPermission = null;
          _onChanged();
          return decision;
        },
        shouldStop: () => _stopRequested,
      );

      _apiMessages = toolLoopResult.apiMessages;
      // time-based microcompact
      final mcResult = applyTimeBasedMicrocompact(_apiMessages);
      if (mcResult != null) _apiMessages = mcResult;

      final ct = toolLoopResult.response.contextTokens;
      final rawUsage = toolLoopResult.response.usage;
      final responseCost = (rawUsage?["cost"] as num?)?.toDouble() ?? 0.0;
      double advisorCostTotal = 0;
      for (final au in toolLoopResult.advisorUsages) {
        cost_tracker.addToTotalSessionCost(
          cost: au.costUsd,
          inputTokens: au.inputTokens,
          outputTokens: au.outputTokens,
          cacheReadTokens: 0,
          cacheCreationTokens: 0,
          model: au.model,
        );
        advisorCostTotal += au.costUsd;
      }
      final totalCostThisTurn = responseCost + advisorCostTotal;
      cost_tracker.addToTotalSessionCost(
        cost: responseCost,
        inputTokens: toolLoopResult.response.inputTokens ?? 0,
        outputTokens: toolLoopResult.response.outputTokens ?? 0,
        cacheReadTokens: toolLoopResult.response.cacheReadInputTokens ?? 0,
        cacheCreationTokens:
            toolLoopResult.response.cacheCreationInputTokens ?? 0,
        webSearchRequests: toolLoopResult.webSearchRequests,
        webFetchRequests: toolLoopResult.webFetchRequests,
        model: toolLoopResult.response.model,
      );
      if (!toolLoopResult.finalResponseWasStreamed) {
        _conversationHistory.addMessage(
          "assistant",
          toolLoopResult.responseText,
          tokens: toolLoopResult.response.outputTokens,
          contextTokens: ct,
          cost: totalCostThisTurn > 0 ? totalCostThisTurn : null,
        );
      } else {
        _conversationHistory.setLastMessageContextTokens(ct);
        _conversationHistory.setLastMessageCost(
          totalCostThisTurn > 0 ? totalCostThisTurn : null,
        );
      }
      // A fresh context reading supersedes the stale pre-compaction one the
      // warning was suppressed for, so warnings may show again from here on.
      if (ct > 0) _suppressCompactWarning = false;
      if (totalCostThisTurn > 0) _onCostAdded?.call(totalCostThisTurn);
      _onChanged();

      // generate an AI name for the thread after the first turn
      if (isFirstTurn && session != null && _onNameGenerated != null) {
        _nameGenerated = true;
        _generateThreadName(session, text, apiKey, model);
      }

      // auto-compact (circuit-broken after repeated consecutive failures)
      if (ct > 0) {
        final warning = calculateTokenWarningState(ct, model);
        if (warning.isAboveAutoCompactThreshold &&
            _consecutiveCompactFailures < _maxConsecutiveCompactFailures &&
            _client != null) {
          try {
            _suppressCompactWarning = false;
            await _runCompactInternal(
              client: client,
              model: model,
              suppressFollowUpQuestions: true,
            );
          } catch (e) {
            _consecutiveCompactFailures++;
            print("[compact] auto-compact failed: $e");
          }
        }
      }

      if (session != null) {
        await SessionStore.instance.saveSession(session);
      }
      _onChanged();
    } catch (error, stackTrace) {
      print("SessionRuntime.sendMessage failed: $error");
      print(stackTrace);
      if (error is RequestCancelledException) {
        _conversationHistory.addMessage("assistant", "Generation stopped.");
        await _saveSession();
        return;
      }
      if (error is ToolLoopException) {
        _apiMessages = List<Map<String, dynamic>>.from(error.apiMessages);
      }
      _conversationHistory.addMessage(
        "assistant",
        "This turn failed before the assistant could finish: $error",
      );
      await _saveSession();
      rethrow;
    } finally {
      _client?.close();
      _client = null;
      _stopRequested = false;
      // a streaming tool that aborted never delivered its result; don't let
      // the stale flag swallow a tool result in a later turn
      _streamingToolOutput = false;
      _isLoading = false;
      if (_conversationHistory.session?.id != null &&
          !(_isActive?.call() ?? false)) {
        _hasUnreadResult = true;
      }
      _onChanged();
    }
    // Only reached after a successful turn (cancel returns, errors rethrow).
    await _processNextQueued();
  }

  /// Global settings with this thread's overrides applied: the per-thread
  /// permission mode (if set) and the session's always-allow rules merged
  /// into the global ones. Re-evaluated on every call.
  LocalSettings _effectiveTurnSettings() {
    var base = _getSettings();
    final modeOverride = _permissionModeOverride;
    if (modeOverride != null) {
      base = base.copyWith(permissionMode: modeOverride);
    }
    final sessionRules = _conversationHistory.session?.alwaysAllowRules ?? [];
    if (sessionRules.isEmpty) return base;
    final merged = base.alwaysAllowRules.toSet()..addAll(sessionRules);
    return base.copyWith(alwaysAllowRules: merged.toList());
  }

  /// Converts attachments to API content blocks: images become base64
  /// `image_url` data URLs, everything else is decoded as UTF-8 (malformed
  /// bytes allowed) and sent as a text block prefixed with the file name.
  List<Map<String, dynamic>> _buildAttachmentBlocks(
    List<AttachmentData>? attachments,
  ) {
    return <Map<String, dynamic>>[
      for (final att in attachments ?? const <AttachmentData>[])
        if (att.isImage)
          <String, dynamic>{
            "type": "image_url",
            "image_url": <String, dynamic>{
              "url": "data:${att.mimeType};base64,${base64Encode(att.data)}",
            },
          }
        else
          <String, dynamic>{
            "type": "text",
            "text":
                "File: ${att.name}\n\n${utf8.decode(att.data, allowMalformed: true)}",
          },
    ];
  }

  // ─── stop ───────────────────────────────────────────────────────────────────

  /// Requests cancellation of the running turn: rejects any pending
  /// permission prompt, clears the queue, cancels the active request and runs
  /// the stop hooks (not awaited). No-op when nothing is loading.
  void stopGenerating() {
    if (!_isLoading) return;
    _pendingPermission?.resolve(PermissionDecision.reject);
    _pendingPermission = null;
    _messageQueue.clear();
    _stopRequested = true;
    _client?.cancelActiveRequest();
    _onChanged();
    _hookRunner?.runHooksForKind(HookKind.stop);
  }

  // ─── compact ────────────────────────────────────────────────────────────────

  /// Manually compacts the conversation, optionally steering the summary with
  /// [customInstructions].
  ///
  /// Does nothing if there is nothing to compact, if a turn/compaction is
  /// already running, or if no API key is set. Compaction failures are
  /// recorded as an assistant message rather than thrown; a failure to create
  /// the API client still propagates. Messages queued while compacting are
  /// sent afterwards.
  Future<void> runCompact({String? customInstructions}) async {
    if (_apiMessages.isEmpty) return;
    if (_isLoading || _isCompacting) return;
    final settings = _getSettings();
    final apiKey = settings.openRouterApiKey;
    if (apiKey == null || apiKey.isEmpty) return;
    final model = _normalizeModelId(settings.model);
    // Mark busy before the first await so concurrent sends get queued instead
    // of racing this compaction over _apiMessages.
    _isCompacting = true;
    _onChanged();
    OpenRouterClient? client;
    try {
      client = await OpenRouterClientFactory.create(apiKey: apiKey);
      _suppressCompactWarning = false;
      _onChanged();
      try {
        await _runCompactInternal(
          client: client,
          model: model,
          customInstructions: customInstructions,
          suppressFollowUpQuestions: false,
        );
      } catch (e) {
        print("[compact] manual compact failed: $e");
        _conversationHistory.addMessage("assistant", "Compaction failed: $e");
        _onChanged();
      }
    } finally {
      client?.close();
      _isCompacting = false;
      _onChanged();
    }
    await _processNextQueued();
  }

  /// Replaces [_apiMessages] with the compacted conversation, records a
  /// `compact_boundary` marker plus a notice in the visible history, saves
  /// the session, and asks for a new thread name based on the summary.
  Future<void> _runCompactInternal({
    required OpenRouterClient client,
    required String model,
    String? customInstructions,
    bool suppressFollowUpQuestions = false,
  }) async {
    final result = await compactConversation(
      client: client,
      model: model,
      apiMessages: _apiMessages,
      customInstructions: customInstructions,
      suppressFollowUpQuestions: suppressFollowUpQuestions,
    );
    _apiMessages = result.messages;
    _lastCompactSummary = result.summaryText;
    _suppressCompactWarning = true;
    _consecutiveCompactFailures = 0;
    // store a boundary marker so that on session restore we know where to
    // cut the history for the api call. content = the summary string the
    // model will see; full message history before this marker is kept for
    // the user to scroll back through. Fall back to the raw summary if the
    // first compacted message isn't plain text (a hard cast here would throw
    // after _apiMessages was already replaced, leaving no boundary behind).
    final firstContent =
        result.messages.isEmpty ? null : result.messages.first["content"];
    _conversationHistory.addMessage(
      "compact_boundary",
      firstContent is String ? firstContent : result.summaryText,
    );
    _conversationHistory.addMessage(
      "assistant",
      "✦ Conversation compacted (${result.preCompactMessageCount} messages → summary). "
      "Context has been reset.",
    );
    final session = _conversationHistory.session;
    if (session != null) {
      await SessionStore.instance.saveSession(session);
    }
    // re-name the thread using the compact summary
    if (session != null && _onNameGenerated != null) {
      final nameSettings = _getSettings();
      final nameApiKey = nameSettings.openRouterApiKey;
      final nameModel = _normalizeModelId(nameSettings.model);
      if (nameApiKey != null && nameApiKey.isNotEmpty) {
        _generateThreadName(session, result.summaryText, nameApiKey, nameModel);
      }
    }
    _onChanged();
  }

  // ─── permission ─────────────────────────────────────────────────────────────

  /// Answers the pending permission prompt with [decision].
  ///
  /// For [PermissionDecision.allowAlways], the rule is remembered first:
  /// globally via `onPersistAllowRule` when [persistRule] is given, otherwise
  /// on this session only. The decision is delivered even if remembering the
  /// rule throws (the error still propagates), and it is not delivered twice
  /// if the prompt was already rejected (e.g. by [stopGenerating]) meanwhile.
  Future<void> resolvePermission(PermissionDecision decision,
      {String? persistRule}) async {
    final pending = _pendingPermission;
    if (pending == null) return;
    try {
      if (decision == PermissionDecision.allowAlways) {
        final persist = _onPersistAllowRule;
        if (persistRule != null && persist != null) {
          // persist to localSettings — survives session switches
          await persist(persistRule);
        } else {
          // session-scoped only (file tools)
          final session = _conversationHistory.session;
          if (session != null) {
            final rule = pending.suggestionRule ??
                _buildRuleString(pending.toolName, pending.input);
            if (!session.alwaysAllowRules.contains(rule)) {
              session.alwaysAllowRules.add(rule);
              await SessionStore.instance.saveSession(session);
            }
          }
        }
      }
    } finally {
      if (identical(_pendingPermission, pending)) {
        pending.resolve(decision);
        _pendingPermission = null;
        _onChanged();
      }
    }
  }

  // ─── dispose ────────────────────────────────────────────────────────────────

  /// Closes and drops the current API client, if any.
  void dispose() {
    _client?.close();
    _client = null;
  }

  // ─── helpers ────────────────────────────────────────────────────────────────

  /// Saves the current session, if there is one.
  Future<void> _saveSession() async {
    final session = _conversationHistory.session;
    if (session != null) await SessionStore.instance.saveSession(session);
  }

  /// Rebuilds the API message list from persisted [messages].
  ///
  /// Only user/assistant messages are sent. If a `compact_boundary` exists,
  /// history before the last one is skipped and the boundary's content is
  /// sent as the first user message.
  List<Map<String, dynamic>> _buildApiMessages(List<Message> messages) {
    // find the last compact boundary (if any) — everything before it belongs
    // to the old pre-compact history that the model shouldn't see again.
    int lastBoundary = -1;
    for (var i = messages.length - 1; i >= 0; i--) {
      if (messages[i].role == "compact_boundary") {
        lastBoundary = i;
        break;
      }
    }
    if (lastBoundary == -1) {
      // no compaction yet — send everything
      return messages
          .where((m) => m.role == "user" || m.role == "assistant")
          .map((m) => <String, dynamic>{"role": m.role, "content": m.content})
          .toList(growable: true);
    }
    // start with the summary as a user message, then all user/assistant
    // messages that came after the boundary
    final result = <Map<String, dynamic>>[
      {"role": "user", "content": messages[lastBoundary].content},
    ];
    for (var i = lastBoundary + 1; i < messages.length; i++) {
      final m = messages[i];
      if (m.role == "user" || m.role == "assistant") {
        result.add({"role": m.role, "content": m.content});
      }
    }
    return result;
  }

  /// Returns the first [length] UTF-16 code units of [s], one fewer when the
  /// cut would split a surrogate pair (e.g. an emoji). Assumes
  /// `0 <= length <= s.length`.
  static String _safePrefix(String s, int length) {
    var end = length;
    if (end > 0 && end < s.length) {
      final last = s.codeUnitAt(end - 1);
      if (last >= 0xD800 && last <= 0xDBFF) end--;
    }
    return s.substring(0, end);
  }

  // fires async — does not block the caller. context is a short snippet
  // (first user msg or compact summary) — NOT the full conversation history.
  // Failures are only logged.
  void _generateThreadName(
      ConversationSession session, String context, String apiKey, String model) {
    () async {
      try {
        final client = await OpenRouterClientFactory.create(apiKey: apiKey);
        try {
          final snippet = context.length > 600
              ? "${_safePrefix(context, 600)}..."
              : context;
          final resp = await client.createMessage(
            model: model,
            maxTokens: 20,
            messages: [
              {"role": "user", "content": snippet},
            ],
            system:
                "Generate a very short title (3-6 words) for this conversation. Reply with ONLY the title text — no quotes, no period at the end, nothing else.",
            temperature: 0.3,
          );
          final name = ResponseParser.extractTextContent(resp)
              .replaceAll(RegExp(r'["\n\r`]'), "")
              .trim();
          if (name.isEmpty || name.length > 80) return;
          session.name = name;
          await SessionStore.instance.saveSession(session);
          _onNameGenerated?.call(session.id, name);
          _onChanged();
        } finally {
          client.close();
        }
      } catch (e) {
        print("[thread name] generation failed: $e");
      }
    }();
  }

  /// Placeholder thread name from the first message: whitespace collapsed,
  /// "New Chat" when empty, truncated to at most 48 characters with a
  /// trailing ellipsis.
  String _buildSessionName(String text) {
    final sanitized = text.replaceAll(RegExp(r"\s+"), " ").trim();
    if (sanitized.isEmpty) return "New Chat";
    const maxLength = 48;
    if (sanitized.length <= maxLength) return sanitized;
    // one character of the budget is reserved for the ellipsis
    return "${_safePrefix(sanitized, maxLength - 1).trimRight()}…";
  }

  /// Renders a tool call for the chat log: the tool name followed by its
  /// input as indented JSON, omitting keys that start with "_".
  String _formatToolCall(String toolName, Map<String, dynamic> input) {
    const encoder = JsonEncoder.withIndent(" ");
    final visibleInput = Map<String, dynamic>.fromEntries(
      input.entries.where((e) => !e.key.startsWith("_")),
    );
    return "$toolName call\n${encoder.convert(visibleInput)}";
  }

  /// Renders a tool result for the chat log.
  String _formatToolResult(String toolName, String result) {
    return "$toolName result\n$result";
  }

  /// Builds a session-scoped always-allow rule like `Bash(ls -la)` from the
  /// tool's most identifying input; falls back to the bare tool name.
  String _buildRuleString(String toolName, Map<String, dynamic> input) {
    String? content;
    if (toolName == "Bash") {
      content = input["command"] as String?;
    } else if (toolName == "Read" || toolName == "Write" || toolName == "Edit") {
      content = input["file_path"] as String?;
    } else if (toolName == "Glob" || toolName == "Grep") {
      content = input["pattern"] as String?;
    } else if (toolName == "WebFetch" || toolName == "WebSearch") {
      content = input["url"] as String? ?? input["query"] as String?;
    }
    if (content == null || content.isEmpty) return toolName;
    return "$toolName($content)";
  }
}

// Small data classes used by SessionRuntime that were previously on ChatProvider

/// Priority of a queued message; lower [order] is sent first.
enum QueuePriority {
  now(0),
  next(1),
  later(2);

  final int order;
  const QueuePriority(this.order);
}

/// A message waiting for the current turn or compaction to finish.
class QueuedMessage {
  final String text;
  final QueuePriority priority;

  /// Attachments sent along with [text]; null when there are none.
  final List<AttachmentData>? attachments;

  const QueuedMessage({
    required this.text,
    required this.priority,
    this.attachments,
  });
}

/// Raw bytes of a file attached to an outgoing message.
class AttachmentData {
  final String name;
  final String mimeType;
  final List<int> data;

  const AttachmentData(
      {required this.name, required this.mimeType, required this.data});

  /// Whether the MIME type starts with "image/".
  bool get isImage => mimeType.startsWith("image/");
}