🪢 fix: Tie MCP Cleanup To Resumable Runs (#13769)

* fix: Clean up request-scoped MCP connections

* test: Format MCP request context spec

* refactor: Move MCP request context to API package
This commit is contained in:
Danny Avila
2026-06-15 15:26:03 -04:00
committed by GitHub
parent 0537930144
commit 055585f9f1
6 changed files with 395 additions and 70 deletions

View File

@@ -1,3 +1,5 @@
const { EventEmitter } = require('events');
const mockLogger = {
debug: jest.fn(),
warn: jest.fn(),
@@ -21,6 +23,58 @@ const mockFilterPersistableAbortContent = jest.fn((content) =>
const mockGetConvo = jest.fn();
const mockGetMessages = jest.fn();
const mockSaveMessage = jest.fn();
let mockMCPContexts = new WeakMap();
const mockCreateMCPRequestContext = jest.fn(() => ({
connections: new Map(),
pending: new Map(),
cleanupStarted: false,
cleanupOnResponse: false,
responseCleanupAttached: false,
}));
const mockGetMCPRequestContext = jest.fn((req) => {
if (!req) {
return undefined;
}
let context = mockMCPContexts.get(req);
if (!context) {
context = mockCreateMCPRequestContext();
mockMCPContexts.set(req, context);
}
return context.cleanupStarted ? undefined : context;
});
const mockCleanupMCPRequestContext = jest.fn(async (context) => {
if (!context || context.cleanupStarted) {
return;
}
context.cleanupStarted = true;
const connections = new Set(context.connections.values());
const settled = await Promise.allSettled(context.pending.values());
for (const result of settled) {
if (result.status === 'fulfilled' && result.value) {
connections.add(result.value);
}
}
await Promise.allSettled(Array.from(connections).map((connection) => connection.disconnect?.()));
context.connections.clear();
context.pending.clear();
});
const mockCleanupMCPRequestContextForReq = jest.fn(async (req) => {
const context = mockMCPContexts.get(req);
if (!context) {
return;
}
try {
await mockCleanupMCPRequestContext(context);
} finally {
mockMCPContexts.delete(req);
}
});
jest.mock('@librechat/data-schemas', () => ({
logger: mockLogger,
@@ -32,7 +86,11 @@ jest.mock('@librechat/api', () => ({
buildMessageFiles: jest.fn(() => []),
resolveTitleTiming: jest.fn(() => 'immediate'),
GenerationJobManager: mockGenerationJobManager,
cleanupMCPRequestContext: (...args) => mockCleanupMCPRequestContext(...args),
createMCPRequestContext: (...args) => mockCreateMCPRequestContext(...args),
getMCPRequestContext: (...args) => mockGetMCPRequestContext(...args),
filterPersistableAbortContent: (...args) => mockFilterPersistableAbortContent(...args),
cleanupMCPRequestContextForReq: (...args) => mockCleanupMCPRequestContextForReq(...args),
decrementPendingRequest: (...args) => mockDecrementPendingRequest(...args),
sanitizeMessageForTransmit: jest.fn((message) => message),
checkAndIncrementPendingRequest: (...args) => mockCheckAndIncrementPendingRequest(...args),
@@ -79,10 +137,33 @@ jest.mock('~/models', () => ({
}));
const AgentController = require('../request');
const { getMCPRequestContext } = require('~/server/services/MCPRequestContext');
function createResumableResponse() {
const res = new EventEmitter();
res.headersSent = false;
res.writableEnded = false;
res.finished = false;
res.destroyed = false;
res.json = jest.fn(() => {
res.headersSent = true;
res.writableEnded = true;
res.finished = true;
res.emit('finish');
return res;
});
res.status = jest.fn(() => res);
return res;
}
function nextTick() {
return new Promise((resolve) => setImmediate(resolve));
}
describe('ResumableAgentController resume metadata', () => {
beforeEach(() => {
jest.clearAllMocks();
mockMCPContexts = new WeakMap();
mockCheckAndIncrementPendingRequest.mockResolvedValue({ allowed: true });
mockDecrementPendingRequest.mockResolvedValue(undefined);
mockGetConvo.mockResolvedValue({ createdAt: '2026-06-07T00:00:00.000Z' });
@@ -228,6 +309,47 @@ describe('ResumableAgentController resume metadata', () => {
);
});
it('keeps request-scoped MCP connections until resumable initialization finishes', async () => {
const conversationId = 'conversation-123';
const disconnect = jest.fn().mockResolvedValue(undefined);
const initializeClient = jest.fn(async ({ req, res }) => {
const context = getMCPRequestContext(req, res);
context.connections.set('mcp-server', { disconnect });
await nextTick();
expect(disconnect).not.toHaveBeenCalled();
throw new Error('stop after request-scoped MCP connection');
});
const req = {
user: { id: 'user-123' },
body: {
text: 'Use a BODY-scoped MCP server.',
messageId: 'user-message',
parentMessageId: 'parent-message',
conversationId,
endpointOption: {
endpoint: 'agents',
modelOptions: { model: 'gpt-4.1' },
},
},
config: {},
};
const res = createResumableResponse();
await AgentController(req, res, jest.fn(), initializeClient, null);
expect(res.json).toHaveBeenCalledWith({
streamId: conversationId,
conversationId,
status: 'started',
});
expect(disconnect).toHaveBeenCalledTimes(1);
expect(disconnect.mock.invocationCallOrder[0]).toBeLessThan(
mockDecrementPendingRequest.mock.invocationCallOrder[0],
);
});
it('stores model spec icon fallbacks and agent ids in early resume metadata', async () => {
const conversationId = 'conversation-123';
const initializeClient = jest.fn().mockRejectedValue(new Error('stop before tool loading'));

View File

@@ -13,6 +13,10 @@ const {
isUnpersistedPreliminaryParent,
} = require('@librechat/api');
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
const {
getMCPRequestContext,
cleanupMCPRequestContextForReq,
} = require('~/server/services/MCPRequestContext');
const { handleAbortError } = require('~/server/middleware');
const { logViolation } = require('~/cache');
const { saveMessage, getMessages, getConvo } = require('~/models');
@@ -139,6 +143,14 @@ function getAgentResponseModel(req, endpointOption) {
return getEndpointResponseModel(endpointOption);
}
async function finishResumableRequest(req, userId) {
try {
await cleanupMCPRequestContextForReq(req);
} finally {
await decrementPendingRequest(userId);
}
}
function rejectPreliminaryParentMessageId(res) {
return res.status(409).json({
error:
@@ -209,6 +221,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
const jobCreatedAt = job.createdAt; // Capture creation time to detect job replacement
req._resumableStreamId = streamId;
getMCPRequestContext(req, undefined, { cleanupOnResponse: false });
// Send JSON response IMMEDIATELY so client can connect to SSE stream
// This is critical: tool loading (MCP OAuth) may emit events that the client needs to receive
@@ -316,7 +329,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
if (job.abortController.signal.aborted) {
GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
return;
}
@@ -552,7 +565,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
acceptsTitleEvents = false;
resolveConvoReady();
// Still decrement pending request since we incremented at start
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
if (immediateTitlePromise) {
immediateTitlePromise.finally(() => {
if (client) {
@@ -602,7 +615,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId);
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
} else {
const finalEvent = {
final: true,
@@ -622,7 +635,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId, 'Request aborted');
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
}
if (titleTiming === 'immediate') {
@@ -680,7 +693,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
GenerationJobManager.completeJob(streamId, error.message);
}
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
// Defer disposal until any immediate title settles (it holds the run/req).
if (immediateTitlePromise) {
@@ -704,7 +717,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
`[ResumableAgentController] Unhandled error in background generation: ${err.message}`,
);
GenerationJobManager.completeJob(streamId, err.message);
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
});
} catch (error) {
logger.error('[ResumableAgentController] Initialization error:', error);
@@ -715,7 +728,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
await GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation');
}
GenerationJobManager.completeJob(streamId, error.message);
await decrementPendingRequest(userId);
await finishResumableRequest(req, userId);
if (client) {
disposeClient(client);
}

View File

@@ -1,68 +1,12 @@
const { logger } = require('@librechat/data-schemas');
const MCP_REQUEST_CONTEXT = Symbol.for('librechat.mcpRequestContext');
function createMCPRequestContext() {
return {
connections: new Map(),
pending: new Map(),
cleanupStarted: false,
};
}
async function cleanupMCPRequestContext(context) {
if (!context || context.cleanupStarted) {
return;
}
context.cleanupStarted = true;
const connections = new Set(context.connections.values());
const pending = Array.from(context.pending.values());
if (pending.length > 0) {
const settled = await Promise.allSettled(pending);
for (const result of settled) {
if (result.status === 'fulfilled' && result.value) {
connections.add(result.value);
}
}
}
await Promise.allSettled(
Array.from(connections).map(async (connection) => {
try {
await connection.disconnect();
} catch (error) {
logger.warn('[MCP Request Context] Failed to disconnect request-scoped connection', error);
}
}),
);
context.connections.clear();
context.pending.clear();
}
function getMCPRequestContext(req, res) {
if (!req) {
return undefined;
}
if (!req[MCP_REQUEST_CONTEXT]) {
const context = createMCPRequestContext();
req[MCP_REQUEST_CONTEXT] = context;
const cleanup = () => {
cleanupMCPRequestContext(context).catch((error) => {
logger.warn('[MCP Request Context] Cleanup failed', error);
});
};
res?.once?.('finish', cleanup);
res?.once?.('close', cleanup);
}
return req[MCP_REQUEST_CONTEXT];
}
const {
cleanupMCPRequestContextForReq,
cleanupMCPRequestContext,
createMCPRequestContext,
getMCPRequestContext,
} = require('@librechat/api');
module.exports = {
cleanupMCPRequestContextForReq,
cleanupMCPRequestContext,
createMCPRequestContext,
getMCPRequestContext,

View File

@@ -19,6 +19,7 @@ export * from './mcp/zod';
export * from './mcp/errors';
export * from './mcp/cache';
export * from './mcp/tools';
export * from './mcp/request';
/* Utilities */
export * from './mcp/utils';
export * from './utils';

View File

@@ -0,0 +1,82 @@
import { EventEmitter } from 'events';
import { getMCPRequestContext, cleanupMCPRequestContextForReq } from '~/mcp/request';
jest.mock('@librechat/data-schemas', () => ({
logger: {
warn: jest.fn(),
},
}));
function createResponse({ ended = false } = {}): EventEmitter & {
writableEnded: boolean;
finished: boolean;
destroyed: boolean;
} {
const res = new EventEmitter() as EventEmitter & {
writableEnded: boolean;
finished: boolean;
destroyed: boolean;
};
res.writableEnded = ended;
res.finished = ended;
res.destroyed = false;
return res;
}
function nextTick(): Promise<void> {
return new Promise((resolve) => setImmediate(resolve));
}
describe('MCP request context', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('does not create a response-scoped context after the response has finished', () => {
const req = {};
const res = createResponse({ ended: true });
expect(getMCPRequestContext(req, res)).toBeUndefined();
});
it('keeps job-scoped contexts alive after response finish until explicit cleanup', async () => {
const req = {};
const res = createResponse();
const context = getMCPRequestContext(req, undefined, { cleanupOnResponse: false });
const disconnect = jest.fn().mockResolvedValue(undefined);
context?.connections.set('server', { disconnect });
expect(getMCPRequestContext(req, res)).toBe(context);
res.emit('finish');
await nextTick();
expect(disconnect).not.toHaveBeenCalled();
await cleanupMCPRequestContextForReq(req);
expect(disconnect).toHaveBeenCalledTimes(1);
expect(context?.connections.size).toBe(0);
expect(context?.pending.size).toBe(0);
});
it('cleans response-scoped contexts when the response finishes', async () => {
const req = {};
const res = createResponse();
const context = getMCPRequestContext(req, res);
const disconnect = jest.fn().mockResolvedValue(undefined);
const pendingDisconnect = jest.fn().mockResolvedValue(undefined);
context?.connections.set('server', { disconnect });
context?.pending.set('pending-server', Promise.resolve({ disconnect: pendingDisconnect }));
res.emit('finish');
await nextTick();
expect(disconnect).toHaveBeenCalledTimes(1);
expect(pendingDisconnect).toHaveBeenCalledTimes(1);
expect(context?.connections.size).toBe(0);
expect(context?.pending.size).toBe(0);
});
});

View File

@@ -0,0 +1,163 @@
import { logger } from '@librechat/data-schemas';
import type { RequestScopedMCPConnectionStore } from './types';
export interface MCPRequestContext extends RequestScopedMCPConnectionStore {
cleanupStarted: boolean;
cleanupOnResponse: boolean;
responseCleanupAttached: boolean;
}
export interface MCPRequestContextOptions {
cleanupOnResponse?: boolean;
}
interface MCPResponseLike {
writableEnded?: boolean;
finished?: boolean;
destroyed?: boolean;
once?: (event: 'finish' | 'close', listener: () => void) => unknown;
}
interface Disconnectable {
disconnect: () => Promise<unknown> | unknown;
}
const contexts = new WeakMap<object, MCPRequestContext>();
export function createMCPRequestContext(): MCPRequestContext {
return {
connections: new Map<string, unknown>(),
pending: new Map<string, Promise<unknown>>(),
cleanupStarted: false,
cleanupOnResponse: true,
responseCleanupAttached: false,
};
}
function isDisconnectable(value: unknown): value is Disconnectable {
return (
value != null &&
typeof value === 'object' &&
'disconnect' in value &&
typeof value.disconnect === 'function'
);
}
export async function cleanupMCPRequestContext(context?: MCPRequestContext): Promise<void> {
if (!context || context.cleanupStarted) {
return;
}
context.cleanupStarted = true;
const connections = new Set<Disconnectable>();
for (const connection of context.connections.values()) {
if (isDisconnectable(connection)) {
connections.add(connection);
}
}
const pending = Array.from(context.pending.values());
if (pending.length > 0) {
const settled = await Promise.allSettled(pending);
for (const result of settled) {
if (result.status === 'fulfilled' && isDisconnectable(result.value)) {
connections.add(result.value);
}
}
}
await Promise.allSettled(
Array.from(connections).map(async (connection) => {
try {
await connection.disconnect();
} catch (error) {
logger.warn('[MCP Request Context] Failed to disconnect request-scoped connection', error);
}
}),
);
context.connections.clear();
context.pending.clear();
}
function isResponseFinished(res?: MCPResponseLike): boolean {
return Boolean(res?.writableEnded || res?.finished || res?.destroyed);
}
function runCleanup(context: MCPRequestContext): void {
cleanupMCPRequestContext(context).catch((error) => {
logger.warn('[MCP Request Context] Cleanup failed', error);
});
}
function attachResponseCleanup(context: MCPRequestContext, res?: MCPResponseLike): void {
if (!res || context.responseCleanupAttached || context.cleanupOnResponse === false) {
return;
}
const cleanup = () => runCleanup(context);
if (isResponseFinished(res)) {
cleanup();
return;
}
if (typeof res.once !== 'function') {
return;
}
context.responseCleanupAttached = true;
res.once('finish', cleanup);
res.once('close', cleanup);
if (isResponseFinished(res)) {
cleanup();
}
}
export function getMCPRequestContext(
req?: object,
res?: MCPResponseLike,
options: MCPRequestContextOptions = {},
): MCPRequestContext | undefined {
if (!req) {
return undefined;
}
const cleanupOnResponse = options.cleanupOnResponse !== false;
let context = contexts.get(req);
if (!context) {
if (cleanupOnResponse && isResponseFinished(res)) {
return undefined;
}
context = createMCPRequestContext();
context.cleanupOnResponse = cleanupOnResponse;
contexts.set(req, context);
} else if (!cleanupOnResponse) {
context.cleanupOnResponse = false;
}
if (cleanupOnResponse) {
attachResponseCleanup(context, res);
}
return context.cleanupStarted ? undefined : context;
}
export async function cleanupMCPRequestContextForReq(req?: object): Promise<void> {
if (!req) {
return;
}
const context = contexts.get(req);
if (!context) {
return;
}
try {
await cleanupMCPRequestContext(context);
} finally {
contexts.delete(req);
}
}