LibreChat/api/server/middleware/abortMiddleware.js
Danny Avila b03b2a0a29 💾 feat: Persist Context Breakdown & Branch/Total Usage Cost (#13734)
* 💾 feat: Persist Context Breakdown & Branch/Total Usage Cost

Persist the granular context breakdown and per-response usage/cost on the
response message metadata, and re-derive branch + total usage/cost from a
per-message index so the popover survives reloads and is branch-aware live.

- Add aggregateEmittedUsage + buildPersistedContextUsage helpers in
  packages/api; capture the latest visible snapshot and every emitted
  on_token_usage payload via contextUsageSink/usageEmitSink.
- Attach metadata.contextUsage (Part A) and metadata.usage (Part B) on the
  agents response message in sendCompletion.
- Carry per-message usage on the token index; add sumTotalUsage/setEntryUsage
  and branch-scoped usage on sumBranch.
- Repurpose the session accumulator into a single in-flight pending holder;
  flush it into the index at finalize; hydrate breakdowns on load.
- Render branch cost with a conditional all-branches total in the breakdown.
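The split between branch cost and all-branches total can be sketched as two sums over a per-message index. This is a minimal sketch, not the client's code. It assumes a hypothetical index shaped as a `Map` from messageId to `{ parentMessageId, usage }`. `sumBranchSketch` and `sumTotalSketch` are illustrative stand-ins for `sumBranch`/`sumTotalUsage`, whose real signatures may differ.

```javascript
// Hypothetical index shape: messageId -> { parentMessageId, usage? },
// where usage is { tokens, cost } or undefined when the message has none.

// Hypothetical helper: sum usage along one branch by walking parent links
// from the viewed leaf back to the root.
function sumBranchSketch(index, leafId) {
  const total = { tokens: 0, cost: 0 };
  const seen = new Set(); // guards against accidental parent cycles
  let id = leafId;
  while (id != null && index.has(id) && !seen.has(id)) {
    seen.add(id);
    const entry = index.get(id);
    if (entry.usage) {
      total.tokens += entry.usage.tokens;
      total.cost += entry.usage.cost;
    }
    id = entry.parentMessageId;
  }
  return total;
}

// Hypothetical helper: sum usage over every entry, i.e. all branches.
function sumTotalSketch(index) {
  const total = { tokens: 0, cost: 0 };
  for (const { usage } of index.values()) {
    if (usage) {
      total.tokens += usage.tokens;
      total.cost += usage.cost;
    }
  }
  return total;
}

// u1 is the user message; a1 and a2 are sibling responses (a2 regenerated).
const index = new Map([
  ['u1', { parentMessageId: null }],
  ['a1', { parentMessageId: 'u1', usage: { tokens: 100, cost: 0.5 } }],
  ['a2', { parentMessageId: 'u1', usage: { tokens: 150, cost: 0.25 } }],
]);

sumBranchSketch(index, 'a2'); // → { tokens: 150, cost: 0.25 }
sumTotalSketch(index); // → { tokens: 250, cost: 0.75 }
```

Walking parent links is what makes the branch figure follow the viewed sibling, while the total ignores tree shape entirely.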

* 🧹 chore: Remove orphaned com_ui_session_cost i18n key

* 🩹 fix: Address Codex review — normalize usage server-side, fix reload deltas

- Persist per-event-normalized display units in metadata.usage (TResponseUsage)
  so reloaded mixed-provider turns match the live session; client reads them
  directly instead of re-normalizing with a single stamped provider (P2).
- Persist completedOutputTokens (final call output) on metadata.contextUsage so
  a reloaded multi-call turn adds the post-snapshot delta, not the full
  tokenCount the snapshot already counts (P2).
- buildIndex preserves a prior entry's immutable usage when a rebuilt cache
  message lacks metadata.usage, so a mid-session rebuild (regenerate) keeps a
  sibling branch's flushed cost (fixes the e2e regenerate failure).
- Track costKnown so turns saved with contextCost off don't render $0.00 when
  cost display is later enabled (P3).
- Use an epsilon for the all-branches cost comparison to avoid a spurious total
  row from float summation order (P3).
- Update unit/integration/e2e tests for the new shapes; regenerate e2e asserts
  the all-branches total after reload (deterministic via persisted metadata).
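The epsilon comparison from the list above can be sketched as follows. The tolerance value and the helper name `shouldShowTotalRow` are assumptions for illustration, not the values or names used in the client.

```javascript
// Hypothetical tolerance: costs are in dollars, so a difference below a
// millionth of a cent is treated as float noise, not a real difference.
const COST_EPSILON = 1e-8;

/**
 * Hypothetical helper: show the all-branches total row only when it
 * genuinely differs from the branch cost, i.e. another branch added spend.
 */
function shouldShowTotalRow(branchCost, totalCost) {
  return Math.abs(totalCost - branchCost) > COST_EPSILON;
}

// Summing the same costs in a different order can differ in the last bits,
// so a strict !== check could render a spurious total row.
const costs = [0.1, 0.2, 0.3];
const branchCost = costs.reduce((a, b) => a + b, 0); // left to right
const totalCost = [...costs].reverse().reduce((a, b) => a + b, 0); // right to left

shouldShowTotalRow(branchCost, totalCost); // → false
shouldShowTotalRow(0.25, 0.75); // → true
```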

* 🩹 fix: Address Codex round 2 — pending leak, cost coverage, reload delta

- Clear the in-flight pending usage on terminal abort/error (resetLive), so a
  stopped generation's tokens no longer merge into the next response (P2).
- costKnown now means COMPLETE coverage (ANDed): a branch mixing cost-bearing
  and cost-less turns is flagged incomplete and the cost row is hidden rather
  than rendering an under-reported total (P2).
- Drop the tokenCount fallback for completedOutputTokens on reload: only the
  persisted post-snapshot delta is used, so a multi-call turn whose provider
  emitted no usage_metadata no longer double-counts earlier output (P2).
- Update tokens.spec for AND coverage semantics + incomplete-cost case.
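A minimal sketch of the AND-coverage rule, assuming each turn's persisted usage looks like `{ cost?: number }`, with `cost` absent when the turn was saved with cost display off. `rollUpCost` is a hypothetical name, not a function from the codebase.

```javascript
/**
 * Hypothetical helper: roll up cost over a branch's turns. costKnown is
 * true only when EVERY turn carries a numeric cost; one cost-less turn
 * makes the rollup incomplete, so the UI hides the row rather than
 * rendering an under-reported total.
 */
function rollUpCost(turns) {
  let cost = 0;
  let costKnown = true;
  for (const usage of turns) {
    if (typeof usage?.cost === 'number') {
      cost += usage.cost;
    } else {
      costKnown = false; // ANDed: a single gap marks the whole rollup incomplete
    }
  }
  return { cost, costKnown };
}

rollUpCost([{ cost: 0.5 }, { cost: 0.25 }]); // → { cost: 0.75, costKnown: true }
rollUpCost([{ cost: 0.5 }, {}]); // → { cost: 0.5, costKnown: false }
```

The partial sum is still returned so callers can log it, but a consumer following this rule would only display it when `costKnown` is true.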

* 🩹 fix: Address Codex round 3 — no-usage snapshots, total coverage, provider-less cache

- Skip persisting metadata.contextUsage when the response emitted no primary
  usage event: without a known post-snapshot output the granular gauge would
  undercount the reply on reload, so fall back to the coarse per-message
  estimate instead (P2).
- Gate the all-branches cost row on totalUsage.costKnown so an incomplete total
  (a sibling saved without cost) never renders an under-reported figure (P2).
- aggregateEmittedUsage/finalCallOutputTokens now normalize per-event with the
  client's magnitude fallback (normalizeEventUnits) instead of billing
  splitUsage, so provider-less cached events match live on reload (P2).
- Add backend test for the provider-less cached case.

* 🩹 fix: Address Codex round 4 — abort attribution, complete cost coverage

- aggregateEmittedUsage persists cost only when EVERY call was priced; a partial
  pricing failure now omits cost so the client treats coverage as unknown rather
  than reading an under-reported sum as authoritative (P2).
- finalizeUsage flushes pending into the response entry only when events were
  folded this session (eventCount > 0), so a late/second resumable subscriber
  carrying persisted metadata.usage keeps it instead of being overwritten with
  an empty pending record (P2).
- On user stop, attribute the in-flight pending usage to the partial response
  (new attributePending handler) instead of discarding it in resetLive — the
  stopped reply's billed tokens are kept and still can't leak into the next
  response; resetLive's discard remains for the error path (P2).
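The flush guard and stop attribution described above can be sketched with a hypothetical pending holder. `PendingUsage`, its methods, and the `{ tokens }` usage shape are illustrative only. The real `finalizeUsage`/`attributePending` handlers and index shape may differ.

```javascript
// Hypothetical in-flight usage holder for one streaming response.
class PendingUsage {
  constructor() {
    this.reset();
  }

  // Discard everything (the error path's resetLive behaves like this).
  reset() {
    this.tokens = 0;
    this.eventCount = 0;
  }

  // Fold one emitted usage payload into the holder.
  fold(event) {
    this.tokens += event.tokens;
    this.eventCount += 1;
  }

  /**
   * Flush into the response's index entry only if this session folded
   * events. A late subscriber with eventCount === 0 keeps the entry's
   * persisted usage instead of overwriting it with an empty record.
   * On user stop, the same flush targets the partial response.
   */
  finalize(index, messageId) {
    if (this.eventCount > 0) {
      index.set(messageId, { tokens: this.tokens });
    }
    this.reset();
  }
}

// resp-1 already carries persisted metadata.usage.
const index = new Map([['resp-1', { tokens: 42 }]]);

const late = new PendingUsage();
late.finalize(index, 'resp-1'); // nothing folded: persisted usage kept
index.get('resp-1'); // → { tokens: 42 }

const live = new PendingUsage();
live.fold({ tokens: 10 });
live.fold({ tokens: 5 });
live.finalize(index, 'resp-2');
index.get('resp-2'); // → { tokens: 15 }
```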

* 🐛 fix: Persist branch cost across branch switches via sticky usage history

Branch cost vanished on switching to a sibling branch (until a new turn) — the
cost analog of the granularity bug. buildIndex rebuilds the token index from the
messages cache; a sibling generated this session whose cache message lacks
metadata.usage (and is transiently dropped from the cache during regenerate)
lost its live-flushed usage, so sumBranch found none and the cost row hid.

Fix: a sticky per-response usage map (conversationId → messageId → usage),
written by setEntryUsage and never rebuilt from the cache — the usage counterpart
of snapshotsByAnchorFamily for the breakdown. buildIndex/upsertEntries restore an
entry's usage from it when the message carries none; cleared on convo switch and
migrated with the index. Add unit coverage for the drop-then-readd regression and
an e2e assertion that branch cost survives a branch switch.
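The sticky history can be sketched as a nested `Map` keyed by conversationId, then messageId. The functions below (`rememberUsage`, `buildIndexSketch`, `forgetConversation`) are hypothetical stand-ins for the write in `setEntryUsage`, the restore in `buildIndex`/`upsertEntries`, and the clear on conversation switch.

```javascript
// conversationId -> (messageId -> usage); never rebuilt from the cache.
const stickyUsage = new Map();

// Hypothetical: called wherever a live-flushed usage record is written.
function rememberUsage(conversationId, messageId, usage) {
  if (!stickyUsage.has(conversationId)) {
    stickyUsage.set(conversationId, new Map());
  }
  stickyUsage.get(conversationId).set(messageId, usage);
}

/**
 * Hypothetical rebuild from cache messages. metadata.usage wins; otherwise
 * fall back to the sticky history, so a message briefly dropped from the
 * cache (e.g. during regenerate) keeps its flushed usage when it returns.
 */
function buildIndexSketch(conversationId, messages) {
  const sticky = stickyUsage.get(conversationId);
  const index = new Map();
  for (const msg of messages) {
    const usage = msg.metadata?.usage ?? sticky?.get(msg.messageId);
    index.set(msg.messageId, { parentMessageId: msg.parentMessageId, usage });
  }
  return index;
}

// Hypothetical: called on conversation switch.
function forgetConversation(conversationId) {
  stickyUsage.delete(conversationId);
}

rememberUsage('c1', 'a1', { tokens: 100 });
// a1 comes back from the cache without metadata.usage.
const rebuilt = buildIndexSketch('c1', [{ messageId: 'a1', parentMessageId: 'u1' }]);
rebuilt.get('a1').usage; // → { tokens: 100 }
```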

* 🐛 fix: Re-index on branch switch so branch cost survives the switch

The sticky usage history alone didn't fix the reported branch-switch cost drop:
on a branch switch no cache `updated` event fires, so the index subscriber never
re-ran, and the post-regenerate rebuild was skipped while `isSubmitting` was
still true — leaving the index stale and missing the now-viewed branch's
response entirely (sticky can only restore entries present in a rebuild).

Re-index from the messages cache on every tail change (created/finalize AND
branch switch), not just while submitting. The cache holds the full message set
at switch time, so the viewed branch's response is re-added and its usage
restored from metadata.usage or the sticky history → sumBranch finds it and the
branch cost renders. Verified locally: the branch-switch e2e now passes (the
cost section shows both the branch row and the all-branches total). Also fixed
that e2e assertion to target a single cost value (strict-mode safe).

* 🩹 fix: Handle stopped-stream usage — reset pending + persist abort metadata

Codex round (stop/abort edges):
- Resumable explicit-stop (intentional SSE close) reset UI state but never
  cleared pendingUsageFamily, so usage folded before the stop leaked into the
  next response in the conversation. Discard pending on intentional close
  (resetLive); a resume re-folds via backfillUsage, so nothing is lost.
- The abort save path (abortMiddleware) persisted the stopped response without
  metadata.usage/contextUsage, so its cost + breakdown vanished on reload.
  Rebuild both from the job's persisted tokenUsage (emitted payloads incl. cost)
  and contextUsage snapshot — parity with the normal sendCompletion path;
  breakdown gated on a primary usage event like buildResponseMetadata.

Deferred (per scope decision): mid-stream branch-switch transiently shows the
streaming branch's pending on the viewed sibling (cosmetic, until finalize).

* 🩹 fix: Persist abort metadata on the real agents route + tighten snapshot gate

Codex round (corrects last round's wrong-path fixes):
- Stopped AGENTS responses are saved by routes/agents/index.js (/chat/abort),
  not abortMiddleware — so last round's metadata fix never ran for them. Moved
  the rollup/snapshot builder into packages/api as buildAbortedResponseMetadata
  (shared, unit-tested) and applied it in BOTH abort save paths, so a stopped
  agent reply keeps its cost + breakdown on reload.
- Persist the breakdown only when the FINAL visible call emitted usage: track a
  per-response snapshot count and require primaryUsageCount >= snapshotCount.
  Previously any earlier primary usage event passed the gate, so a multi-call
  turn whose final call emitted no usage_metadata used an earlier call's output
  as completedOutputTokens (already counted by the latest snapshot) → reload
  over-reported. Now it falls back to the coarse estimate.
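The gate reduces to a count comparison, and the reload arithmetic to one addition. A minimal sketch, assuming a hypothetical `snapshot` object with a `total` token field. Only `primaryUsageCount`, `snapshotCount`, and `completedOutputTokens` come from the description above. `buildContextUsageSketch` and `reloadedContextTokens` are illustrative names.

```javascript
/**
 * Hypothetical gate: each visible call records a snapshot and each primary
 * usage event bumps primaryUsageCount. If the final call emitted no usage,
 * primaryUsageCount < snapshotCount and lastOutputTokens would belong to an
 * earlier call already counted in the snapshot, so nothing is persisted.
 */
function buildContextUsageSketch({ snapshot, snapshotCount, primaryUsageCount, lastOutputTokens }) {
  if (!snapshot || primaryUsageCount < snapshotCount) {
    return undefined; // client falls back to the coarse per-message estimate
  }
  return { ...snapshot, completedOutputTokens: lastOutputTokens };
}

// On reload, the gauge adds only the post-snapshot delta.
function reloadedContextTokens(contextUsage) {
  return contextUsage.total + contextUsage.completedOutputTokens;
}

// Final call emitted no usage: gate fails.
const skipped = buildContextUsageSketch({
  snapshot: { total: 1200 },
  snapshotCount: 2,
  primaryUsageCount: 1,
  lastOutputTokens: 80,
}); // → undefined

// Every call emitted usage: breakdown persisted.
const persisted = buildContextUsageSketch({
  snapshot: { total: 1200 },
  snapshotCount: 2,
  primaryUsageCount: 2,
  lastOutputTokens: 80,
});
reloadedContextTokens(persisted); // → 1280
```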

Resumable stop pending-reset (prior round, 3cde6fe035) already flows through
clearAllSubmissions → SSE close → the intentional-close handler's resetLive.
Deferred per scope: mid-stream branch-switch pending attribution (tracked).

* 🩹 fix: Abort breakdown over-count + resume re-fold after pending discard

Codex round (on the re-applied abort/snapshot work):
- buildAbortedResponseMetadata now persists ONLY the usage/cost rollup, not the
  context breakdown. The abort path can't tell whether the final call emitted
  usage (the job stores only the latest snapshot, not a count), so persisting
  the breakdown risked reusing an earlier call's output as completedOutputTokens
  (already in the snapshot) → reload over-count. Stopped/incomplete responses
  now fall back to the coarse gauge estimate, which is safe and apt.
- resetLive now also forgets the conversation's folded usage-event identities
  (clearUsageFolded). Discarding pending on a terminal/intentional close left
  the folded keys set, so a later resume's backfillUsage saw the persisted
  events as duplicates and never rebuilt pending — leaving the response's usage
  missing until a full reload. Clearing them lets the resume re-fold.
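Clearing the folded identities matters because folding deduplicates by event identity. A small sketch makes the failure mode concrete. `fold`, `backfill`, and `resetLiveSketch` are hypothetical stand-ins for the pending fold, `backfillUsage`, and `resetLive`, and event identity is assumed to be a string `id`.

```javascript
const pending = new Map(); // conversationId -> folded token total
const folded = new Map(); // conversationId -> Set of folded event ids

function fold(conversationId, event) {
  if (!folded.has(conversationId)) {
    folded.set(conversationId, new Set());
  }
  const keys = folded.get(conversationId);
  if (keys.has(event.id)) {
    return; // duplicate: already counted
  }
  keys.add(event.id);
  pending.set(conversationId, (pending.get(conversationId) ?? 0) + event.tokens);
}

// On resume, replay the job's persisted usage events.
function backfill(conversationId, persistedEvents) {
  for (const event of persistedEvents) {
    fold(conversationId, event);
  }
}

// Terminal/intentional close: discard pending AND forget the folded keys;
// otherwise a later backfill would skip every event as a duplicate.
function resetLiveSketch(conversationId) {
  pending.delete(conversationId);
  folded.delete(conversationId);
}

const events = [
  { id: 'e1', tokens: 30 },
  { id: 'e2', tokens: 20 },
];
backfill('c1', events);
resetLiveSketch('c1');
backfill('c1', events); // resume re-folds from scratch
pending.get('c1'); // → 50
```

Discarding only `pending` (the pre-fix behavior) would leave `e1`/`e2` marked as folded, and the resume's backfill would never rebuild the total.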
2026-06-14 10:48:07 -04:00

const { logger } = require('@librechat/data-schemas');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const {
  isEnabled,
  sendEvent,
  countTokens,
  GenerationJobManager,
  recordCollectedUsage,
  sanitizeMessageForTransmit,
  buildAbortedResponseMetadata,
} = require('@librechat/api');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { abortRun } = require('./abortRun');
const db = require('~/models');

/**
 * Spend tokens for all models from collected usage.
 * This handles both sequential and parallel agent execution.
 *
 * IMPORTANT: After spending, this function clears the collectedUsage array
 * to prevent double-spending. The array is shared with AgentClient.collectedUsage,
 * so clearing it here prevents the finally block from also spending tokens.
 *
 * @param {Object} params
 * @param {string} params.userId - User ID
 * @param {string} params.conversationId - Conversation ID
 * @param {Array<Object>} params.collectedUsage - Usage metadata from all models
 * @param {string} [params.fallbackModel] - Fallback model name if not in usage
 * @param {string} [params.messageId] - The response message ID for transaction correlation
 */
async function spendCollectedUsage({
  userId,
  conversationId,
  collectedUsage,
  fallbackModel,
  messageId,
}) {
  if (!collectedUsage || collectedUsage.length === 0) {
    return;
  }

  await recordCollectedUsage(
    {
      spendTokens: db.spendTokens,
      spendStructuredTokens: db.spendStructuredTokens,
      pricing: { getMultiplier: db.getMultiplier, getCacheMultiplier: db.getCacheMultiplier },
      bulkWriteOps: { insertMany: db.bulkInsertTransactions, updateBalance: db.updateBalance },
    },
    {
      user: userId,
      conversationId,
      collectedUsage,
      context: 'abort',
      messageId,
      model: fallbackModel,
    },
  );

  // Clear the array to prevent double-spending from the AgentClient finally block.
  // The collectedUsage array is shared by reference with AgentClient.collectedUsage,
  // so clearing it here ensures recordCollectedUsage() sees an empty array and returns early.
  collectedUsage.length = 0;
}

/**
 * Abort an active message generation.
 * Assistants endpoints are delegated to abortRun; all other requests are agent
 * jobs aborted through GenerationJobManager. Since streamId === conversationId,
 * the job is aborted directly by conversationId.
 */
async function abortMessage(req, res) {
  const { abortKey, endpoint } = req.body;

  if (isAssistantsEndpoint(endpoint)) {
    return await abortRun(req, res);
  }

  const conversationId = abortKey?.split(':')?.[0] ?? req.user.id;
  const userId = req.user.id;

  // Use GenerationJobManager to abort the job (streamId === conversationId)
  const abortResult = await GenerationJobManager.abortJob(conversationId);

  if (!abortResult.success) {
    if (!res.headersSent) {
      return res.status(204).send({ message: 'Request not found' });
    }
    return;
  }

  const { jobData, content, text, collectedUsage } = abortResult;

  const completionTokens = await countTokens(text);
  const promptTokens = jobData?.promptTokens ?? 0;

  const responseMessage = {
    messageId: jobData?.responseMessageId,
    parentMessageId: jobData?.userMessage?.messageId,
    conversationId: jobData?.conversationId,
    content,
    text,
    sender: jobData?.sender ?? 'AI',
    finish_reason: 'incomplete',
    endpoint: jobData?.endpoint,
    iconURL: jobData?.iconURL,
    model: jobData?.model,
    unfinished: false,
    error: false,
    isCreatedByUser: false,
    tokenCount: completionTokens,
  };
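
  /** Persist the usage/cost rollup for the stopped response so its branch/total
   * cost survives a reload, matching the normal completion path. The context
   * breakdown is intentionally not persisted here: the job stores only the latest
   * snapshot, not whether the final call emitted usage, so the client falls back
   * to the coarse gauge estimate for stopped responses. */
  const abortMetadata = buildAbortedResponseMetadata(jobData);
  if (abortMetadata) {
    responseMessage.metadata = abortMetadata;
  }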

  // Spend tokens for ALL models from collectedUsage (handles parallel agents/addedConvo)
  if (collectedUsage && collectedUsage.length > 0) {
    await spendCollectedUsage({
      userId,
      conversationId: jobData?.conversationId,
      collectedUsage,
      fallbackModel: jobData?.model,
      messageId: jobData?.responseMessageId,
    });
  } else {
    // Fallback: no collected usage, use text-based token counting for primary model only
    await db.spendTokens(
      { ...responseMessage, context: 'incomplete', user: userId },
      { promptTokens, completionTokens },
    );
  }

  await db.saveMessage(
    {
      userId: req?.user?.id,
      isTemporary: req?.body?.isTemporary,
      interfaceConfig: req?.config?.interfaceConfig,
    },
    { ...responseMessage, user: userId },
    { context: 'api/server/middleware/abortMiddleware.js' },
  );

  // Get conversation for title
  const conversation = await db.getConvo(userId, conversationId);

  const finalEvent = {
    title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
    final: true,
    conversation,
    requestMessage: jobData?.userMessage
      ? sanitizeMessageForTransmit({
          messageId: jobData.userMessage.messageId,
          parentMessageId: jobData.userMessage.parentMessageId,
          conversationId: jobData.userMessage.conversationId,
          text: jobData.userMessage.text,
          isCreatedByUser: true,
        })
      : null,
    responseMessage,
  };

  logger.debug(
    `[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`,
  );

  if (res.headersSent) {
    return sendEvent(res, finalEvent);
  }

  res.setHeader('Content-Type', 'application/json');
  res.send(JSON.stringify(finalEvent));
}

const handleAbort = function () {
  return async function (req, res) {
    try {
      if (isEnabled(process.env.LIMIT_CONCURRENT_MESSAGES)) {
        await clearPendingReq({ userId: req.user.id });
      }
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[abortMessage] handleAbort error', err);
    }
  };
};

/**
 * Handle abort errors during generation.
 * @param {ServerResponse} res
 * @param {ServerRequest} req
 * @param {Error | unknown} error
 * @param {Partial<TMessage> & { partialText?: string }} data
 * @returns {Promise<void>}
 */
const handleAbortError = async (res, req, error, data) => {
  if (error?.message?.includes('base64')) {
    logger.error('[handleAbortError] Error in base64 encoding', {
      ...error,
      stack: smartTruncateText(error?.stack, 1000),
      message: truncateText(error.message, 350),
    });
  } else {
    logger.error('[handleAbortError] AI response error; aborting request:', error);
  }

  const { sender, conversationId, messageId, parentMessageId, userMessageId, partialText } = data;

  // Optional chaining matches the guards above: `error` may not be an Error object.
  if (error?.stack?.includes('google')) {
    logger.warn(
      `AI Response error for conversation ${conversationId} likely caused by Google censor/filter`,
    );
  }

  let errorText = error?.message?.includes('"type"')
    ? error.message
    : 'An error occurred while processing your request. Please contact the Admin.';

  if (error?.type === ErrorTypes.INVALID_REQUEST) {
    errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
  }

  if (error?.message?.includes("does not support 'system'")) {
    errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
  }

  /**
   * @param {string} [partialText] - Partial response text; when present, it is sent
   * as an unfinished (non-error) message instead of the error text.
   * @returns {Promise<void>}
   */
  const respondWithError = async (partialText) => {
    const endpointOption = req.body?.endpointOption;
    let options = {
      sender,
      messageId,
      conversationId,
      parentMessageId,
      text: errorText,
      user: req.user.id,
      spec: endpointOption?.spec,
      iconURL: endpointOption?.iconURL,
      modelLabel: endpointOption?.modelLabel,
      shouldSaveMessage: userMessageId != null,
      model: endpointOption?.modelOptions?.model || req.body?.model,
    };

    if (req.body?.agent_id) {
      options.agent_id = req.body.agent_id;
    }

    if (partialText) {
      options = {
        ...options,
        error: false,
        unfinished: true,
        text: partialText,
      };
    }

    await sendError(req, res, options);
  };

  if (partialText && partialText.length > 5) {
    try {
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[handleAbortError] error while trying to abort message', err);
      return respondWithError(partialText);
    }
  } else {
    return respondWithError();
  }
};

module.exports = {
  handleAbort,
  handleAbortError,
  spendCollectedUsage,
};
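The double-spend guard in `spendCollectedUsage` relies on `collectedUsage` being the same array object as `AgentClient.collectedUsage`. A minimal sketch of why truncating with `length = 0` works where reassignment would not. `client`, `abortPath`, and `finallyBlock` are hypothetical stand-ins for the two spend sites, and the usage entry shape is illustrative.

```javascript
// Both spend sites hold a reference to the SAME array object.
const client = { collectedUsage: [{ model: 'm', input_tokens: 10, output_tokens: 5 }] };
const spent = [];

// Hypothetical stand-in for the abort path (spendCollectedUsage).
function abortPath(collectedUsage) {
  spent.push(...collectedUsage);
  collectedUsage.length = 0; // truncates the shared array in place
}

// Hypothetical stand-in for the client's finally block.
function finallyBlock() {
  if (client.collectedUsage.length === 0) {
    return; // nothing left: the abort path already spent it
  }
  spent.push(...client.collectedUsage);
}

abortPath(client.collectedUsage);
finallyBlock();
spent.length; // → 1 (spent once, not twice)
```

Had the abort path written `collectedUsage = []`, only its local parameter binding would change; `client.collectedUsage` would still hold the entries and the finally block would spend them again.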