mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-15 23:43:06 +03:00
* 💾 feat: Persist Context Breakdown & Branch/Total Usage Cost

Persist the granular context breakdown and per-response usage/cost on the response message metadata, and re-derive branch + total usage/cost from a per-message index so the popover survives reloads and is branch-aware live.

- Add aggregateEmittedUsage + buildPersistedContextUsage helpers in packages/api; capture the latest visible snapshot and every emitted on_token_usage payload via contextUsageSink/usageEmitSink.
- Attach metadata.contextUsage (Part A) and metadata.usage (Part B) on the agents response message in sendCompletion.
- Carry per-message usage on the token index; add sumTotalUsage/setEntryUsage and branch-scoped usage on sumBranch.
- Repurpose the session accumulator into a single in-flight pending holder; flush it into the index at finalize; hydrate breakdowns on load.
- Render branch cost with a conditional all-branches total in the breakdown.

* 🧹 chore: Remove orphaned com_ui_session_cost i18n key

* 🩹 fix: Address Codex review: normalize usage server-side, fix reload deltas

- Persist per-event-normalized display units in metadata.usage (TResponseUsage) so reloaded mixed-provider turns match the live session; client reads them directly instead of re-normalizing with a single stamped provider (P2).
- Persist completedOutputTokens (final call output) on metadata.contextUsage so a reloaded multi-call turn adds the post-snapshot delta, not the full tokenCount the snapshot already counts (P2).
- buildIndex preserves a prior entry's immutable usage when a rebuilt cache message lacks metadata.usage, so a mid-session rebuild (regenerate) keeps a sibling branch's flushed cost (fixes the e2e regenerate failure).
- Track costKnown so turns saved with contextCost off don't render $0.00 when cost display is later enabled (P3).
- Use an epsilon for the all-branches cost comparison to avoid a spurious total row from float summation order (P3).
- Update unit/integration/e2e tests for the new shapes; regenerate e2e asserts the all-branches total after reload (deterministic via persisted metadata).

* 🩹 fix: Address Codex round 2: pending leak, cost coverage, reload delta

- Clear the in-flight pending usage on terminal abort/error (resetLive), so a stopped generation's tokens no longer merge into the next response (P2).
- costKnown now means COMPLETE coverage (ANDed): a branch mixing cost-bearing and cost-less turns is flagged incomplete and the cost row is hidden rather than rendering an under-reported total (P2).
- Drop the tokenCount fallback for completedOutputTokens on reload: only the persisted post-snapshot delta is used, so a multi-call turn whose provider emitted no usage_metadata no longer double-counts earlier output (P2).
- Update tokens.spec for AND coverage semantics + incomplete-cost case.

* 🩹 fix: Address Codex round 3: no-usage snapshots, total coverage, provider-less cache

- Skip persisting metadata.contextUsage when the response emitted no primary usage event: without a known post-snapshot output the granular gauge would undercount the reply on reload, so fall back to the coarse per-message estimate instead (P2).
- Gate the all-branches cost row on totalUsage.costKnown so an incomplete total (a sibling saved without cost) never renders an under-reported figure (P2).
- aggregateEmittedUsage/finalCallOutputTokens now normalize per-event with the client's magnitude fallback (normalizeEventUnits) instead of billing splitUsage, so provider-less cached events match live on reload (P2).
- Add backend test for the provider-less cached case.

* 🩹 fix: Address Codex round 4: abort attribution, complete cost coverage

- aggregateEmittedUsage persists cost only when EVERY call was priced; a partial pricing failure now omits cost so the client treats coverage as unknown rather than reading an under-reported sum as authoritative (P2).
- finalizeUsage flushes pending into the response entry only when events were folded this session (eventCount > 0), so a late/second resumable subscriber carrying persisted metadata.usage keeps it instead of being overwritten with an empty pending record (P2).
- On user stop, attribute the in-flight pending usage to the partial response (new attributePending handler) instead of discarding it in resetLive; the stopped reply's billed tokens are kept and still can't leak into the next response. resetLive's discard remains for the error path (P2).

* 🐛 fix: Persist branch cost across branch switches via sticky usage history

Branch cost vanished on switching to a sibling branch (until a new turn), the cost analog of the granularity bug. buildIndex rebuilds the token index from the messages cache; a sibling generated this session whose cache message lacks metadata.usage (and is transiently dropped from the cache during regenerate) lost its live-flushed usage, so sumBranch found none and the cost row hid.

Fix: a sticky per-response usage map (conversationId → messageId → usage), written by setEntryUsage and never rebuilt from the cache, the usage counterpart of snapshotsByAnchorFamily for the breakdown. buildIndex/upsertEntries restore an entry's usage from it when the message carries none; cleared on convo switch and migrated with the index.

Add unit coverage for the drop-then-readd regression and an e2e assertion that branch cost survives a branch switch.

* 🐛 fix: Re-index on branch switch so branch cost survives the switch

The sticky usage history alone didn't fix the reported branch-switch cost drop: on a branch switch no cache `updated` event fires, so the index subscriber never re-ran, and the post-regenerate rebuild was skipped while `isSubmitting` was still true, leaving the index stale and missing the now-viewed branch's response entirely (sticky can only restore entries present in a rebuild).

Re-index from the messages cache on every tail change (created/finalize AND branch switch), not just while submitting. The cache holds the full message set at switch time, so the viewed branch's response is re-added and its usage restored from metadata.usage or the sticky history → sumBranch finds it and the branch cost renders.

Verified locally: the branch-switch e2e now passes (the cost section shows both the branch row and the all-branches total). Also fixed that e2e assertion to target a single cost value (strict-mode safe).

* 🩹 fix: Handle stopped-stream usage: reset pending + persist abort metadata

Codex round (stop/abort edges):

- Resumable explicit-stop (intentional SSE close) reset UI state but never cleared pendingUsageFamily, so usage folded before the stop leaked into the next response in the conversation. Discard pending on intentional close (resetLive); a resume re-folds via backfillUsage, so nothing is lost.
- The abort save path (abortMiddleware) persisted the stopped response without metadata.usage/contextUsage, so its cost + breakdown vanished on reload. Rebuild both from the job's persisted tokenUsage (emitted payloads incl. cost) and contextUsage snapshot, for parity with the normal sendCompletion path; breakdown gated on a primary usage event like buildResponseMetadata.

Deferred (per scope decision): mid-stream branch-switch transiently shows the streaming branch's pending on the viewed sibling (cosmetic, until finalize).

* 🩹 fix: Persist abort metadata on the real agents route + tighten snapshot gate

Codex round (corrects last round's wrong-path fixes):

- Stopped AGENTS responses are saved by routes/agents/index.js (/chat/abort), not abortMiddleware, so last round's metadata fix never ran for them. Moved the rollup/snapshot builder into packages/api as buildAbortedResponseMetadata (shared, unit-tested) and applied it in BOTH abort save paths, so a stopped agent reply keeps its cost + breakdown on reload.
- Persist the breakdown only when the FINAL visible call emitted usage: track a per-response snapshot count and require primaryUsageCount >= snapshotCount. Previously any earlier primary usage event passed the gate, so a multi-call turn whose final call emitted no usage_metadata used an earlier call's output as completedOutputTokens (already counted by the latest snapshot) → reload over-reported. Now it falls back to the coarse estimate.

Resumable stop pending-reset (prior round, 3cde6fe035) already flows through clearAllSubmissions → SSE close → the intentional-close handler's resetLive. Deferred per scope: mid-stream branch-switch pending attribution (tracked).

* 🩹 fix: Abort breakdown over-count + resume re-fold after pending discard

Codex round (on the re-applied abort/snapshot work):

- buildAbortedResponseMetadata now persists ONLY the usage/cost rollup, not the context breakdown. The abort path can't tell whether the final call emitted usage (the job stores only the latest snapshot, not a count), so persisting the breakdown risked reusing an earlier call's output as completedOutputTokens (already in the snapshot) → reload over-count. Stopped/incomplete responses now fall back to the coarse gauge estimate, which is safe and apt.
- resetLive now also forgets the conversation's folded usage-event identities (clearUsageFolded). Discarding pending on a terminal/intentional close left the folded keys set, so a later resume's backfillUsage saw the persisted events as duplicates and never rebuilt pending, leaving the response's usage missing until a full reload. Clearing them lets the resume re-fold.
284 lines
8.5 KiB
JavaScript
const { logger } = require('@librechat/data-schemas');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const {
  isEnabled,
  sendEvent,
  countTokens,
  GenerationJobManager,
  recordCollectedUsage,
  sanitizeMessageForTransmit,
  buildAbortedResponseMetadata,
} = require('@librechat/api');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { abortRun } = require('./abortRun');
const db = require('~/models');

/**
 * Spend tokens for all models from collected usage.
 * This handles both sequential and parallel agent execution.
 *
 * IMPORTANT: After spending, this function clears the collectedUsage array
 * to prevent double-spending. The array is shared with AgentClient.collectedUsage,
 * so clearing it here prevents the finally block from also spending tokens.
 *
 * @param {Object} params
 * @param {string} params.userId - User ID
 * @param {string} params.conversationId - Conversation ID
 * @param {Array<Object>} params.collectedUsage - Usage metadata from all models
 * @param {string} [params.fallbackModel] - Fallback model name if not in usage
 * @param {string} [params.messageId] - The response message ID for transaction correlation
 */
async function spendCollectedUsage({
  userId,
  conversationId,
  collectedUsage,
  fallbackModel,
  messageId,
}) {
  if (!collectedUsage || collectedUsage.length === 0) {
    return;
  }

  await recordCollectedUsage(
    {
      spendTokens: db.spendTokens,
      spendStructuredTokens: db.spendStructuredTokens,
      pricing: { getMultiplier: db.getMultiplier, getCacheMultiplier: db.getCacheMultiplier },
      bulkWriteOps: { insertMany: db.bulkInsertTransactions, updateBalance: db.updateBalance },
    },
    {
      user: userId,
      conversationId,
      collectedUsage,
      context: 'abort',
      messageId,
      model: fallbackModel,
    },
  );

  // Clear the array to prevent double-spending from the AgentClient finally block.
  // The collectedUsage array is shared by reference with AgentClient.collectedUsage,
  // so clearing it here ensures recordCollectedUsage() sees an empty array and returns early.
  collectedUsage.length = 0;
}

/**
 * Abort an active message generation.
 * Uses GenerationJobManager for all agent requests.
 * Since streamId === conversationId, we can directly abort by conversationId.
 */
async function abortMessage(req, res) {
  const { abortKey, endpoint } = req.body;

  if (isAssistantsEndpoint(endpoint)) {
    return await abortRun(req, res);
  }

  const conversationId = abortKey?.split(':')?.[0] ?? req.user.id;
  const userId = req.user.id;

  // Use GenerationJobManager to abort the job (streamId === conversationId)
  const abortResult = await GenerationJobManager.abortJob(conversationId);

  if (!abortResult.success) {
    if (!res.headersSent) {
      return res.status(204).send({ message: 'Request not found' });
    }
    return;
  }

  const { jobData, content, text, collectedUsage } = abortResult;

  const completionTokens = await countTokens(text);
  const promptTokens = jobData?.promptTokens ?? 0;

  const responseMessage = {
    messageId: jobData?.responseMessageId,
    parentMessageId: jobData?.userMessage?.messageId,
    conversationId: jobData?.conversationId,
    content,
    text,
    sender: jobData?.sender ?? 'AI',
    finish_reason: 'incomplete',
    endpoint: jobData?.endpoint,
    iconURL: jobData?.iconURL,
    model: jobData?.model,
    unfinished: false,
    error: false,
    isCreatedByUser: false,
    tokenCount: completionTokens,
  };

  /** Persist the usage/cost rollup for the stopped response so its branch/total
   * cost survives a reload. Per the latest change to buildAbortedResponseMetadata,
   * the context breakdown is not persisted on abort; stopped responses fall back
   * to the coarse gauge estimate. */
  const abortMetadata = buildAbortedResponseMetadata(jobData);
  if (abortMetadata) {
    responseMessage.metadata = abortMetadata;
  }

  // Spend tokens for ALL models from collectedUsage (handles parallel agents/addedConvo)
  if (collectedUsage && collectedUsage.length > 0) {
    await spendCollectedUsage({
      userId,
      conversationId: jobData?.conversationId,
      collectedUsage,
      fallbackModel: jobData?.model,
      messageId: jobData?.responseMessageId,
    });
  } else {
    // Fallback: no collected usage, use text-based token counting for primary model only
    await db.spendTokens(
      { ...responseMessage, context: 'incomplete', user: userId },
      { promptTokens, completionTokens },
    );
  }

  await db.saveMessage(
    {
      userId: req?.user?.id,
      isTemporary: req?.body?.isTemporary,
      interfaceConfig: req?.config?.interfaceConfig,
    },
    { ...responseMessage, user: userId },
    { context: 'api/server/middleware/abortMiddleware.js' },
  );

  // Get conversation for title
  const conversation = await db.getConvo(userId, conversationId);

  const finalEvent = {
    title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
    final: true,
    conversation,
    requestMessage: jobData?.userMessage
      ? sanitizeMessageForTransmit({
          messageId: jobData.userMessage.messageId,
          parentMessageId: jobData.userMessage.parentMessageId,
          conversationId: jobData.userMessage.conversationId,
          text: jobData.userMessage.text,
          isCreatedByUser: true,
        })
      : null,
    responseMessage,
  };

  logger.debug(
    `[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`,
  );

  if (res.headersSent) {
    return sendEvent(res, finalEvent);
  }

  res.setHeader('Content-Type', 'application/json');
  res.send(JSON.stringify(finalEvent));
}

const handleAbort = function () {
  return async function (req, res) {
    try {
      if (isEnabled(process.env.LIMIT_CONCURRENT_MESSAGES)) {
        await clearPendingReq({ userId: req.user.id });
      }
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[abortMessage] handleAbort error', err);
    }
  };
};

/**
 * Handle abort errors during generation.
 * @param {ServerResponse} res
 * @param {ServerRequest} req
 * @param {Error | unknown} error
 * @param {Partial<TMessage> & { partialText?: string }} data
 * @returns {Promise<void>}
 */
const handleAbortError = async (res, req, error, data) => {
  if (error?.message?.includes('base64')) {
    logger.error('[handleAbortError] Error in base64 encoding', {
      ...error,
      stack: smartTruncateText(error?.stack, 1000),
      message: truncateText(error.message, 350),
    });
  } else {
    logger.error('[handleAbortError] AI response error; aborting request:', error);
  }
  const { sender, conversationId, messageId, parentMessageId, userMessageId, partialText } = data;

  // Optional chaining keeps this check safe when `error` is null/undefined,
  // consistent with the other `error?.` accesses in this handler.
  if (error?.stack?.includes('google')) {
    logger.warn(
      `AI Response error for conversation ${conversationId} likely caused by Google censor/filter`,
    );
  }

  let errorText = error?.message?.includes('"type"')
    ? error.message
    : 'An error occurred while processing your request. Please contact the Admin.';

  if (error?.type === ErrorTypes.INVALID_REQUEST) {
    errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
  }

  if (error?.message?.includes("does not support 'system'")) {
    errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
  }

  /**
   * @param {string} partialText
   * @returns {Promise<void>}
   */
  const respondWithError = async (partialText) => {
    const endpointOption = req.body?.endpointOption;
    let options = {
      sender,
      messageId,
      conversationId,
      parentMessageId,
      text: errorText,
      user: req.user.id,
      spec: endpointOption?.spec,
      iconURL: endpointOption?.iconURL,
      modelLabel: endpointOption?.modelLabel,
      shouldSaveMessage: userMessageId != null,
      model: endpointOption?.modelOptions?.model || req.body?.model,
    };

    if (req.body?.agent_id) {
      options.agent_id = req.body.agent_id;
    }

    if (partialText) {
      options = {
        ...options,
        error: false,
        unfinished: true,
        text: partialText,
      };
    }

    await sendError(req, res, options);
  };

  if (partialText && partialText.length > 5) {
    try {
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[handleAbortError] error while trying to abort message', err);
      return respondWithError(partialText);
    }
  } else {
    return respondWithError();
  }
};

module.exports = {
  handleAbort,
  handleAbortError,
  spendCollectedUsage,
};
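
The double-spend guard in spendCollectedUsage relies on the array being cleared in place rather than replaced. The sketch below illustrates that idea in isolation. `spend` and `client` are hypothetical stand-ins, not the real spendCollectedUsage or AgentClient, and the usage object's field names are made up for the example.

```javascript
// Hypothetical stand-ins: `spend` plays the role of spendCollectedUsage and
// `client` the role of AgentClient; neither is the real implementation.
const client = { collectedUsage: [{ input_tokens: 12, output_tokens: 7 }] };

let spendCalls = 0;

/** Records usage once, then empties the caller's array in place. */
function spend(collectedUsage) {
  if (!collectedUsage || collectedUsage.length === 0) {
    return; // nothing left to bill
  }
  spendCalls += 1;
  // In-place truncation: every holder of this array reference now sees [].
  collectedUsage.length = 0;
}

// The abort path receives the same array object the client holds.
const fromAbortResult = client.collectedUsage;
spend(fromAbortResult);

// A later "finally"-style pass over the client's reference is a no-op.
spend(client.collectedUsage);

console.log(spendCalls); // 1
console.log(client.collectedUsage.length); // 0

// Contrast: rebinding a local variable leaves the shared array untouched.
let local = client.collectedUsage;
client.collectedUsage.push({ input_tokens: 3, output_tokens: 1 });
local = [];
console.log(client.collectedUsage.length); // 1
```

Setting `length = 0` mutates the one array both sides hold, which is why the second `spend` call returns early. Assigning a fresh `[]` would only change the local binding and the other holder would still see the old entries.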
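
The order of the error-text checks in handleAbortError matters, because later checks overwrite earlier results. As a rough illustration, the same sequence can be pulled into a standalone function. `resolveErrorText` is a hypothetical helper that does not exist in the file, and the `ErrorTypes` string values below are assumed placeholders; the real enum comes from librechat-data-provider.

```javascript
// Placeholder values: the real ErrorTypes enum lives in librechat-data-provider,
// and these strings are assumptions made for the sketch.
const ErrorTypes = {
  INVALID_REQUEST: 'invalid_request_error',
  NO_SYSTEM_MESSAGES: 'no_system_messages',
};

const GENERIC = 'An error occurred while processing your request. Please contact the Admin.';

/** Hypothetical helper mirroring the order of checks in handleAbortError. */
function resolveErrorText(error) {
  // 1. Messages that already look like a typed JSON payload pass through.
  let errorText = error?.message?.includes('"type"') ? error.message : GENERIC;
  // 2. An explicit error type overrides whatever the message said.
  if (error?.type === ErrorTypes.INVALID_REQUEST) {
    errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
  }
  // 3. Checked last, so a "does not support 'system'" message wins over both.
  if (error?.message?.includes("does not support 'system'")) {
    errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
  }
  return errorText;
}

console.log(resolveErrorText(new Error('boom')));
console.log(resolveErrorText({ type: ErrorTypes.INVALID_REQUEST, message: 'bad' }));
console.log(resolveErrorText(new Error("model does not support 'system' role")));
```

Because every access goes through optional chaining, the helper also returns the generic text when called with `undefined`, which matches the `Error | unknown` parameter type documented on handleAbortError.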