🪙 fix: Correct Context Usage Gauge After Summarization (#13744)

* 🪙 fix: Persist Context Snapshot + Summary Marker After Summarization

The post-summarization context is correctly compacted by the SDK, but the
breakdown wasn't reliably reaching the client, leaving the gauge on the
whole-history estimate (stuck at 100% forever once a conversation compacts).

Two server changes in buildResponseMetadata:
- Snapshot guard: persist the breakdown when a PRIMARY usage event follows the
  latest snapshot (tracked via contextUsageSink.latestUsageIndex, recorded in
  the on_context_usage handler) instead of a brittle snapshot-vs-primary count.
  A summarization detour adds an extra snapshot whose only following usage is
  tagged 'summarization', which the count guard could miscount and drop.
- Summary marker: whenever a turn compacts (summaryTokens > 0), persist a
  lightweight metadata.summaryUsedTokens (the pre-invoke compacted context size)
  UNCONDITIONALLY — so even when the full snapshot can't be saved (interrupted
  final call) or never reaches the client, the per-message estimate has a signal
  to cap the discarded history.

Tests: client.contextMetadata.spec (guard + marker, incl. marker-survives-drop)
and a real-pipeline summarization integration test.

* 🪙 fix: Cap the Context Estimate at the Summary Marker

When the gauge falls back to the per-message estimate (no usable snapshot on the
branch), sumBranch summed the ENTIRE branch history — after a summarization that
discarded most of it, this over-counts and pins the gauge at 100% in perpetuity.

sumBranch now stops at the deepest summarized response (metadata.summaryUsedTokens)
and records it as summaryBaseline; the walk counts only post-summary messages,
and useTokenUsage adds the baseline. So the estimate reflects the compacted
context (summary + recent turns), not the discarded history. USD/default
behavior unchanged when no marker is present.

Test: sumBranch caps a huge pre-summary history at the compacted baseline.

* 🪙 fix: Address Codex Review on the Summarization Marker

- Branch cost/usage is no longer truncated at the summary marker — sumBranch
  caps only the CONTEXT-window count there and keeps accumulating provider
  usage/cost to the root (cumulative spend isn't discarded by compaction).
- findBranchSnapshotAnchor stops at a summarized response with no snapshot of its
  own, so it can't recover a stale PRE-summary snapshot and show discarded
  history; the summary-baseline estimate is used instead.
- Abort path: buildAbortedResponseMetadata now persists the summaryUsedTokens
  marker (pre-invoke, no completedOutputTokens ambiguity, so safe on abort) so a
  STOPPED summarized turn isn't re-summed on reload.
- Marker baseline fallback now includes summaryTokens (a separate breakdown
  field) so it doesn't under-report the compacted size. DRY'd into a shared
  computeSummaryUsedTokens used by the completion and abort paths.
- Estimate popover surfaces the summary baseline as a row so the displayed rows
  reconcile with the header total.

Tests: sumBranch cost-not-truncated + anchor-stops-at-marker (client);
computeSummaryUsedTokens fallback + abort marker (packages/api).

* 🪙 fix: Attribute Persisted Context Usage to the Snapshot Run

Match the post-snapshot primary usage to the latest snapshot's runId before
persisting metadata.contextUsage. Parallel/direct runs interleave snapshots and
usage (A snapshot → B snapshot → A usage → B no-usage); the prior index-only
guard persisted B's snapshot with A's output. finalCallOutputTokens now filters
completedOutputTokens to the snapshot's run. Untagged events (older lib/resume)
match any run for back-compat.

* 🪙 fix: Harden Summary Marker Against Tool-Loops, Stale Anchors, and Emit Races

Codex round on the summarization marker:

- Avoid double-counting earlier tool-loop outputs in the summary marker: those
  outputs sit in BOTH the latest snapshot's pre-invoke baseline AND the response
  message's tokenCount the client estimate adds on top. computeSummaryUsedTokens
  now subtracts the run's prior primary outputs (priorRunOutputTokens) — the live
  path bounds them by the snapshot's usage index, the abort path by all primaries
  (an interrupted final call emits none). Single-call turns subtract 0.
- Stop treating pre-summary anchors as active: sumBranch no longer sets
  containsAnchor once the context is capped at a summary marker, so a stale
  pre-summary snapshot can't override the summary-baseline estimate.
- Capture latestUsageIndex BEFORE awaiting emitEvent: a yield (resumable SSE /
  Redis) during parallel runs could let this call's own usage advance the index
  past the event that proves the snapshot completed, dropping a valid breakdown.

* 🪙 fix: Subtract Summarization Output from the Summary Marker

recordCollectedUsage folds the summarization call's completion into the response
message's tokenCount, while the generated summary is also in the snapshot baseline
as summaryTokens. The client estimate (summaryBaseline + responseTokenCount) thus
counted the summary twice — inflating the gauge after compaction even on a
single-call turn whenever the full snapshot is unavailable. priorRunOutputTokens
now also counts summarization-tagged output (still excluding subagent/sequential,
which recordCollectedUsage keeps out of the reported total), so the marker
subtracts it. Updated unit + guard tests.

* 🪙 fix: Refine Marker Subtraction for Summarization RunId and Abort Boundary

Two Codex follow-ups on the marker-subtraction logic:

- Subtract summarization output regardless of runId: the summarize detour is its
  own model-end call that may carry a distinct runId, but its output still lands
  in this response's tokenCount AND the snapshot baseline (summaryTokens). It is
  now counted unconditionally (still within the response's own usageEmitSink),
  while primaries keep the parallel-run runId filter.
- Don't subtract primaries on the abort path: the job stores no snapshot/usage
  boundary, so a primary that completed AFTER the latest snapshot is NOT in the
  baseline; subtracting it would cancel real output and under-report. priorRun-
  OutputTokens gains an includePrimary flag (false for abort) — abort subtracts
  only the always-pre-snapshot summarization output.

* 🪙 fix: Run-Scope Summary Subtraction and Stop Subtracting on Abort

Two Codex follow-ups, resolved by reverting the round-4 detour:

- Run-scope the summarization subtraction: the summarize detour inherits the
  graph run id (traceConfig spreads config.metadata.run_id), so its usage shares
  the answer snapshot's runId — it is NOT a distinct run. priorRunOutputTokens now
  filters summarization by runId like primaries, so a parallel sibling run's
  summary (different runId, in the sibling's baseline) is no longer subtracted from
  this branch's marker. Drops the includePrimary flag added last round.
- Stop subtracting on the abort path: abort tokenCount is countTokens(text)
  (abortMiddleware) or absent (agents route) — it does not fold in summarization or
  earlier-call output the way recordCollectedUsage does, so the marker must keep
  the full baseline. buildAbortedResponseMetadata now subtracts nothing.
This commit is contained in:
Danny Avila
2026-06-14 18:23:30 -04:00
committed by GitHub
parent 2350ebb24a
commit 44c253d48a
10 changed files with 801 additions and 41 deletions

View File

@@ -0,0 +1,139 @@
const AgentClient = require('../client');
/** Minimal post-(maybe-)summary snapshot. baseUsed = maxContextTokens(1000) -
* remainingContextTokens(700) = 300, so the marker (summaryUsedTokens) is 300. */
const snapshot = (summaryTokens) => ({
runId: 'run-1',
agentId: 'agent-1',
breakdown: {
maxContextTokens: 1000,
instructionTokens: 50,
systemMessageTokens: 50,
dynamicInstructionTokens: 0,
toolSchemaTokens: 0,
summaryTokens,
toolCount: 0,
messageCount: 1,
messageTokens: 20,
availableForMessages: 900,
},
contextBudget: 1000,
remainingContextTokens: 700,
prePruneContextTokens: 300,
effectiveInstructionTokens: 50,
calibrationRatio: 1,
});
const primary = { input_tokens: 10, output_tokens: 5, total_tokens: 15 };
const summarizationUsage = { ...primary, usage_type: 'summarization' };
const primaryFor = (runId, output_tokens) => ({
input_tokens: 10,
output_tokens,
total_tokens: 10 + output_tokens,
provider: 'openAI',
runId,
});
function buildMeta({ snap, latestUsageIndex, usageEvents }) {
const self = {
collectedThoughtSignatures: null,
usageEmitSink: usageEvents,
contextUsageSink: snap
? { latest: snap, count: 1, latestUsageIndex }
: { latest: null, count: 0 },
};
return AgentClient.prototype.buildResponseMetadata.call(self);
}
describe('AgentClient.buildResponseMetadata — snapshot persistence + summary marker', () => {
it('persists the snapshot when a primary usage follows it (normal turn)', () => {
const meta = buildMeta({ snap: snapshot(0), latestUsageIndex: 0, usageEvents: [primary] });
expect(meta.contextUsage).toBeDefined();
expect(meta.summaryUsedTokens).toBeUndefined();
});
it('persists the post-summary snapshot when the only pre-primary usage is the summarization', () => {
/** A summarized turn: the summarization usage precedes the post-summary
* snapshot (index 1), then the model's primary usage follows it. The old
* count guard miscounted and dropped this; the new guard keeps it. The
* marker subtracts the summarization output (5): the generated summary is in
* the snapshot baseline (summaryTokens) AND the response tokenCount, so
* 300 5 = 295 keeps the client estimate from counting it twice. */
const meta = buildMeta({
snap: snapshot(80),
latestUsageIndex: 1,
usageEvents: [summarizationUsage, primary],
});
expect(meta.contextUsage).toBeDefined();
expect(meta.summaryUsedTokens).toBe(295);
});
it('still emits the summary marker when the final call emitted no usage', () => {
/** Interrupted summarized turn: no primary usage follows the latest snapshot,
* so the snapshot is (correctly) not persisted — but the coarse marker
* survives so the client estimate still caps the discarded history. The
* summarization output (5) is subtracted (300 5 = 295). */
const meta = buildMeta({
snap: snapshot(80),
latestUsageIndex: 1,
usageEvents: [summarizationUsage],
});
expect(meta.contextUsage).toBeUndefined();
expect(meta.summaryUsedTokens).toBe(295);
});
it('drops the snapshot and emits no marker when the final call had no usage and no summary', () => {
const meta = buildMeta({ snap: snapshot(0), latestUsageIndex: 1, usageEvents: [primary] });
expect(meta.contextUsage).toBeUndefined();
expect(meta.summaryUsedTokens).toBeUndefined();
});
it('does not persist the snapshot when only a parallel run produced post-snapshot usage', () => {
/** A snapshot (run-1) → B snapshot (run-1 is latest) but the only following
* usage belongs to a sibling run (run-2). The guard must NOT persist run-1's
* snapshot with run-2's output — it falls back to the per-message estimate. */
const meta = buildMeta({
snap: snapshot(0),
latestUsageIndex: 0,
usageEvents: [primaryFor('run-2', 99)],
});
expect(meta.contextUsage).toBeUndefined();
});
it('persists with the snapshot run output when its own primary usage follows', () => {
const meta = buildMeta({
snap: snapshot(0),
latestUsageIndex: 0,
usageEvents: [primaryFor('run-2', 99), primaryFor('run-1', 7)],
});
expect(meta.contextUsage).toBeDefined();
expect(meta.contextUsage.completedOutputTokens).toBe(7);
});
it('subtracts earlier tool-loop output from the summary marker (interrupted turn)', () => {
/** Multi-call summarized turn stopped before the final usage: the earlier
* call (output 40) is baked into baseUsed (300), so the marker is 300 40 =
* 260. No primary follows the snapshot, so the full snapshot is not persisted
* and the client uses this marker — which must not double-count the 40 that
* the response tokenCount also carries. */
const meta = buildMeta({
snap: snapshot(80),
latestUsageIndex: 1,
usageEvents: [primaryFor('run-1', 40)],
});
expect(meta.contextUsage).toBeUndefined();
expect(meta.summaryUsedTokens).toBe(260);
});
it('subtracts only this runs earlier output, not a parallel runs', () => {
const meta = buildMeta({
snap: snapshot(80),
latestUsageIndex: 2,
usageEvents: [primaryFor('run-2', 999), primaryFor('run-1', 40), primaryFor('run-1', 5)],
});
/** baseUsed 300 run-1's earlier 40 = 260; run-2's 999 is ignored. */
expect(meta.summaryUsedTokens).toBe(260);
/** run-1's own primary follows the snapshot → snapshot persisted with output 5. */
expect(meta.contextUsage.completedOutputTokens).toBe(5);
});
});

View File

@@ -1,7 +1,7 @@
const { z } = require('zod');
const { tool } = require('@langchain/core/tools');
const { ChatGenerationChunk } = require('@langchain/core/outputs');
const { HumanMessage, AIMessageChunk } = require('@langchain/core/messages');
const { HumanMessage, AIMessage, AIMessageChunk } = require('@langchain/core/messages');
const {
Run,
Providers,
@@ -377,4 +377,95 @@ describe('usage events through the real agents pipeline', () => {
expect(resumeState.contextUsage.prePruneContextTokens).toBeGreaterThan(0);
}
});
/** Drives a real summarization (tight context + padded history); self-summarize
* reuses the overridden fake model so no API key is needed. */
async function runSummarizationLoop({ res, collectedUsage, contextUsageSink, usageEmitSink }) {
const { aggregateContent } = createContentAggregator();
const handlers = getDefaultHandlers({
res,
aggregateContent,
toolEndCallback: () => {},
collectedUsage,
contextUsageSink,
usageEmitSink,
summarizationOptions: { enabled: true },
});
const pad = 'context detail to overflow the tiny budget. '.repeat(40);
const history = [
new HumanMessage(`Turn 1 question. ${pad}`),
new AIMessage(`Turn 1 answer. ${pad}`),
new HumanMessage(`Turn 2 question. ${pad}`),
new AIMessage(`Turn 2 answer. ${pad}`),
new HumanMessage(`Final question after a lot of prior history. ${pad}`),
];
const indexTokenCountMap = {};
history.forEach((message, i) => {
indexTokenCountMap[i] = charCounter(message);
});
const run = await Run.create({
runId: `summ-e2e-${Date.now()}`,
graphConfig: {
type: 'standard',
llmConfig: {
provider: Providers.OPENAI,
model: 'gpt-4o-mini',
streaming: true,
streamUsage: false,
},
instructions: 'You are a helpful assistant.',
maxContextTokens: 700,
summarizationEnabled: true,
summarizationConfig: { provider: Providers.OPENAI, model: 'gpt-4o-mini' },
},
returnContent: true,
customHandlers: handlers,
tokenCounter: charCounter,
indexTokenCountMap,
});
run.Graph.overrideModel = new UsageFakeModel(
{ responses: ['## Summary\nPrior turns compacted.', 'Here is the final answer.'] },
[{ input_tokens: 40, output_tokens: 8, total_tokens: 48 }],
);
await run.processStream(
{ messages: history },
{
configurable: { thread_id: 'summ-e2e-thread', user_id: 'user-1' },
streamMode: 'values',
version: 'v2',
},
);
return run;
}
/** A summarized turn compacts the context (summary tokens replace the older
* turns) and the reduced snapshot is persisted — the latest snapshot is
* followed by a primary usage, so the save guard keeps it and the client
* uses the snapshot (not the inflated whole-history estimate). */
test('persists the reduced (compacted) snapshot after summarization', async () => {
if (!hasContextUsageEvent) {
return;
}
const res = createMockRes();
const contextUsageSink = { latest: null, count: 0 };
const usageEmitSink = [];
await runSummarizationLoop({ res, collectedUsage: [], contextUsageSink, usageEmitSink });
const snapshot = contextUsageSink.latest;
/** Summarization fired: a summary exists and the kept message tokens are
* small (the compacted context, not the full history). */
expect(snapshot?.breakdown?.summaryTokens).toBeGreaterThan(0);
expect(snapshot?.breakdown?.messageTokens).toBeLessThan(snapshot?.breakdown?.summaryTokens);
/** The save guard keeps it: a primary usage follows the latest snapshot. */
const afterLatest = usageEmitSink.slice(contextUsageSink.latestUsageIndex ?? 0);
expect(afterLatest.some((e) => e.usage_type == null)).toBe(true);
expect(
buildPersistedContextUsage(snapshot, usageEmitSink).breakdown.summaryTokens,
).toBeGreaterThan(0);
});
});

View File

@@ -546,16 +546,23 @@ function getDefaultHandlers({
checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) ||
!metadata?.hide_sequential_outputs
) {
await emitEvent(res, streamId, { event, data });
/** Capture the latest visible snapshot (last-wins) + count visible
* snapshots (one per model call). The count lets the save path persist
* the breakdown only when the FINAL call emitted usage (primary usage
* events === snapshots), so completedOutputTokens is a real
* post-snapshot delta and reload doesn't over-report. */
/** Capture the latest visible snapshot (last-wins) and how many usage
* events preceded it BEFORE awaiting the emit. `emitEvent` can yield
* (resumable SSE / Redis publish); with parallel runs active this
* call's own primary usage could land in `usageEmitSink` during that
* yield, pushing `latestUsageIndex` past the very event that proves the
* snapshot completed — the save path would then slice it away and drop
* a valid breakdown. The recorded index lets the save path persist only
* when a PRIMARY usage follows this snapshot (the snapshot's call
* actually invoked the model); a summarization detour emits a snapshot
* whose only following usage is tagged `summarization`, which a plain
* snapshot-count would over-count and wrongly drop. */
if (contextUsageSink) {
contextUsageSink.latest = data;
contextUsageSink.count = (contextUsageSink.count ?? 0) + 1;
contextUsageSink.latestUsageIndex = usageEmitSink?.length ?? 0;
}
await emitEvent(res, streamId, { event, data });
}
},
};

View File

@@ -26,6 +26,8 @@ const {
aggregateEmittedUsage,
resolveAgentTokenConfig,
buildPersistedContextUsage,
computeSummaryUsedTokens,
priorRunOutputTokens,
createSubagentUsageSink,
isDeepSeekReasoningProvider,
GenerationJobManager,
@@ -869,16 +871,49 @@ class AgentClient extends BaseClient {
metadata.thoughtSignatures = signatures;
}
const usageEvents = this.usageEmitSink ?? [];
/** Persist the breakdown only when the FINAL visible call (the one the latest
* snapshot precedes) emitted usage — i.e. as many primary usage events as
* visible snapshots. If the final call emitted no usage_metadata (provider
* gap, or interrupted after an earlier call did emit), `completedOutputTokens`
* would be an earlier call's output the latest snapshot already counts, so
* reload would over-report; fall back to the coarse per-message estimate. */
const primaryUsageCount = usageEvents.filter((event) => event.usage_type == null).length;
const snapshotCount = this.contextUsageSink?.count ?? 0;
if (this.contextUsageSink?.latest && snapshotCount > 0 && primaryUsageCount >= snapshotCount) {
metadata.contextUsage = buildPersistedContextUsage(this.contextUsageSink.latest, usageEvents);
/** Persist the breakdown only when the latest snapshot's OWN run completed —
* i.e. a PRIMARY usage event (usage_type == null) from that run's id arrived
* AFTER the snapshot. Matching by run id keeps `completedOutputTokens` a real
* post-snapshot delta even when parallel/direct runs interleave (A snapshot →
* B snapshot → A usage must NOT persist B's snapshot with A's output); an
* interrupted final call that emits no usage falls back to the per-message
* estimate. It still keeps the post-summary snapshot: the summarization detour
* emits an extra snapshot whose following primary usage shares that run's id,
* which the old snapshot-count guard miscounted and wrongly dropped. Events
* without a run id (older lib / resume) match any snapshot for back-compat. */
const latestSnapshot = this.contextUsageSink?.latest;
const latestSnapshotUsageIndex = this.contextUsageSink?.latestUsageIndex ?? 0;
const latestSnapshotRunId = latestSnapshot?.runId;
const hasPrimaryAfterSnapshot = usageEvents
.slice(latestSnapshotUsageIndex)
.some(
(event) =>
event.usage_type == null &&
(latestSnapshotRunId == null ||
event.runId == null ||
event.runId === latestSnapshotRunId),
);
if (latestSnapshot && hasPrimaryAfterSnapshot) {
metadata.contextUsage = buildPersistedContextUsage(latestSnapshot, usageEvents);
}
/** Lightweight summarization marker — persisted whenever this turn compacted
* the context, INDEPENDENT of the snapshot guard above. When the client has
* no usable snapshot on the branch and falls back to the per-message
* estimate, it caps the discarded pre-summary history at this baseline
* instead of re-summing it (the gauge otherwise reads 100% forever). Shared
* with the abort save path via `computeSummaryUsedTokens`. Subtract the
* response's earlier tool-loop outputs (the primaries that preceded the
* latest snapshot, same run): those tokens are inside the snapshot baseline
* AND in the response `tokenCount` the client estimate adds on top, so
* leaving them in the marker double-counts them on a multi-call turn. */
const priorOutputTokens = priorRunOutputTokens(
usageEvents,
latestSnapshotUsageIndex,
latestSnapshotRunId,
);
const summaryUsedTokens = computeSummaryUsedTokens(latestSnapshot, priorOutputTokens);
if (summaryUsedTokens != null) {
metadata.summaryUsedTokens = summaryUsedTokens;
}
const usage = aggregateEmittedUsage(usageEvents);
if (usage) {

View File

@@ -133,6 +133,13 @@ export default function Breakdown({ view, showCost, currency }: BreakdownProps)
</>
) : (
<>
{view.branchTotals.summaryBaseline > 0 && (
<Row
label={localize('com_ui_context_summary')}
value={view.branchTotals.summaryBaseline}
max={maxTokens}
/>
)}
<Row label={localize('com_ui_input')} value={view.branchTotals.input} />
<Row
label={localize('com_ui_output')}

View File

@@ -232,7 +232,13 @@ export default function useTokenUsage({
};
}
const usedTokens = branchTotals.input + branchTotals.output + liveTokens;
/** `summaryBaseline` is the compacted-context size from the deepest
* summarized response on the branch (0 if none). The branch walk stops
* there, so input/output are post-summary only — adding the baseline keeps
* the estimate from re-summing the discarded pre-summary history (which
* otherwise pins the gauge at 100% forever after a compaction). */
const usedTokens =
branchTotals.input + branchTotals.output + branchTotals.summaryBaseline + liveTokens;
const maxTokens = limits.maxContextTokens;
return {
usedTokens,

View File

@@ -88,6 +88,64 @@ describe('token index', () => {
expect(altTotals.output).toBe(1019);
});
it('caps the branch at a summary marker instead of re-summing compacted history', () => {
const summarized = {
messageId: 'a2',
parentMessageId: 'u2',
isCreatedByUser: false,
tokenCount: 40,
conversationId: CONVO,
text: '',
/** a2's turn compacted the history; pre-invoke context was 500 tokens. */
metadata: { summaryUsedTokens: 500 },
} as TMessage;
buildIndex(CONVO, [
msg('u1', Constants.NO_PARENT, true, 100),
msg('a1', 'u1', false, 9000) /** huge pre-summary history, now discarded */,
msg('u2', 'a1', true, 200),
summarized,
msg('u3', 'a2', true, 15),
msg('a3', 'u3', false, 25),
]);
const totals = sumBranch(CONVO, 'a3');
/** Walk stops at a2: only its output + the post-summary turn are summed. */
expect(totals.summaryBaseline).toBe(500);
expect(totals.input).toBe(15);
expect(totals.output).toBe(65); // a3 (25) + a2 (40)
/** Estimate used = post-summary + compacted baseline = 580, not the 9380
* raw history sum that pinned the gauge at 100%. */
expect(totals.input + totals.output + totals.summaryBaseline).toBe(580);
});
it('keeps provider usage/cost across the full branch even past a summary marker', () => {
const summarized = {
messageId: 'a2',
parentMessageId: 'u2',
isCreatedByUser: false,
tokenCount: 40,
conversationId: CONVO,
text: '',
metadata: { usage: USAGE_B, summaryUsedTokens: 500 },
} as TMessage;
buildIndex(CONVO, [
msg('u1', Constants.NO_PARENT, true, 10),
responseMsg('a1', 'u1', 9000, USAGE_A) /** pre-summary spend */,
msg('u2', 'a1', true, 200),
summarized,
msg('u3', 'a2', true, 15),
responseMsg('a3', 'u3', 25, USAGE_A) /** post-summary spend */,
]);
const totals = sumBranch(CONVO, 'a3');
/** Context is capped at the marker... */
expect(totals.summaryBaseline).toBe(500);
/** ...but cost/usage is cumulative spend and spans the WHOLE branch
* (a1 + a2 + a3 = 0.01 + 0.02 + 0.01), not truncated at the summary boundary. */
expect(totals.usage.cost).toBeCloseTo(0.04);
expect(totals.usage.input).toBe(400); // 100 + 200 + 100
});
it('flags whether the anchor message is on the branch', () => {
buildIndex(CONVO, [
msg('u1', Constants.NO_PARENT, true, 10),
@@ -99,6 +157,34 @@ describe('token index', () => {
expect(sumBranch(CONVO, 'a1-alt', 'a1').containsAnchor).toBe(false);
});
it('stops matching the anchor once the context is capped at a summary marker', () => {
const summarized = {
messageId: 'a2',
parentMessageId: 'u2',
isCreatedByUser: false,
tokenCount: 40,
conversationId: CONVO,
text: '',
metadata: { summaryUsedTokens: 500 },
} as TMessage;
buildIndex(CONVO, [
msg('u1', Constants.NO_PARENT, true, 100),
msg('a1', 'u1', false, 20),
msg('u2', 'a1', true, 200),
summarized,
msg('u3', 'a2', true, 15),
msg('a3', 'u3', false, 25),
]);
/** a1 is older than the summary marker (a2): a snapshot anchored there is
* pre-summary, so it must NOT count as on-branch — else useTokenUsage revives
* that stale breakdown over the summary-baseline estimate. */
expect(sumBranch(CONVO, 'a3', 'a1').containsAnchor).toBe(false);
/** The summarized response's own (post-summary) snapshot still matches... */
expect(sumBranch(CONVO, 'a3', 'a2').containsAnchor).toBe(true);
/** ...as does a snapshot from a newer turn. */
expect(sumBranch(CONVO, 'a3', 'a3').containsAnchor).toBe(true);
});
it('tracks uncounted messages and tolerates missing parents', () => {
buildIndex(CONVO, [msg('u2', 'missing-parent', true, undefined), msg('a2', 'u2', false, 15)]);
@@ -159,6 +245,31 @@ describe('findBranchSnapshotAnchor', () => {
expect(findBranchSnapshotAnchor(CONVO, 'a1', new Map())).toBeNull();
expect(findBranchSnapshotAnchor('unknown', 'a1', new Map([['a1', 1]]))).toBeNull();
});
it('does not cross a summary marker to recover a stale pre-summary snapshot', () => {
const summarized = {
messageId: 'a2',
parentMessageId: 'u2',
isCreatedByUser: false,
tokenCount: 40,
conversationId: CONVO,
text: '',
metadata: { summaryUsedTokens: 500 },
} as TMessage;
buildIndex(CONVO, [
msg('u1', Constants.NO_PARENT, true, 10),
msg('a1', 'u1', false, 20) /** has a snapshot, but pre-summary */,
msg('u2', 'a1', true, 30),
summarized /** compacted here, no snapshot of its own */,
msg('u3', 'a2', true, 15),
msg('a3', 'u3', false, 25),
]);
/** a1 is the only stored anchor, but it sits before the summary — the walk
* must stop at a2 and return null so the summary-baseline estimate is used. */
expect(findBranchSnapshotAnchor(CONVO, 'a3', new Map([['a1', 1]]))).toBeNull();
/** When the summarized response itself has a snapshot, return it. */
expect(findBranchSnapshotAnchor(CONVO, 'a3', new Map([['a2', 1]]))).toBe('a2');
});
});
describe('estimateTokens', () => {

View File

@@ -31,6 +31,10 @@ export interface TokenEntry {
parentMessageId: string | null;
/** Per-response provider usage from `metadata.usage` (response messages only) */
usage?: BranchUsage;
/** Pre-invoke compacted context size (`metadata.summaryUsedTokens`) for a
* response whose turn summarized. Caps the estimate so it stops re-summing
* the now-discarded pre-summary history. */
summaryUsedTokens?: number;
}
export interface BranchTotals {
@@ -47,6 +51,11 @@ export interface BranchTotals {
containsAnchor: boolean;
/** Provider usage/cost summed along the active branch */
usage: BranchUsage;
/** Compacted-context baseline from the deepest summarized response on the
* branch (0 if none). The branch walk stops there, so `input`/`output` cover
* only the post-summary messages; the estimate adds this to avoid counting
* the discarded pre-summary history. */
summaryBaseline: number;
}
export const EMPTY_BRANCH: BranchTotals = {
@@ -57,6 +66,7 @@ export const EMPTY_BRANCH: BranchTotals = {
tailId: null,
containsAnchor: false,
usage: EMPTY_USAGE,
summaryBaseline: 0,
};
/** Module-level token index: conversationId → messageId → entry. Not render state. */
@@ -128,11 +138,16 @@ function addUsage(target: BranchUsage, usage?: BranchUsage): void {
}
function toEntry(message: Partial<TMessage>): TokenEntry {
const summaryUsedTokens = message.metadata?.summaryUsedTokens;
return {
tokenCount: typeof message.tokenCount === 'number' ? message.tokenCount : 0,
isCreatedByUser: message.isCreatedByUser === true,
parentMessageId: message.parentMessageId ?? null,
usage: readPersistedUsage(message),
summaryUsedTokens:
typeof summaryUsedTokens === 'number' && summaryUsedTokens > 0
? summaryUsedTokens
: undefined,
};
}
@@ -218,6 +233,11 @@ export function sumBranch(
const totals = { input: 0, output: 0, counted: 0, total: 0, containsAnchor: false };
const usage: BranchUsage = { ...EMPTY_USAGE };
let summaryBaseline = 0;
/** Once a summary marker is crossed, older turns are out of the CONTEXT WINDOW
* (subsumed by the baseline) — but their provider spend still happened, so the
* usage/cost walk continues to the root while context counting stops. */
let contextCapped = false;
let currentId: string | null = tailId;
let guard = index.size;
@@ -227,10 +247,15 @@ export function sumBranch(
break;
}
totals.total += 1;
if (anchorId != null && currentId === anchorId) {
/** Only match the anchor while still inside the active context window. An
* anchor OLDER than the deepest summary marker belongs to a pre-summary
* snapshot; treating it as on-branch would let `useTokenUsage` revive that
* stale breakdown (discarded history) over the summary-baseline estimate
* that `findBranchSnapshotAnchor` correctly refuses to recover. */
if (!contextCapped && anchorId != null && currentId === anchorId) {
totals.containsAnchor = true;
}
if (entry.tokenCount > 0) {
if (!contextCapped && entry.tokenCount > 0) {
totals.counted += 1;
if (entry.isCreatedByUser) {
totals.input += entry.tokenCount;
@@ -238,11 +263,19 @@ export function sumBranch(
totals.output += entry.tokenCount;
}
}
/** Cost/usage is cumulative spend — never truncated at the summary boundary. */
addUsage(usage, entry.usage);
/** This response's turn compacted the history: its own output is counted
* above; record the pre-invoke compacted baseline and stop counting context
* tokens for older (summarized-away) turns, but keep walking for cost. */
if (!contextCapped && entry.summaryUsedTokens != null) {
summaryBaseline = entry.summaryUsedTokens;
contextCapped = true;
}
currentId = entry.parentMessageId;
}
return { ...totals, tailId, usage };
return { ...totals, tailId, usage, summaryBaseline };
}
/**
@@ -311,6 +344,12 @@ export function findBranchSnapshotAnchor(
if (!entry) {
break;
}
/** Stop at a summarized response that has no snapshot of its own: crossing it
* would recover an older PRE-summary snapshot (discarded history), which the
* summary-baseline estimate is meant to replace. */
if (entry.summaryUsedTokens != null) {
return null;
}
currentId = entry.parentMessageId;
}

View File

@@ -10,6 +10,8 @@ import {
resolveAgentTokenConfig,
buildPersistedContextUsage,
buildAbortedResponseMetadata,
computeSummaryUsedTokens,
priorRunOutputTokens,
} from './usage';
describe('recordCollectedUsage', () => {
@@ -1752,6 +1754,157 @@ describe('buildPersistedContextUsage', () => {
it('omits completedOutputTokens when there are no primary calls', () => {
expect(buildPersistedContextUsage(baseSnapshot, []).completedOutputTokens).toBeUndefined();
});
it('attributes completedOutputTokens to the snapshot run, not a parallel run', () => {
/** Parallel/direct runs interleave: this snapshot is run-1, but run-2 emits a
* later primary usage. The persisted delta must be run-1's own output (40),
* never run-2's trailing output (99). */
const events: TTokenUsageEvent[] = [
{
input_tokens: 100,
output_tokens: 40,
total_tokens: 140,
provider: 'openAI',
runId: 'run-1',
},
{
input_tokens: 200,
output_tokens: 99,
total_tokens: 299,
provider: 'openAI',
runId: 'run-2',
},
];
const result = buildPersistedContextUsage(baseSnapshot, events);
expect(result.completedOutputTokens).toBe(40);
});
it('omits completedOutputTokens when only other runs emitted usage', () => {
/** The snapshot run never completed (no matching primary); a sibling run's
* output must not be borrowed — fall back to the per-message estimate. */
const events: TTokenUsageEvent[] = [
{
input_tokens: 200,
output_tokens: 99,
total_tokens: 299,
provider: 'openAI',
runId: 'run-2',
},
];
expect(buildPersistedContextUsage(baseSnapshot, events).completedOutputTokens).toBeUndefined();
});
it('matches untagged usage events for back-compat (older lib / resume)', () => {
/** Events without a runId predate run tagging; they match any snapshot so the
* last primary (25) is still recorded. */
const events: TTokenUsageEvent[] = [
{ input_tokens: 100, output_tokens: 40, total_tokens: 140, provider: 'openAI' },
{ input_tokens: 200, output_tokens: 25, total_tokens: 225, provider: 'openAI' },
];
expect(buildPersistedContextUsage(baseSnapshot, events).completedOutputTokens).toBe(25);
});
});
describe('computeSummaryUsedTokens', () => {
const summarized = (over?: Partial<TContextUsageEvent>): TContextUsageEvent => ({
runId: 'run-1',
breakdown: {
maxContextTokens: 1000,
instructionTokens: 50,
systemMessageTokens: 50,
dynamicInstructionTokens: 0,
toolSchemaTokens: 0,
summaryTokens: 80,
toolCount: 0,
messageCount: 1,
messageTokens: 20,
availableForMessages: 900,
},
contextBudget: 1000,
remainingContextTokens: 700,
effectiveInstructionTokens: 50,
...over,
});
it('uses contextBudget remainingContextTokens when available', () => {
expect(computeSummaryUsedTokens(summarized())).toBe(300);
});
it('falls back to instructions + summary + messages, including summaryTokens', () => {
/** summaryTokens is a separate breakdown field, so the no-remaining fallback
* must add it: 50 + 80 + 20 = 150, not 70. */
expect(computeSummaryUsedTokens(summarized({ remainingContextTokens: undefined }))).toBe(150);
});
it('returns undefined when the turn did not summarize', () => {
expect(
computeSummaryUsedTokens(
summarized({ breakdown: { ...summarized().breakdown, summaryTokens: 0 } }),
),
).toBeUndefined();
expect(computeSummaryUsedTokens(null)).toBeUndefined();
expect(computeSummaryUsedTokens(undefined)).toBeUndefined();
});
it('subtracts the responses earlier tool-loop outputs from the marker', () => {
/** 300 baseUsed 90 earlier outputs = 210, so the client estimate
* (summaryBaseline + full response tokenCount) doesnt double-count them. */
expect(computeSummaryUsedTokens(summarized(), 90)).toBe(210);
});
it('clamps to undefined when the prior outputs exceed the baseline', () => {
expect(computeSummaryUsedTokens(summarized(), 5000)).toBeUndefined();
});
});
describe('priorRunOutputTokens', () => {
const ev = (over: Partial<TTokenUsageEvent>): TTokenUsageEvent => ({
input_tokens: 10,
output_tokens: 0,
total_tokens: 10,
provider: 'openAI',
...over,
});
it('sums primary outputs before the index for the matching run', () => {
const events = [
ev({ output_tokens: 20, runId: 'run-1' }),
ev({ output_tokens: 30, runId: 'run-1' }),
ev({ output_tokens: 99, runId: 'run-1' }), // at/after the index — excluded
];
expect(priorRunOutputTokens(events, 2, 'run-1')).toBe(50);
});
it('counts run-matched primary + summarization, skips subagent/sequential and other runs', () => {
/** Both the primary and the summarization output are in this run's tokenCount
* AND baseline, so both are subtracted; subagent/sequential are excluded from
* the reported output total; a parallel run's primary is not this snapshot's. */
const events = [
ev({ output_tokens: 20, runId: 'run-1' }), // primary, matches
ev({ output_tokens: 8, runId: 'run-1', usage_type: 'summarization' }), // counted
ev({ output_tokens: 5, runId: 'run-1', usage_type: 'subagent' }), // skipped
ev({ output_tokens: 7, runId: 'run-1', usage_type: 'sequential' }), // skipped
ev({ output_tokens: 40, runId: 'run-2' }), // other-run primary — skipped
];
expect(priorRunOutputTokens(events, 5, 'run-1')).toBe(28);
});
it('does not subtract a parallel sibling runs summarization output', () => {
/** The summarize detour inherits the graph run id (traceConfig), so a sibling
* run's summary carries a DIFFERENT runId; its summary is in the sibling's
* baseline, not this snapshot's, so subtracting it would under-report. */
const events = [
ev({ output_tokens: 20, runId: 'run-1' }),
ev({ output_tokens: 8, runId: 'run-2', usage_type: 'summarization' }), // sibling — skipped
];
expect(priorRunOutputTokens(events, 2, 'run-1')).toBe(20);
});
it('matches untagged events for back-compat and returns 0 with no prior calls', () => {
const events = [ev({ output_tokens: 20 }), ev({ output_tokens: 30 })];
expect(priorRunOutputTokens(events, 2, 'run-1')).toBe(50);
expect(priorRunOutputTokens(events, 0, 'run-1')).toBe(0);
});
});
describe('buildAbortedResponseMetadata', () => {
@@ -1783,6 +1936,65 @@ describe('buildAbortedResponseMetadata', () => {
expect(result).toEqual({ usage: { input: 100, output: 20, cacheWrite: 0, cacheRead: 0 } });
expect((result as { contextUsage?: unknown }).contextUsage).toBeUndefined();
});
const abortSnapshot: TContextUsageEvent = {
runId: 'run-1',
breakdown: {
maxContextTokens: 1000,
instructionTokens: 50,
systemMessageTokens: 50,
dynamicInstructionTokens: 0,
toolSchemaTokens: 0,
summaryTokens: 80,
toolCount: 0,
messageCount: 1,
messageTokens: 20,
availableForMessages: 900,
},
contextBudget: 1000,
remainingContextTokens: 700,
};
it('persists the summary marker (but not the full snapshot) for a stopped summarized turn', () => {
/** A single-call stopped turn: the interrupted call emitted no usage, so the
* marker is the full baseUsed (300) with nothing to subtract. */
const result = buildAbortedResponseMetadata({
tokenUsage: JSON.stringify([]),
contextUsage: JSON.stringify(abortSnapshot),
});
expect(result?.summaryUsedTokens).toBe(300);
/** The full snapshot stays off the abort path (completedOutputTokens ambiguity). */
expect((result as { contextUsage?: unknown }).contextUsage).toBeUndefined();
});
it('does not subtract any output from the marker on a stopped turn', () => {
/** The abort tokenCount comes from countTokens(text) or is absent — it does
* NOT fold in summarization/earlier-call output the way recordCollectedUsage
* does. So the marker is the full baseUsed (300); subtracting the summarization
* (8) or the primary (20) here would under-report after reload. */
const events: TTokenUsageEvent[] = [
{
input_tokens: 100,
output_tokens: 20,
total_tokens: 120,
provider: 'openAI',
runId: 'run-1',
},
{
input_tokens: 60,
output_tokens: 8,
total_tokens: 68,
provider: 'openAI',
runId: 'run-1',
usage_type: 'summarization',
},
];
const result = buildAbortedResponseMetadata({
tokenUsage: JSON.stringify(events),
contextUsage: JSON.stringify(abortSnapshot),
});
expect(result?.summaryUsedTokens).toBe(300);
});
});
describe('resolveAgentTokenConfig', () => {

View File

@@ -247,16 +247,23 @@ function normalizeEventUnits(event: TTokenUsageEvent): {
};
}
/** Output tokens of the response's final primary model call — the call the
* latest pre-invoke snapshot precedes. Persisted as the snapshot's
* `completedOutputTokens` so a reloaded multi-call turn adds only this delta
* (matching the live finalizer) instead of the full response `tokenCount`,
* which the snapshot already counts for earlier steps. */
function finalCallOutputTokens(events: ReadonlyArray<TTokenUsageEvent>): number {
/** Output tokens of the final primary model call belonging to the snapshot's
* run — the call the latest pre-invoke snapshot precedes. Persisted as the
* snapshot's `completedOutputTokens` so a reloaded multi-call turn adds only
* this delta (matching the live finalizer) instead of the full response
* `tokenCount`, which the snapshot already counts for earlier steps. Filtering
* by `runId` prevents a parallel run's later usage from being attributed to this
* snapshot; untagged events (older lib / resume) match any run for back-compat. */
function finalCallOutputTokens(events: ReadonlyArray<TTokenUsageEvent>, runId?: string): number {
for (let i = events.length - 1; i >= 0; i--) {
if (events[i].usage_type == null) {
return normalizeEventUnits(events[i]).output;
const event = events[i];
if (event.usage_type != null) {
continue;
}
if (runId != null && event.runId != null && event.runId !== runId) {
continue;
}
return normalizeEventUnits(event).output;
}
return 0;
}
@@ -273,7 +280,7 @@ export function buildPersistedContextUsage(
usageEvents: ReadonlyArray<TTokenUsageEvent> = [],
): TContextUsageEvent {
const { breakdown } = snapshot;
const completedOutputTokens = finalCallOutputTokens(usageEvents);
const completedOutputTokens = finalCallOutputTokens(usageEvents, snapshot.runId);
let toolTokenCounts = breakdown.toolTokenCounts;
if (toolTokenCounts != null) {
const trimmed: Record<string, number> = {};
@@ -291,6 +298,83 @@ export function buildPersistedContextUsage(
};
}
/**
* Sum of this response's output tokens already folded into a later snapshot's
* pre-invoke baseline that the response message's `tokenCount` ALSO carries — the
* overlap `computeSummaryUsedTokens` subtracts from the marker so the live-path
* client estimate (`summaryBaseline + responseTokenCount`) doesn't double-count:
* - earlier tool-loop PRIMARY calls: a multi-call turn's first output sits in the
* kept-message context of the next call's snapshot AND in `tokenCount`.
* - the SUMMARIZATION call's generated summary: it sits in the snapshot baseline
* as `summaryTokens` AND in `tokenCount` (`recordCollectedUsage` folds
* summarization completion into the reported output total; subagent/sequential
* are kept out of that total, so they are excluded here too).
*
* Both are matched by `runId` and bounded by `beforeIndex` to the calls that
* preceded the snapshot. The summarize detour inherits the graph run id
* (`traceConfig` spreads `config.metadata.run_id`), so it shares the snapshot's
* `runId`; a parallel sibling run's summary carries a DIFFERENT `runId` and must
* NOT be subtracted (its summary lives in the sibling's baseline, not this one).
* Untagged events (older lib / resume) match any run for back-compat.
*
* Only the live path (which builds `tokenCount` via `recordCollectedUsage`) calls
* this; the abort path subtracts nothing — see {@link buildAbortedResponseMetadata}.
*/
export function priorRunOutputTokens(
events: ReadonlyArray<TTokenUsageEvent>,
beforeIndex: number,
runId?: string,
): number {
let total = 0;
const end = Math.min(beforeIndex, events.length);
for (let i = 0; i < end; i++) {
const event = events[i];
if (event.usage_type != null && event.usage_type !== 'summarization') {
continue;
}
if (runId != null && event.runId != null && event.runId !== runId) {
continue;
}
total += normalizeEventUnits(event).output;
}
return total;
}
/**
* Pre-invoke compacted context size for a summarized turn (instructions +
* summary + kept messages), or `undefined` when the turn did not summarize.
* Persisted as the lightweight `summaryUsedTokens` marker so the client estimate
* fallback caps the discarded pre-summary history instead of re-summing it (the
* gauge otherwise reads 100% in perpetuity after a compaction). Pre-invoke, so
* it carries none of the `completedOutputTokens` ambiguity that keeps the full
* snapshot off some save paths. `summaryTokens` is a SEPARATE breakdown field, so
* the non-`remainingContextTokens` fallback adds it explicitly.
*
* `priorOutputTokens` (this response's earlier tool-loop outputs, see
* {@link priorRunOutputTokens}) is subtracted: those tokens are inside the
* baseline's kept messages AND in the response message's `tokenCount` the client
* adds on top, so leaving them in the marker double-counts them on a tool-loop
* summarized turn. Single-call turns pass 0 and are unaffected.
*/
export function computeSummaryUsedTokens(
snapshot: TContextUsageEvent | null | undefined,
priorOutputTokens = 0,
): number | undefined {
const summaryTokens = snapshot?.breakdown?.summaryTokens ?? 0;
if (!snapshot || summaryTokens <= 0) {
return undefined;
}
const maxTokens = snapshot.contextBudget ?? snapshot.breakdown.maxContextTokens ?? 0;
const baseUsed =
snapshot.remainingContextTokens != null
? maxTokens - snapshot.remainingContextTokens
: (snapshot.effectiveInstructionTokens ?? snapshot.breakdown.instructionTokens ?? 0) +
summaryTokens +
(snapshot.breakdown.messageTokens ?? 0);
const adjusted = baseUsed - Math.max(0, priorOutputTokens);
return adjusted > 0 ? Math.round(adjusted) : undefined;
}
function parseUsageEvents(value?: string | null): TTokenUsageEvent[] {
if (typeof value !== 'string' || value.length === 0) {
return [];
@@ -309,21 +393,50 @@ function parseUsageEvents(value?: string | null): TTokenUsageEvent[] {
* reload (finding: stopped responses otherwise lose cost). Shared by every abort
* save path (agents abort route + legacy abort middleware).
*
* Deliberately persists ONLY `usage`, not `contextUsage`: unlike the live path,
* the abort path can't tell whether the FINAL call (the one the latest snapshot
* precedes) emitted usage — the job stores only the latest snapshot, not the
* snapshot count. If the final call emitted none, `completedOutputTokens` would
* reuse an earlier call's output the snapshot already counts → reload
* over-reports. A stopped/incomplete response therefore falls back to the coarse
* per-message gauge estimate on reload, which is both safe and apt for an
* interrupted turn that never reached a clean pre-invoke breakdown.
* Deliberately omits the full `contextUsage`: unlike the live path, the abort
* path can't tell whether the FINAL call (the one the latest snapshot precedes)
* emitted usage — the job stores only the latest snapshot, not the snapshot
* count. If the final call emitted none, `completedOutputTokens` would reuse an
* earlier call's output the snapshot already counts → reload over-reports. So a
* stopped response falls back to the per-message gauge estimate on reload.
*
* It DOES persist the `summaryUsedTokens` marker when the stopped turn had
* summarized: that marker is pre-invoke (no `completedOutputTokens` ambiguity),
* and without it the fallback estimate re-sums the history the compaction
* discarded — leaving a stopped summarized turn pinned at 100%. Unlike the live
* path, the abort `tokenCount` comes from `countTokens(text)` (abortMiddleware) or
* is absent (agents abort route) — it does NOT fold in the summarization or
* earlier-call output the way `recordCollectedUsage` does. So the marker subtracts
* NOTHING: the full pre-invoke baseline is correct, and the client adds only the
* partial answer text on top (no overlap to cancel).
*/
export function buildAbortedResponseMetadata(
job: { tokenUsage?: string | null } | null | undefined,
): { usage?: TResponseUsage } | undefined {
job: { tokenUsage?: string | null; contextUsage?: string | null } | null | undefined,
): { usage?: TResponseUsage; summaryUsedTokens?: number } | undefined {
const events = parseUsageEvents(job?.tokenUsage);
const usage = aggregateEmittedUsage(events);
return usage ? { usage } : undefined;
let snapshot: TContextUsageEvent | null = null;
if (typeof job?.contextUsage === 'string' && job.contextUsage.length > 0) {
try {
snapshot = JSON.parse(job.contextUsage) as TContextUsageEvent;
} catch {
snapshot = null;
}
}
/** Subtract nothing: the abort `tokenCount` (countTokens(text) or absent) does
* not fold in summarization/earlier-call output, so the full baseline is the
* marker and the client's partial-text addition has no overlap to cancel. */
const summaryUsedTokens = computeSummaryUsedTokens(snapshot);
const metadata: { usage?: TResponseUsage; summaryUsedTokens?: number } = {};
if (usage) {
metadata.usage = usage;
}
if (summaryUsedTokens != null) {
metadata.summaryUsedTokens = summaryUsedTokens;
}
return Object.keys(metadata).length > 0 ? metadata : undefined;
}
/**