Skip to content

Commit 0fe38c0

Browse files
authored
feat: add sse server implementation for mcp client (#290)
1 parent de6e2cd commit 0fe38c0

File tree

9 files changed

+296
-2
lines changed

9 files changed

+296
-2
lines changed

.changeset/two-bats-judge.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openai/agents-core': patch
3+
---
4+
5+
feat: add sse server implementation for mcp

examples/mcp/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"start:hosted-mcp-on-approval": "tsx hosted-mcp-on-approval.ts",
1414
"start:hosted-mcp-human-in-the-loop": "tsx hosted-mcp-human-in-the-loop.ts",
1515
"start:hosted-mcp-simple": "tsx hosted-mcp-simple.ts",
16-
"start:tool-filter": "tsx tool-filter-example.ts"
16+
"start:tool-filter": "tsx tool-filter-example.ts",
17+
"start:sse": "tsx sse-example.ts"
1718
}
1819
}

examples/mcp/sse-example.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { Agent, run, MCPServerSSE, withTrace } from '@openai/agents';
2+
3+
async function main() {
4+
const mcpServer = new MCPServerSSE({
5+
url: 'https://gitmcp.io/openai/codex',
6+
name: 'SSE MCP Server',
7+
});
8+
9+
const agent = new Agent({
10+
name: 'SSE Assistant',
11+
instructions: 'Use the tools to respond to user requests.',
12+
mcpServers: [mcpServer],
13+
});
14+
15+
try {
16+
await withTrace('SSE MCP Server Example', async () => {
17+
await mcpServer.connect();
18+
const result = await run(
19+
agent,
20+
'Please help me with the available tools.',
21+
);
22+
console.log(result.finalOutput);
23+
});
24+
} finally {
25+
await mcpServer.close();
26+
}
27+
}
28+
29+
main().catch((err) => {
30+
console.error(err);
31+
process.exit(1);
32+
});

packages/agents-core/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export {
7373
MCPServer,
7474
MCPServerStdio,
7575
MCPServerStreamableHttp,
76+
MCPServerSSE,
7677
} from './mcp';
7778
export {
7879
MCPToolFilterCallable,

packages/agents-core/src/mcp.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { UserError } from './errors';
33
import {
44
MCPServerStdio as UnderlyingMCPServerStdio,
55
MCPServerStreamableHttp as UnderlyingMCPServerStreamableHttp,
6+
MCPServerSSE as UnderlyingMCPServerSSE,
67
} from '@openai/agents-core/_shims';
78
import { getCurrentSpan, withMCPListToolsSpan } from './tracing';
89
import { logger as globalLogger, getLogger, Logger } from './logger';
@@ -24,6 +25,9 @@ export const DEFAULT_STDIO_MCP_CLIENT_LOGGER_NAME =
2425
export const DEFAULT_STREAMABLE_HTTP_MCP_CLIENT_LOGGER_NAME =
2526
'openai-agents:streamable-http-mcp-client';
2627

28+
export const DEFAULT_SSE_MCP_CLIENT_LOGGER_NAME =
29+
'openai-agents:sse-mcp-client';
30+
2731
/**
2832
* Interface for MCP server implementations.
2933
* Provides methods for connecting, listing tools, calling tools, and cleanup.
@@ -113,6 +117,41 @@ export abstract class BaseMCPServerStreamableHttp implements MCPServer {
113117
}
114118
}
115119

120+
export abstract class BaseMCPServerSSE implements MCPServer {
121+
public cacheToolsList: boolean;
122+
protected _cachedTools: any[] | undefined = undefined;
123+
public toolFilter?: MCPToolFilterCallable | MCPToolFilterStatic;
124+
125+
protected logger: Logger;
126+
constructor(options: MCPServerSSEOptions) {
127+
this.logger =
128+
options.logger ?? getLogger(DEFAULT_SSE_MCP_CLIENT_LOGGER_NAME);
129+
this.cacheToolsList = options.cacheToolsList ?? false;
130+
this.toolFilter = options.toolFilter;
131+
}
132+
133+
abstract get name(): string;
134+
abstract connect(): Promise<void>;
135+
abstract close(): Promise<void>;
136+
abstract listTools(): Promise<any[]>;
137+
abstract callTool(
138+
_toolName: string,
139+
_args: Record<string, unknown> | null,
140+
): Promise<CallToolResultContent>;
141+
abstract invalidateToolsCache(): Promise<void>;
142+
143+
/**
144+
* Logs a debug message when debug logging is enabled.
145+
* @param buildMessage A function that returns the message to log.
146+
*/
147+
protected debugLog(buildMessage: () => string): void {
148+
if (debug.enabled(this.logger.namespace)) {
149+
// only when this is true, the function to build the string is called
150+
this.logger.debug(buildMessage());
151+
}
152+
}
153+
}
154+
116155
/**
117156
* Minimum MCP tool data definition.
118157
* This type definition does not intend to cover all possible properties.
@@ -206,6 +245,42 @@ export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
206245
}
207246
}
208247

248+
export class MCPServerSSE extends BaseMCPServerSSE {
249+
private underlying: UnderlyingMCPServerSSE;
250+
constructor(options: MCPServerSSEOptions) {
251+
super(options);
252+
this.underlying = new UnderlyingMCPServerSSE(options);
253+
}
254+
get name(): string {
255+
return this.underlying.name;
256+
}
257+
connect(): Promise<void> {
258+
return this.underlying.connect();
259+
}
260+
close(): Promise<void> {
261+
return this.underlying.close();
262+
}
263+
async listTools(): Promise<MCPTool[]> {
264+
if (this.cacheToolsList && this._cachedTools) {
265+
return this._cachedTools;
266+
}
267+
const tools = await this.underlying.listTools();
268+
if (this.cacheToolsList) {
269+
this._cachedTools = tools;
270+
}
271+
return tools;
272+
}
273+
callTool(
274+
toolName: string,
275+
args: Record<string, unknown> | null,
276+
): Promise<CallToolResultContent> {
277+
return this.underlying.callTool(toolName, args);
278+
}
279+
invalidateToolsCache(): Promise<void> {
280+
return this.underlying.invalidateToolsCache();
281+
}
282+
}
283+
209284
/**
210285
* Fetches and flattens all tools from multiple MCP servers.
211286
* Logs and skips any servers that fail to respond.
@@ -467,6 +542,26 @@ export interface MCPServerStreamableHttpOptions {
467542
// ----------------------------------------------------
468543
}
469544

545+
export interface MCPServerSSEOptions {
546+
url: string;
547+
cacheToolsList?: boolean;
548+
clientSessionTimeoutSeconds?: number;
549+
name?: string;
550+
logger?: Logger;
551+
toolFilter?: MCPToolFilterCallable | MCPToolFilterStatic;
552+
timeout?: number;
553+
554+
// ----------------------------------------------------
555+
// OAuth
556+
// import { OAuthClientProvider } from '@modelcontextprotocol/sdk/client/auth.js';
557+
authProvider?: any;
558+
// RequestInit
559+
requestInit?: any;
560+
// import { SSEReconnectionOptions } from '@modelcontextprotocol/sdk/client/sse.js';
561+
eventSourceInit?: any;
562+
// ----------------------------------------------------
563+
}
564+
470565
/**
471566
* Represents a JSON-RPC request message.
472567
*/

packages/agents-core/src/shims/mcp-server/browser.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import {
2+
BaseMCPServerSSE,
23
BaseMCPServerStdio,
34
BaseMCPServerStreamableHttp,
45
CallToolResultContent,
6+
MCPServerSSEOptions,
57
MCPServerStdioOptions,
68
MCPServerStreamableHttpOptions,
79
MCPTool,
@@ -60,3 +62,33 @@ export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
6062
throw new Error('Method not implemented.');
6163
}
6264
}
65+
66+
export class MCPServerSSE extends BaseMCPServerSSE {
67+
constructor(params: MCPServerSSEOptions) {
68+
super(params);
69+
}
70+
71+
get name(): string {
72+
return 'MCPServerSSE';
73+
}
74+
connect(): Promise<void> {
75+
throw new Error('Method not implemented.');
76+
}
77+
close(): Promise<void> {
78+
throw new Error('Method not implemented.');
79+
}
80+
81+
listTools(): Promise<MCPTool[]> {
82+
throw new Error('Method not implemented.');
83+
}
84+
callTool(
85+
_toolName: string,
86+
_args: Record<string, unknown> | null,
87+
): Promise<CallToolResultContent> {
88+
throw new Error('Method not implemented.');
89+
}
90+
91+
invalidateToolsCache(): Promise<void> {
92+
throw new Error('Method not implemented.');
93+
}
94+
}

packages/agents-core/src/shims/mcp-server/node.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import { DEFAULT_REQUEST_TIMEOUT_MSEC } from '@modelcontextprotocol/sdk/shared/p
44
import {
55
BaseMCPServerStdio,
66
BaseMCPServerStreamableHttp,
7+
BaseMCPServerSSE,
78
CallToolResultContent,
89
DefaultMCPServerStdioOptions,
910
InitializeResult,
1011
MCPServerStdioOptions,
1112
MCPServerStreamableHttpOptions,
13+
MCPServerSSEOptions,
1214
MCPTool,
1315
invalidateServerToolsCache,
1416
} from '../../mcp';
@@ -166,6 +168,127 @@ export class NodeMCPServerStdio extends BaseMCPServerStdio {
166168
}
167169
}
168170

171+
export class NodeMCPServerSSE extends BaseMCPServerSSE {
172+
protected session: Client | null = null;
173+
protected _cacheDirty = true;
174+
protected _toolsList: any[] = [];
175+
protected serverInitializeResult: InitializeResult | null = null;
176+
protected clientSessionTimeoutSeconds?: number;
177+
protected timeout: number;
178+
179+
params: MCPServerSSEOptions;
180+
private _name: string;
181+
private transport: any = null;
182+
183+
constructor(params: MCPServerSSEOptions) {
184+
super(params);
185+
this.clientSessionTimeoutSeconds = params.clientSessionTimeoutSeconds ?? 5;
186+
this.params = params;
187+
this._name = params.name || `sse: ${this.params.url}`;
188+
this.timeout = params.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
189+
}
190+
191+
async connect(): Promise<void> {
192+
try {
193+
const { SSEClientTransport } = await import(
194+
'@modelcontextprotocol/sdk/client/sse.js'
195+
).catch(failedToImport);
196+
const { Client } = await import(
197+
'@modelcontextprotocol/sdk/client/index.js'
198+
).catch(failedToImport);
199+
this.transport = new SSEClientTransport(new URL(this.params.url), {
200+
authProvider: this.params.authProvider,
201+
requestInit: this.params.requestInit,
202+
eventSourceInit: this.params.eventSourceInit,
203+
});
204+
this.session = new Client({
205+
name: this._name,
206+
version: '1.0.0', // You may want to make this configurable
207+
});
208+
await this.session.connect(this.transport);
209+
this.serverInitializeResult = {
210+
serverInfo: { name: this._name, version: '1.0.0' },
211+
} as InitializeResult;
212+
} catch (e) {
213+
this.logger.error('Error initializing MCP server:', e);
214+
await this.close();
215+
throw e;
216+
}
217+
this.debugLog(() => `Connected to MCP server: ${this._name}`);
218+
}
219+
220+
async invalidateToolsCache(): Promise<void> {
221+
await invalidateServerToolsCache(this.name);
222+
this._cacheDirty = true;
223+
}
224+
225+
async listTools(): Promise<MCPTool[]> {
226+
const { ListToolsResultSchema } = await import(
227+
'@modelcontextprotocol/sdk/types.js'
228+
).catch(failedToImport);
229+
if (!this.session) {
230+
throw new Error(
231+
'Server not initialized. Make sure you call connect() first.',
232+
);
233+
}
234+
if (this.cacheToolsList && !this._cacheDirty && this._toolsList) {
235+
return this._toolsList;
236+
}
237+
238+
this._cacheDirty = false;
239+
const response = await this.session.listTools();
240+
this.debugLog(() => `Listed tools: ${JSON.stringify(response)}`);
241+
this._toolsList = ListToolsResultSchema.parse(response).tools;
242+
return this._toolsList;
243+
}
244+
245+
async callTool(
246+
toolName: string,
247+
args: Record<string, unknown> | null,
248+
): Promise<CallToolResultContent> {
249+
const { CallToolResultSchema } = await import(
250+
'@modelcontextprotocol/sdk/types.js'
251+
).catch(failedToImport);
252+
if (!this.session) {
253+
throw new Error(
254+
'Server not initialized. Make sure you call connect() first.',
255+
);
256+
}
257+
const response = await this.session.callTool(
258+
{
259+
name: toolName,
260+
arguments: args ?? {},
261+
},
262+
undefined,
263+
{
264+
timeout: this.timeout,
265+
},
266+
);
267+
const parsed = CallToolResultSchema.parse(response);
268+
const result = parsed.content;
269+
this.debugLog(
270+
() =>
271+
`Called tool ${toolName} (args: ${JSON.stringify(args)}, result: ${JSON.stringify(result)})`,
272+
);
273+
return result as CallToolResultContent;
274+
}
275+
276+
get name() {
277+
return this._name;
278+
}
279+
280+
async close(): Promise<void> {
281+
if (this.transport) {
282+
await this.transport.close();
283+
this.transport = null;
284+
}
285+
if (this.session) {
286+
await this.session.close();
287+
this.session = null;
288+
}
289+
}
290+
}
291+
169292
export class NodeMCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
170293
protected session: Client | null = null;
171294
protected _cacheDirty = true;

packages/agents-core/src/shims/shims-browser.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,11 @@ export function isTracingLoopRunningByDefault(): boolean {
109109
return false;
110110
}
111111

112-
export { MCPServerStdio, MCPServerStreamableHttp } from './mcp-server/browser';
112+
export {
113+
MCPServerStdio,
114+
MCPServerStreamableHttp,
115+
MCPServerSSE,
116+
} from './mcp-server/browser';
113117

114118
class BrowserTimer implements Timer {
115119
constructor() {}

packages/agents-core/src/shims/shims-node.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export function isBrowserEnvironment(): boolean {
4444
export {
4545
NodeMCPServerStdio as MCPServerStdio,
4646
NodeMCPServerStreamableHttp as MCPServerStreamableHttp,
47+
NodeMCPServerSSE as MCPServerSSE,
4748
} from './mcp-server/node';
4849

4950
export { clearTimeout } from 'node:timers';

0 commit comments

Comments
 (0)