mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-16 07:51:32 +03:00
* fix(mcp): reuse request-scoped connections per run * test(mcp): update connection factory defaults
1064 lines
35 KiB
JavaScript
1064 lines
35 KiB
JavaScript
const { tool } = require('@librechat/agents/langchain/tools');
|
|
const { logger, getTenantId } = require('@librechat/data-schemas');
|
|
const { Providers, Constants: AgentConstants } = require('@librechat/agents');
|
|
const {
|
|
sendEvent,
|
|
PENDING_STALE_MS,
|
|
MCPOAuthHandler,
|
|
isMCPDomainAllowed,
|
|
normalizeServerName,
|
|
normalizeJsonSchema,
|
|
GenerationJobManager,
|
|
resolveJsonSchemaRefs,
|
|
sanitizeGeminiSchema,
|
|
buildMCPAuthStepId,
|
|
buildMCPAuthToolCall,
|
|
processMCPEnv,
|
|
buildMCPAuthRunStepEvent,
|
|
buildMCPAuthRunStepDeltaEvent,
|
|
buildMCPAuthRunStepEndDeltaEvent,
|
|
isUserSourced,
|
|
checkAccessWithRequestCache,
|
|
requiresEphemeralUserConnection,
|
|
containsGraphTokenPlaceholder,
|
|
} = require('@librechat/api');
|
|
const {
|
|
Time,
|
|
CacheKeys,
|
|
Constants,
|
|
Permissions,
|
|
PermissionTypes,
|
|
isAssistantsEndpoint,
|
|
} = require('librechat-data-provider');
|
|
const {
|
|
getOAuthReconnectionManager,
|
|
getMCPServersRegistry,
|
|
getFlowStateManager,
|
|
getMCPManager,
|
|
} = require('~/config');
|
|
const db = require('~/models');
|
|
const { findToken, createToken, updateToken, deleteTokens } = db;
|
|
const { getGraphApiToken } = require('./GraphTokenService');
|
|
const { exchangeOboToken } = require('./OboTokenService');
|
|
const { createOboTrustChecker } = require('./OboPolicyService');
|
|
const { reinitMCPServer } = require('./Tools/mcp');
|
|
const { getAppConfig } = require('./Config');
|
|
const { getLogStores } = require('~/cache');
|
|
|
|
const MAX_CACHE_SIZE = 1000;
|
|
const lastReconnectAttempts = new Map();
|
|
const RECONNECT_THROTTLE_MS = 10_000;
|
|
|
|
const missingToolCache = new Map();
|
|
const MISSING_TOOL_TTL_MS = 10_000;
|
|
|
|
async function userCanUseMCPServers(user, req) {
|
|
if (!user?.id || !user?.role) {
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
return await checkAccessWithRequestCache({
|
|
req,
|
|
user,
|
|
permissionType: PermissionTypes.MCP_SERVERS,
|
|
permissions: [Permissions.USE],
|
|
getRoleByName: db.getRoleByName,
|
|
});
|
|
} catch (error) {
|
|
logger.error(`[MCP][User: ${user.id}] Failed MCP permission check`, error);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
function createMCPPermissionContext(req) {
|
|
return {
|
|
canUseServers: (user = req?.user) => userCanUseMCPServers(user, req),
|
|
};
|
|
}
|
|
|
|
function evictStale(map, ttl) {
|
|
if (map.size <= MAX_CACHE_SIZE) {
|
|
return;
|
|
}
|
|
const now = Date.now();
|
|
for (const [key, timestamp] of map) {
|
|
if (now - timestamp >= ttl) {
|
|
map.delete(key);
|
|
}
|
|
if (map.size <= MAX_CACHE_SIZE) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
const unavailableMsg =
|
|
"This tool's MCP server is temporarily unavailable. Please try again shortly.";
|
|
|
|
function getOAuthFlowId(userId, serverName, tenantId = getTenantId()) {
|
|
if (!tenantId) {
|
|
return MCPOAuthHandler.generateFlowId(userId, serverName);
|
|
}
|
|
return MCPOAuthHandler.generateFlowId(userId, serverName, tenantId);
|
|
}
|
|
|
|
async function getAppConfigForRequest(req) {
|
|
const user = req?.user;
|
|
return await getAppConfigForUser(user?.id, user);
|
|
}
|
|
|
|
async function getAppConfigForUser(userId, user) {
|
|
return await getAppConfig({ role: user?.role, tenantId: getTenantId(), userId });
|
|
}
|
|
|
|
/**
|
|
* Resolves config-source MCP servers from admin Config overrides for the current
|
|
* request context. Returns the parsed configs keyed by server name.
|
|
* @param {import('express').Request} req - Express request with user context
|
|
* @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
|
|
*/
|
|
async function resolveConfigServers(req) {
|
|
try {
|
|
const registry = getMCPServersRegistry();
|
|
const appConfig = await getAppConfigForRequest(req);
|
|
return await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
} catch (error) {
|
|
logger.warn(
|
|
'[resolveConfigServers] Failed to resolve config servers, degrading to empty:',
|
|
error,
|
|
);
|
|
return {};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Resolves operator-managed MCP server names from admin Config overrides for the current request.
|
|
* Returns a request-time snapshot for DB server creation, not a cross-process lock.
|
|
* @throws Propagates app config lookup errors to keep DB server creation fail-closed.
|
|
* @param {import('express').Request} req - Express request with user context
|
|
* @returns {Promise<string[]>}
|
|
*/
|
|
async function resolveMcpConfigNames(req) {
|
|
const appConfig = await getAppConfigForRequest(req);
|
|
return Object.keys(appConfig?.mcpConfig || {});
|
|
}
|
|
|
|
/**
|
|
* Resolves config-source servers and merges all server configs (YAML + config + user DB)
|
|
* for the given user context. Shared helper for controllers needing the full merged config.
|
|
* @param {string} userId
|
|
* @param {{ id?: string, role?: string }} [user]
|
|
* @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
|
|
*/
|
|
async function resolveAllMcpConfigs(userId, user) {
|
|
const registry = getMCPServersRegistry();
|
|
const appConfig = await getAppConfigForUser(userId, user);
|
|
let configServers = {};
|
|
try {
|
|
configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
} catch (error) {
|
|
logger.warn(
|
|
'[resolveAllMcpConfigs] Config server resolution failed, continuing without:',
|
|
error,
|
|
);
|
|
}
|
|
if (user?.role) {
|
|
return await registry.getAllServerConfigs(userId, configServers, user.role);
|
|
}
|
|
|
|
return await registry.getAllServerConfigs(userId, configServers);
|
|
}
|
|
|
|
function getServerCustomUserVars(userMCPAuthMap, serverName) {
|
|
return userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];
|
|
}
|
|
|
|
/**
|
|
* Best-effort early gate; the authoritative check is
|
|
* `assertResolvedRuntimeConfigAllowed` in `@librechat/api`, whose resolution
|
|
* this must mirror. Graph placeholders resolve later (async), so a URL still
|
|
* carrying one defers to the authoritative check instead of rejecting here.
|
|
*/
|
|
async function isEarlyDomainAllowed({
|
|
serverConfig,
|
|
user,
|
|
requestBody,
|
|
userMCPAuthMap,
|
|
serverName,
|
|
allowedDomains,
|
|
allowedAddresses,
|
|
}) {
|
|
const validationConfig = processMCPEnv({
|
|
user,
|
|
body: requestBody,
|
|
dbSourced: isUserSourced(serverConfig),
|
|
options: serverConfig,
|
|
customUserVars: getServerCustomUserVars(userMCPAuthMap, serverName),
|
|
});
|
|
if (
|
|
typeof validationConfig?.url === 'string' &&
|
|
containsGraphTokenPlaceholder(validationConfig.url)
|
|
) {
|
|
return true;
|
|
}
|
|
return await isMCPDomainAllowed(validationConfig, allowedDomains, allowedAddresses);
|
|
}
|
|
|
|
/**
|
|
* @param {string} toolName
|
|
* @param {string} serverName
|
|
*/
|
|
function createUnavailableToolStub(toolName, serverName) {
|
|
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
|
|
const _call = async () => [unavailableMsg, null];
|
|
const toolInstance = tool(_call, {
|
|
schema: {
|
|
type: 'object',
|
|
properties: {
|
|
input: { type: 'string', description: 'Input for the tool' },
|
|
},
|
|
required: [],
|
|
},
|
|
name: normalizedToolKey,
|
|
description: unavailableMsg,
|
|
responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
|
|
});
|
|
toolInstance.mcp = true;
|
|
toolInstance.mcpRawServerName = serverName;
|
|
return toolInstance;
|
|
}
|
|
|
|
function isEmptyObjectSchema(jsonSchema) {
|
|
return (
|
|
jsonSchema != null &&
|
|
typeof jsonSchema === 'object' &&
|
|
jsonSchema.type === 'object' &&
|
|
(jsonSchema.properties == null || Object.keys(jsonSchema.properties).length === 0) &&
|
|
!jsonSchema.additionalProperties
|
|
);
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
*/
|
|
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
|
|
/**
|
|
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
|
|
* @param {{ expiresAt?: number }} [options]
|
|
* @returns {Promise<void>}
|
|
*/
|
|
return async function (authURL, options) {
|
|
const eventData = buildMCPAuthRunStepDeltaEvent({ authURL, stepId, toolCall, options });
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.runId - The Run ID, i.e. message ID
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {number} [params.index]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @returns {() => Promise<void>}
|
|
*/
|
|
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
|
|
return async function () {
|
|
const eventData = buildMCPAuthRunStepEvent({ runId, stepId, toolCall, index });
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Creates a function used to ensure the flow handler is only invoked once
|
|
* @param {object} params
|
|
* @param {string} params.flowId - The ID of the login flow.
|
|
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
|
|
* @param {(authURL: string, options?: { expiresAt?: number }) => void | Promise<void>} [params.callback]
|
|
*/
|
|
function createOAuthStart({ flowId, flowManager, callback }) {
|
|
/**
|
|
* Creates a function to handle OAuth login requests.
|
|
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
|
|
* @param {{ expiresAt?: number }} [options]
|
|
* @returns {Promise<boolean>} Returns true to indicate the event was sent successfully.
|
|
*/
|
|
return async function (authURL, options) {
|
|
let emitted = false;
|
|
const emitOAuthStart = async (message) => {
|
|
if (options) {
|
|
await callback?.(authURL, options);
|
|
} else {
|
|
await callback?.(authURL);
|
|
}
|
|
emitted = true;
|
|
logger.debug(message);
|
|
};
|
|
|
|
const existingFlow = await flowManager.getFlowState(flowId, 'oauth_login');
|
|
if (existingFlow) {
|
|
await emitOAuthStart('Re-sent OAuth login request to client');
|
|
return true;
|
|
}
|
|
|
|
await flowManager.createFlowWithHandler(flowId, 'oauth_login', async () => {
|
|
await emitOAuthStart('Sent OAuth login request to client');
|
|
return true;
|
|
});
|
|
|
|
if (!emitted) {
|
|
await emitOAuthStart('Re-sent OAuth login request to client');
|
|
}
|
|
|
|
return true;
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
*/
|
|
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
|
|
return async function () {
|
|
const eventData = buildMCPAuthRunStepEndDeltaEvent({ stepId, toolCall });
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
logger.debug('Sent OAuth login success to client');
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {string} params.userId - The ID of the user.
|
|
* @param {string} params.serverName - The name of the server.
|
|
* @param {string} params.toolName - The name of the tool.
|
|
* @param {string} [params.tenantId] - The tenant ID for the current request.
|
|
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
|
|
*/
|
|
function createAbortHandler({ userId, serverName, toolName, tenantId, flowManager }) {
|
|
return function () {
|
|
logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`);
|
|
const flowId = getOAuthFlowId(userId, serverName, tenantId);
|
|
// Clean up both mcp_oauth and mcp_get_tokens flows
|
|
flowManager.failFlow(flowId, 'mcp_oauth', new Error('Tool call aborted'));
|
|
flowManager.failFlow(flowId, 'mcp_get_tokens', new Error('Tool call aborted'));
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {Object} params
|
|
* @param {() => Promise<void>} params.runStepEmitter
|
|
* @param {(authURL: string, options?: { expiresAt?: number }) => Promise<void>} params.runStepDeltaEmitter
|
|
* @returns {(authURL: string, options?: { expiresAt?: number }) => Promise<void>}
|
|
*/
|
|
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
|
|
return async function (authURL, options) {
|
|
await runStepEmitter();
|
|
await runStepDeltaEmitter(authURL, options);
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.serverName
|
|
* @param {AbortSignal} params.signal
|
|
* @param {string} params.model
|
|
* @param {number} [params.index]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
|
|
* @param {import('@librechat/api').ParsedServerConfig} [params.serverConfig] - Used to bypass reconnect throttling for request-scoped servers.
|
|
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
serverConfig,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
streamId = null,
|
|
}) {
|
|
logger.debug(
|
|
`[MCP][reconnectServer] serverName: ${serverName}, user: ${user?.id}, hasUserMCPAuthMap: ${!!userMCPAuthMap}`,
|
|
);
|
|
|
|
// Request-scoped servers reconnect on every message by design; throttling them
|
|
// would stub out healthy tools for messages sent within the throttle window.
|
|
const requestScoped = serverConfig ? requiresEphemeralUserConnection(serverConfig) : false;
|
|
if (!requestScoped) {
|
|
const throttleKey = `${user.id}:${serverName}`;
|
|
const now = Date.now();
|
|
const lastAttempt = lastReconnectAttempts.get(throttleKey) ?? 0;
|
|
if (now - lastAttempt < RECONNECT_THROTTLE_MS) {
|
|
logger.debug(`[MCP][reconnectServer] Throttled reconnect for ${serverName}`);
|
|
return null;
|
|
}
|
|
lastReconnectAttempts.set(throttleKey, now);
|
|
evictStale(lastReconnectAttempts, RECONNECT_THROTTLE_MS);
|
|
}
|
|
|
|
const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
|
|
const flowId = `${user.id}:${serverName}:${Date.now()}`;
|
|
const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
|
|
const stepId = buildMCPAuthStepId(serverName);
|
|
const toolCall = buildMCPAuthToolCall({
|
|
id: flowId,
|
|
serverName,
|
|
});
|
|
|
|
// Set up abort handler to clean up OAuth flows if request is aborted
|
|
const tenantId = user?.tenantId ?? getTenantId();
|
|
const oauthFlowId = getOAuthFlowId(user.id, serverName, tenantId);
|
|
const abortHandler = () => {
|
|
logger.info(
|
|
`[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`,
|
|
);
|
|
// Clean up both mcp_oauth and mcp_get_tokens flows
|
|
flowManager.failFlow(oauthFlowId, 'mcp_oauth', new Error('Tool loading aborted'));
|
|
flowManager.failFlow(oauthFlowId, 'mcp_get_tokens', new Error('Tool loading aborted'));
|
|
};
|
|
|
|
if (signal) {
|
|
signal.addEventListener('abort', abortHandler, { once: true });
|
|
}
|
|
|
|
try {
|
|
const runStepEmitter = createRunStepEmitter({
|
|
res,
|
|
index,
|
|
runId,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const runStepDeltaEmitter = createRunStepDeltaEmitter({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
|
|
const oauthStart = createOAuthStart({
|
|
res,
|
|
flowId,
|
|
callback,
|
|
flowManager,
|
|
});
|
|
return await reinitMCPServer({
|
|
user,
|
|
signal,
|
|
serverName,
|
|
configServers,
|
|
oauthStart,
|
|
flowManager,
|
|
userMCPAuthMap,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
forceNew: true,
|
|
returnOnOAuth: false,
|
|
connectionTimeout: Time.THIRTY_SECONDS,
|
|
});
|
|
} finally {
|
|
// Clean up abort handler to prevent memory leaks
|
|
if (signal) {
|
|
signal.removeEventListener('abort', abortHandler);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates all tools from the specified MCP Server via `toolKey`.
|
|
*
|
|
* This function assumes tools could not be aggregated from the cache of tool definitions,
|
|
* i.e. `availableTools`, and will reinitialize the MCP server to ensure all tools are generated.
|
|
*
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.serverName
|
|
* @param {string} params.model
|
|
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
|
|
* @param {number} [params.index]
|
|
* @param {AbortSignal} [params.signal]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {import('@librechat/api').ParsedServerConfig} [params.config]
|
|
* @param {import('@librechat/api').RequestBody} [params.requestBody]
|
|
* @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function createMCPTools({
|
|
res,
|
|
mcpPermissionContext,
|
|
user,
|
|
index,
|
|
signal,
|
|
config,
|
|
provider,
|
|
serverName,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
streamId = null,
|
|
}) {
|
|
const serverConfig =
|
|
config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
|
|
|
|
if (serverConfig?.url) {
|
|
const appConfig = await getAppConfig({
|
|
role: user?.role,
|
|
tenantId: user?.tenantId,
|
|
userId: user?.id,
|
|
});
|
|
const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
|
|
const allowedAddresses = appConfig?.mcpSettings?.allowedAddresses;
|
|
const isDomainAllowed = await isEarlyDomainAllowed({
|
|
serverConfig,
|
|
user,
|
|
requestBody,
|
|
userMCPAuthMap,
|
|
serverName,
|
|
allowedDomains,
|
|
allowedAddresses,
|
|
});
|
|
if (!isDomainAllowed) {
|
|
logger.warn(`[MCP][${serverName}] Domain not allowed, skipping all tools`);
|
|
return [];
|
|
}
|
|
}
|
|
|
|
const result = await reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
serverConfig,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
streamId,
|
|
});
|
|
if (result === null) {
|
|
logger.debug(`[MCP][${serverName}] Reconnect throttled, skipping tool creation.`);
|
|
return [];
|
|
}
|
|
if (!result || !result.tools) {
|
|
logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
|
|
return [];
|
|
}
|
|
|
|
const serverTools = [];
|
|
for (const tool of result.tools) {
|
|
const toolInstance = await createMCPTool({
|
|
res,
|
|
mcpPermissionContext,
|
|
user,
|
|
provider,
|
|
userMCPAuthMap,
|
|
configServers,
|
|
streamId,
|
|
availableTools: result.availableTools,
|
|
toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
config: serverConfig,
|
|
});
|
|
if (toolInstance) {
|
|
serverTools.push(toolInstance);
|
|
}
|
|
}
|
|
|
|
return serverTools;
|
|
}
|
|
|
|
/**
|
|
* Creates a single tool from the specified MCP Server via `toolKey`.
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.toolKey - The toolKey for the tool.
|
|
* @param {string} params.model - The model for the tool.
|
|
* @param {number} [params.index]
|
|
* @param {AbortSignal} [params.signal]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
|
|
* @param {LCAvailableTools} [params.availableTools]
|
|
* @param {import('@librechat/api').RequestBody} [params.requestBody]
|
|
* @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @param {import('@librechat/api').ParsedServerConfig} [params.config]
|
|
* @param {(availableTools: LCAvailableTools) => void} [params.onAvailableTools]
|
|
* @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown}> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function createMCPTool({
|
|
res,
|
|
mcpPermissionContext,
|
|
user,
|
|
index,
|
|
signal,
|
|
toolKey,
|
|
provider,
|
|
userMCPAuthMap,
|
|
availableTools,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
config,
|
|
configServers,
|
|
onAvailableTools,
|
|
streamId = null,
|
|
}) {
|
|
const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
|
|
|
|
const serverConfig =
|
|
config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
|
|
const requestScopedTools = serverConfig ? requiresEphemeralUserConnection(serverConfig) : false;
|
|
const useMissingToolCache = !requestScopedTools;
|
|
|
|
if (serverConfig?.url) {
|
|
const appConfig = await getAppConfig({
|
|
role: user?.role,
|
|
tenantId: user?.tenantId,
|
|
userId: user?.id,
|
|
});
|
|
const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
|
|
const allowedAddresses = appConfig?.mcpSettings?.allowedAddresses;
|
|
const isDomainAllowed = await isEarlyDomainAllowed({
|
|
serverConfig,
|
|
user,
|
|
requestBody,
|
|
userMCPAuthMap,
|
|
serverName,
|
|
allowedDomains,
|
|
allowedAddresses,
|
|
});
|
|
if (!isDomainAllowed) {
|
|
logger.warn(`[MCP][${serverName}] Domain no longer allowed, skipping tool: ${toolName}`);
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
/** @type {LCTool | undefined} */
|
|
let toolDefinition = availableTools?.[toolKey]?.function;
|
|
if (!toolDefinition) {
|
|
const cachedAt = useMissingToolCache ? missingToolCache.get(toolKey) : undefined;
|
|
if (cachedAt && Date.now() - cachedAt < MISSING_TOOL_TTL_MS) {
|
|
logger.debug(
|
|
`[MCP][${serverName}][${toolName}] Tool in negative cache, returning unavailable stub.`,
|
|
);
|
|
return createUnavailableToolStub(toolName, serverName);
|
|
}
|
|
|
|
logger.warn(
|
|
`[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
|
|
);
|
|
const result = await reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
serverConfig,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
streamId,
|
|
});
|
|
if (result?.availableTools) {
|
|
onAvailableTools?.(result.availableTools);
|
|
}
|
|
toolDefinition = result?.availableTools?.[toolKey]?.function;
|
|
|
|
if (!toolDefinition && useMissingToolCache) {
|
|
missingToolCache.set(toolKey, Date.now());
|
|
evictStale(missingToolCache, MISSING_TOOL_TTL_MS);
|
|
}
|
|
}
|
|
|
|
if (!toolDefinition) {
|
|
logger.warn(
|
|
`[MCP][${serverName}][${toolName}] Tool definition not found, returning unavailable stub.`,
|
|
);
|
|
return createUnavailableToolStub(toolName, serverName);
|
|
}
|
|
|
|
return createToolInstance({
|
|
res,
|
|
mcpPermissionContext,
|
|
user,
|
|
requestBody,
|
|
requestScopedConnections,
|
|
provider,
|
|
toolName,
|
|
serverName,
|
|
serverConfig,
|
|
toolDefinition,
|
|
streamId,
|
|
});
|
|
}
|
|
|
|
function createToolInstance({
|
|
res,
|
|
mcpPermissionContext,
|
|
user: capturedUser = null,
|
|
requestBody: capturedRequestBody,
|
|
requestScopedConnections: capturedRequestScopedConnections,
|
|
toolName,
|
|
serverName,
|
|
serverConfig: capturedServerConfig,
|
|
toolDefinition,
|
|
provider: capturedProvider,
|
|
streamId = null,
|
|
}) {
|
|
/** @type {LCTool} */
|
|
const { description, parameters } = toolDefinition;
|
|
const isGoogle = capturedProvider === Providers.VERTEXAI || capturedProvider === Providers.GOOGLE;
|
|
|
|
let schema = parameters ? normalizeJsonSchema(resolveJsonSchemaRefs(parameters)) : null;
|
|
|
|
if (schema && isGoogle) {
|
|
// Gemini/Vertex AI accept only a subset of JSON Schema; sanitize so MCP tools with
|
|
// unions, non-string enums, etc. don't 400 (they work as-is on OpenAI/Claude).
|
|
schema = sanitizeGeminiSchema(schema);
|
|
}
|
|
|
|
if (!schema || (isGoogle && isEmptyObjectSchema(schema))) {
|
|
schema = {
|
|
type: 'object',
|
|
properties: {
|
|
input: { type: 'string', description: 'Input for the tool' },
|
|
},
|
|
required: [],
|
|
};
|
|
}
|
|
|
|
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
|
|
|
|
/** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
|
|
const _call = async (toolArguments, config) => {
|
|
const effectiveUser = config?.configurable?.user ?? capturedUser;
|
|
const permissionUser = effectiveUser;
|
|
const userId = effectiveUser?.id || config?.configurable?.user_id || capturedUser?.id;
|
|
/** @type {ReturnType<typeof createAbortHandler>} */
|
|
let abortHandler = null;
|
|
/** @type {AbortSignal} */
|
|
let derivedSignal = null;
|
|
|
|
try {
|
|
const provider = (config?.metadata?.provider || capturedProvider)?.toLowerCase();
|
|
const canUseMCP = mcpPermissionContext
|
|
? await mcpPermissionContext.canUseServers(permissionUser)
|
|
: await userCanUseMCPServers(permissionUser);
|
|
if (!canUseMCP) {
|
|
throw new Error('Forbidden: Insufficient MCP server permissions');
|
|
}
|
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
|
const flowManager = getFlowStateManager(flowsCache);
|
|
derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
|
|
const mcpManager = getMCPManager(userId);
|
|
|
|
const { args: _args, stepId, ...toolCall } = config.toolCall ?? {};
|
|
const flowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
|
|
const runStepDeltaEmitter = createRunStepDeltaEmitter({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const oauthStart = createOAuthStart({
|
|
flowId,
|
|
flowManager,
|
|
callback: runStepDeltaEmitter,
|
|
});
|
|
const oauthEnd = createOAuthEnd({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
|
|
if (derivedSignal) {
|
|
const tenantId = config?.configurable?.user?.tenantId ?? getTenantId();
|
|
abortHandler = createAbortHandler({ userId, serverName, toolName, tenantId, flowManager });
|
|
derivedSignal.addEventListener('abort', abortHandler, { once: true });
|
|
}
|
|
|
|
const customUserVars =
|
|
config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];
|
|
|
|
const result = await mcpManager.callTool({
|
|
serverName,
|
|
serverConfig: capturedServerConfig,
|
|
toolName,
|
|
provider,
|
|
toolArguments,
|
|
options: {
|
|
signal: derivedSignal,
|
|
},
|
|
user: effectiveUser,
|
|
requestBody: config?.configurable?.requestBody ?? capturedRequestBody,
|
|
requestScopedConnections:
|
|
config?.configurable?.requestScopedConnections ?? capturedRequestScopedConnections,
|
|
customUserVars,
|
|
flowManager,
|
|
tokenMethods: {
|
|
findToken,
|
|
createToken,
|
|
updateToken,
|
|
deleteTokens,
|
|
},
|
|
oauthStart,
|
|
oauthEnd,
|
|
graphTokenResolver: getGraphApiToken,
|
|
oboTokenResolver: exchangeOboToken,
|
|
oboTrustChecker: createOboTrustChecker(),
|
|
});
|
|
|
|
if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
|
|
return result[0];
|
|
}
|
|
return result;
|
|
} catch (error) {
|
|
logger.error(
|
|
`[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
|
|
error,
|
|
);
|
|
|
|
/** OAuth error, provide a helpful message */
|
|
const isOAuthError =
|
|
error.message?.includes('401') ||
|
|
error.message?.includes('OAuth') ||
|
|
error.message?.includes('authentication') ||
|
|
error.message?.includes('Non-200 status code (401)');
|
|
|
|
if (isOAuthError) {
|
|
throw new Error(
|
|
`[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
|
|
);
|
|
}
|
|
|
|
throw new Error(
|
|
`[MCP][${serverName}][${toolName}] tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
|
|
);
|
|
} finally {
|
|
// Clean up abort handler to prevent memory leaks
|
|
if (abortHandler && derivedSignal) {
|
|
derivedSignal.removeEventListener('abort', abortHandler);
|
|
}
|
|
}
|
|
};
|
|
|
|
const toolInstance = tool(_call, {
|
|
schema,
|
|
name: normalizedToolKey,
|
|
description: description || '',
|
|
responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
|
|
});
|
|
toolInstance.mcp = true;
|
|
toolInstance.mcpRawServerName = serverName;
|
|
// On Google/Vertex, propagate the union-flattened schema so definitions extracted
|
|
// from this instance don't reach the Gemini converter with unsupported unions.
|
|
toolInstance.mcpJsonSchema = isGoogle ? schema : parameters;
|
|
return toolInstance;
|
|
}
|
|
|
|
/**
|
|
* Get MCP setup data including config, connections, and OAuth servers.
|
|
* Resolves config-source servers from admin Config overrides when tenant context is available.
|
|
* @param {string} userId - The user ID
|
|
* @param {{ role?: string, tenantId?: string }} [options] - Optional role/tenant context
|
|
* @returns {Object} Object containing mcpConfig, appConnections, userConnections, and oauthServers
|
|
*/
|
|
async function getMCPSetupData(userId, options = {}) {
|
|
const registry = getMCPServersRegistry();
|
|
const { role, tenantId } = options;
|
|
|
|
const appConfig = await getAppConfig({ role, tenantId, userId });
|
|
const configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
const mcpConfig = role
|
|
? await registry.getAllServerConfigs(userId, configServers, role)
|
|
: await registry.getAllServerConfigs(userId, configServers);
|
|
const mcpManager = getMCPManager(userId);
|
|
/** @type {Map<string, import('@librechat/api').MCPConnection>} */
|
|
let appConnections = new Map();
|
|
try {
|
|
// Use getLoaded() instead of getAll() to avoid forcing connection creation.
|
|
// getAll() creates connections for all servers, which is problematic for servers
|
|
// that require user context (e.g., those with {{LIBRECHAT_USER_ID}} placeholders).
|
|
appConnections = (await mcpManager.appConnections?.getLoaded()) || new Map();
|
|
} catch (error) {
|
|
logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
|
|
}
|
|
const userConnections = mcpManager.getUserConnections(userId) || new Map();
|
|
const oauthServers = new Set(
|
|
Object.entries(mcpConfig)
|
|
.filter(([, config]) => config.requiresOAuth)
|
|
.map(([name]) => name),
|
|
);
|
|
|
|
return {
|
|
mcpConfig,
|
|
oauthServers,
|
|
appConnections,
|
|
userConnections,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Check OAuth flow status for a user and server
|
|
* @param {string} userId - The user ID
|
|
* @param {string} serverName - The server name
|
|
* @param {string} [tenantId] - The tenant ID for the current request.
|
|
* @returns {Object} Object containing hasActiveFlow and hasFailedFlow flags
|
|
*/
|
|
async function checkOAuthFlowStatus(userId, serverName, tenantId = getTenantId()) {
|
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
|
const flowManager = getFlowStateManager(flowsCache);
|
|
const flowId = getOAuthFlowId(userId, serverName, tenantId);
|
|
|
|
try {
|
|
const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
|
|
if (!flowState) {
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
}
|
|
|
|
const flowAge = Date.now() - flowState.createdAt;
|
|
// Report active only while the flow is still usable (the handling/reuse window),
|
|
// not for the full Keyv retention TTL — otherwise the UI shows "connecting" for a
|
|
// flow the initiate/callback paths already reject, hiding the connect button.
|
|
const flowTTL = flowState.ttl || PENDING_STALE_MS;
|
|
|
|
if (flowState.status === 'FAILED' || flowAge > flowTTL) {
|
|
const wasCancelled = flowState.error && flowState.error.includes('cancelled');
|
|
|
|
if (wasCancelled) {
|
|
logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
status: flowState.status,
|
|
error: flowState.error,
|
|
});
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
} else {
|
|
logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
status: flowState.status,
|
|
flowAge,
|
|
flowTTL,
|
|
timedOut: flowAge > flowTTL,
|
|
error: flowState.error,
|
|
});
|
|
return { hasActiveFlow: false, hasFailedFlow: true };
|
|
}
|
|
}
|
|
|
|
if (flowState.status === 'PENDING') {
|
|
logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
flowAge,
|
|
flowTTL,
|
|
});
|
|
return { hasActiveFlow: true, hasFailedFlow: false };
|
|
}
|
|
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
} catch (error) {
|
|
logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, error);
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get connection status for a specific MCP server
|
|
* @param {string} userId - The user ID
|
|
* @param {string} serverName - The server name
|
|
* @param {import('@librechat/api').ParsedServerConfig} config - The server configuration
|
|
* @param {Map<string, import('@librechat/api').MCPConnection>} appConnections - App-level connections
|
|
* @param {Map<string, import('@librechat/api').MCPConnection>} userConnections - User-level connections
|
|
* @param {Set} oauthServers - Set of OAuth servers
|
|
* @returns {Object} Object containing requiresOAuth and connectionState
|
|
*/
|
|
async function getServerConnectionStatus(
|
|
userId,
|
|
serverName,
|
|
config,
|
|
appConnections,
|
|
userConnections,
|
|
oauthServers,
|
|
) {
|
|
const connection = appConnections.get(serverName) || userConnections.get(serverName);
|
|
const isStaleOrDoNotExist = connection ? connection?.isStale(config.updatedAt) : true;
|
|
|
|
const baseConnectionState = isStaleOrDoNotExist
|
|
? 'disconnected'
|
|
: connection?.connectionState || 'disconnected';
|
|
let finalConnectionState = baseConnectionState;
|
|
|
|
// connection state overrides specific to OAuth servers
|
|
if (baseConnectionState === 'disconnected' && oauthServers.has(serverName)) {
|
|
// check if server is actively being reconnected
|
|
const oauthReconnectionManager = getOAuthReconnectionManager();
|
|
if (oauthReconnectionManager.isReconnecting(userId, serverName)) {
|
|
finalConnectionState = 'connecting';
|
|
} else {
|
|
const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);
|
|
|
|
if (hasFailedFlow) {
|
|
finalConnectionState = 'error';
|
|
} else if (hasActiveFlow) {
|
|
finalConnectionState = 'connecting';
|
|
}
|
|
}
|
|
}
|
|
|
|
return {
|
|
requiresOAuth: oauthServers.has(serverName),
|
|
connectionState: finalConnectionState,
|
|
};
|
|
}
|
|
|
|
module.exports = {
|
|
createMCPTool,
|
|
createMCPTools,
|
|
createMCPPermissionContext,
|
|
userCanUseMCPServers,
|
|
getMCPSetupData,
|
|
resolveConfigServers,
|
|
resolveMcpConfigNames,
|
|
resolveAllMcpConfigs,
|
|
createOAuthStart,
|
|
checkOAuthFlowStatus,
|
|
getServerConnectionStatus,
|
|
createUnavailableToolStub,
|
|
};
|