forked from modelcontextprotocol/typescript-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsseAndStreamableHttpCompatibleServer.ts
More file actions
251 lines (229 loc) · 9.24 KB
/
sseAndStreamableHttpCompatibleServer.ts
File metadata and controls
251 lines (229 loc) · 9.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import { Request, Response } from 'express';
import { randomUUID } from 'node:crypto';
import { McpServer } from '../../server/mcp.js';
import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
import { SSEServerTransport } from '../../server/sse.js';
import * as z from 'zod/v4';
import { CallToolResult, isInitializeRequest } from '../../types.js';
import { InMemoryEventStore } from '../shared/inMemoryEventStore.js';
import { createMcpExpressApp } from '../../server/express.js';
/**
* This example server demonstrates backwards compatibility with both:
* 1. The deprecated HTTP+SSE transport (protocol version 2024-11-05)
* 2. The Streamable HTTP transport (protocol version 2025-03-26)
*
* It maintains a single MCP server instance but exposes two transport options:
* - /mcp: The new Streamable HTTP endpoint (supports GET/POST/DELETE)
* - /sse: The deprecated SSE endpoint for older clients (GET to establish stream)
* - /messages: The deprecated POST endpoint for older clients (POST to send messages)
*/
const getServer = () => {
const server = new McpServer(
{
name: 'backwards-compatible-server',
version: '1.0.0'
},
{ capabilities: { logging: {} } }
);
// Register a simple tool that sends notifications over time
server.tool(
'start-notification-stream',
'Starts sending periodic notifications for testing resumability',
{
interval: z.number().describe('Interval in milliseconds between notifications').default(100),
count: z.number().describe('Number of notifications to send (0 for 100)').default(50)
},
async ({ interval, count }, extra): Promise<CallToolResult> => {
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
let counter = 0;
while (count === 0 || counter < count) {
counter++;
try {
await server.sendLoggingMessage(
{
level: 'info',
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
},
extra.sessionId
);
} catch (error) {
console.error('Error sending notification:', error);
}
// Wait for the specified interval
await sleep(interval);
}
return {
content: [
{
type: 'text',
text: `Started sending periodic notifications every ${interval}ms`
}
]
};
}
);
return server;
};
// Create Express application
const app = createMcpExpressApp();
// Store transports by session ID
const transports: Record<string, StreamableHTTPServerTransport | SSEServerTransport> = {};
//=============================================================================
// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26)
//=============================================================================
// Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint
app.all('/mcp', async (req: Request, res: Response) => {
console.log(`Received ${req.method} request to /mcp`);
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// Check if the transport is of the correct type
const existingTransport = transports[sessionId];
if (existingTransport instanceof StreamableHTTPServerTransport) {
// Reuse existing transport
transport = existingTransport;
} else {
// Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport)
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session exists but uses a different transport protocol'
},
id: null
});
return;
}
} else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) {
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: sessionId => {
// Store the transport by session ID when session is initialized
console.log(`StreamableHTTP session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});
// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};
// Connect the transport to the MCP server
const server = getServer();
await server.connect(transport);
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided'
},
id: null
});
return;
}
// Handle the request with the transport
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error'
},
id: null
});
}
}
});
//=============================================================================
// DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05)
//=============================================================================
app.get('/sse', async (req: Request, res: Response) => {
console.log('Received GET request to /sse (deprecated SSE transport)');
const transport = new SSEServerTransport('/messages', res);
transports[transport.sessionId] = transport;
res.on('close', () => {
delete transports[transport.sessionId];
});
const server = getServer();
await server.connect(transport);
});
app.post('/messages', async (req: Request, res: Response) => {
const sessionId = req.query.sessionId as string;
let transport: SSEServerTransport;
const existingTransport = transports[sessionId];
if (existingTransport instanceof SSEServerTransport) {
// Reuse existing transport
transport = existingTransport;
} else {
// Transport exists but is not a SSEServerTransport (could be StreamableHTTPServerTransport)
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session exists but uses a different transport protocol'
},
id: null
});
return;
}
if (transport) {
await transport.handlePostMessage(req, res, req.body);
} else {
res.status(400).send('No transport found for sessionId');
}
});
// Start the server
const PORT = 3000;
app.listen(PORT, error => {
if (error) {
console.error('Failed to start server:', error);
process.exit(1);
}
console.log(`Backwards compatible MCP server listening on port ${PORT}`);
console.log(`
==============================================
SUPPORTED TRANSPORT OPTIONS:
1. Streamable Http(Protocol version: 2025-03-26)
Endpoint: /mcp
Methods: GET, POST, DELETE
Usage:
- Initialize with POST to /mcp
- Establish SSE stream with GET to /mcp
- Send requests with POST to /mcp
- Terminate session with DELETE to /mcp
2. Http + SSE (Protocol version: 2024-11-05)
Endpoints: /sse (GET) and /messages (POST)
Usage:
- Establish SSE stream with GET to /sse
- Send requests with POST to /messages?sessionId=<id>
==============================================
`);
});
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});