feature: support more transport of mcp server

This commit is contained in:
sean
2025-06-12 07:57:07 +08:00
parent ef39222b83
commit 0ff1abecfb
15 changed files with 2799 additions and 11 deletions

View File

@ -0,0 +1,181 @@
const { MCPStreamableHttpCommand } = require('../../lib/commands/MCPStreamableHttpCommand');
const http = require('http');
describe('MCP SSE Server Integration Tests', () => {
let command;
let port;
beforeEach(() => {
command = new MCPStreamableHttpCommand();
port = 3001 + Math.floor(Math.random() * 1000);
});
afterEach(async () => {
if (command.server && command.server.close) {
await new Promise((resolve) => {
command.server.close(resolve);
});
}
});
describe('SSE Transport', () => {
it('should start SSE server and handle dual endpoints', async () => {
// 启动 SSE 服务器
await command.execute({
transport: 'sse',
port,
host: 'localhost'
});
// 等待服务器启动
await new Promise(resolve => setTimeout(resolve, 200));
// 测试健康检查端点
const healthResponse = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/health',
method: 'GET'
});
expect(healthResponse.statusCode).toBe(200);
const healthData = JSON.parse(healthResponse.data);
expect(healthData.status).toBe('ok');
}, 10000);
it('should establish SSE stream on GET /mcp', async () => {
await command.execute({ transport: 'sse', port, host: 'localhost' });
await new Promise(resolve => setTimeout(resolve, 200));
// 尝试建立 SSE 连接
const sseResponse = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'GET',
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache'
}
});
expect(sseResponse.statusCode).toBe(200);
expect(sseResponse.headers['content-type']).toContain('text/event-stream');
}, 10000);
it('should handle POST messages to /messages endpoint', async () => {
await command.execute({ transport: 'sse', port, host: 'localhost' });
await new Promise(resolve => setTimeout(resolve, 200));
// 先建立 SSE 连接获取会话ID
const sseResponse = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'GET',
headers: { 'Accept': 'text/event-stream' }
});
// 解析 SSE 响应获取会话ID
const sseData = sseResponse.data;
const endpointMatch = sseData.match(/event: endpoint\ndata: (.+)/);
let sessionId = 'test-session';
if (endpointMatch) {
const endpointData = JSON.parse(endpointMatch[1]);
const urlObj = new URL(endpointData.uri);
sessionId = urlObj.searchParams.get('sessionId');
}
// 发送初始化请求到 /messages 端点
const initRequest = {
jsonrpc: '2.0',
method: 'initialize',
params: {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: { name: 'test-client', version: '1.0.0' }
},
id: 1
};
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: `/messages?sessionId=${sessionId}`,
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
}, JSON.stringify(initRequest));
expect(response.statusCode).toBe(200);
}, 10000);
});
describe('Transport Type Selection', () => {
it('should start different transports based on parameter', async () => {
// 测试默认 HTTP 传输
const httpCommand = new MCPStreamableHttpCommand();
const httpPort = port + 100;
await httpCommand.execute({ transport: 'http', port: httpPort });
const httpHealth = await makeHttpRequest({
hostname: 'localhost',
port: httpPort,
path: '/health',
method: 'GET'
});
expect(httpHealth.statusCode).toBe(200);
// 清理
if (httpCommand.server) {
await new Promise(resolve => httpCommand.server.close(resolve));
}
// 测试 SSE 传输
const sseCommand = new MCPStreamableHttpCommand();
const ssePort = port + 200;
await sseCommand.execute({ transport: 'sse', port: ssePort });
const sseHealth = await makeHttpRequest({
hostname: 'localhost',
port: ssePort,
path: '/health',
method: 'GET'
});
expect(sseHealth.statusCode).toBe(200);
// 清理
if (sseCommand.server) {
await new Promise(resolve => sseCommand.server.close(resolve));
}
}, 15000);
});
});
// Helper function to make HTTP requests
function makeHttpRequest(options, data = null) {
return new Promise((resolve, reject) => {
const req = http.request(options, (res) => {
let responseData = '';
res.on('data', (chunk) => {
responseData += chunk;
});
res.on('end', () => {
resolve({
statusCode: res.statusCode,
headers: res.headers,
data: responseData
});
});
});
req.on('error', reject);
if (data) {
req.write(data);
}
req.end();
});
}

View File

@ -0,0 +1,261 @@
const { MCPStreamableHttpCommand } = require('../../lib/commands/MCPStreamableHttpCommand');
const http = require('http');
describe('MCPStreamableHttpCommand Integration Tests', () => {
let command;
let server;
let port;
beforeEach(() => {
command = new MCPStreamableHttpCommand();
port = 3001 + Math.floor(Math.random() * 1000); // 随机端口避免冲突
});
afterEach(async () => {
if (server && server.close) {
await new Promise((resolve) => {
server.close(resolve);
});
}
});
describe('Streamable HTTP Server', () => {
it('should start server and respond to health check', async () => {
// 启动服务器
const serverPromise = command.execute({
transport: 'http',
port,
host: 'localhost'
});
// 等待服务器启动
await new Promise(resolve => setTimeout(resolve, 100));
// 发送健康检查请求
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/health',
method: 'GET'
});
expect(response.statusCode).toBe(200);
}, 5000);
it('should handle MCP initialize request', async () => {
// 启动服务器
await command.execute({
transport: 'http',
port,
host: 'localhost'
});
// 等待服务器启动
await new Promise(resolve => setTimeout(resolve, 100));
// 发送初始化请求
const initRequest = {
jsonrpc: '2.0',
method: 'initialize',
params: {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: {
name: 'test-client',
version: '1.0.0'
}
},
id: 1
};
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream'
}
}, JSON.stringify(initRequest));
expect(response.statusCode).toBe(200);
const responseData = JSON.parse(response.data);
expect(responseData.jsonrpc).toBe('2.0');
expect(responseData.id).toBe(1);
}, 5000);
it('should handle tools/list request', async () => {
// 启动服务器
await command.execute({
transport: 'http',
port,
host: 'localhost'
});
// 等待服务器启动
await new Promise(resolve => setTimeout(resolve, 100));
// 先初始化
const initRequest = {
jsonrpc: '2.0',
method: 'initialize',
params: {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: { name: 'test-client', version: '1.0.0' }
},
id: 1
};
const initResponse = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream'
}
}, JSON.stringify(initRequest));
const sessionId = JSON.parse(initResponse.data).result?.sessionId;
// 发送工具列表请求
const toolsRequest = {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 2
};
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
'mcp-session-id': sessionId || 'test-session'
}
}, JSON.stringify(toolsRequest));
expect(response.statusCode).toBe(200);
const responseData = JSON.parse(response.data);
expect(responseData.result.tools).toBeDefined();
expect(Array.isArray(responseData.result.tools)).toBe(true);
expect(responseData.result.tools.length).toBe(6);
}, 5000);
it('should handle tool call request', async () => {
// 启动服务器
await command.execute({
transport: 'http',
port,
host: 'localhost'
});
// 等待服务器启动
await new Promise(resolve => setTimeout(resolve, 100));
// 发送工具调用请求
const toolCallRequest = {
jsonrpc: '2.0',
method: 'tools/call',
params: {
name: 'promptx_hello',
arguments: {}
},
id: 3
};
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
'mcp-session-id': 'test-session'
}
}, JSON.stringify(toolCallRequest));
expect(response.statusCode).toBe(200);
const responseData = JSON.parse(response.data);
expect(responseData.result).toBeDefined();
}, 5000);
});
describe('Error Handling', () => {
it('should handle invalid JSON requests', async () => {
await command.execute({ transport: 'http', port, host: 'localhost' });
await new Promise(resolve => setTimeout(resolve, 100));
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream'
}
}, 'invalid json');
expect(response.statusCode).toBe(400);
}, 5000);
it('should handle missing session ID for non-initialize requests', async () => {
await command.execute({ transport: 'http', port, host: 'localhost' });
await new Promise(resolve => setTimeout(resolve, 100));
const request = {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 1
};
const response = await makeHttpRequest({
hostname: 'localhost',
port,
path: '/mcp',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream'
}
}, JSON.stringify(request));
expect(response.statusCode).toBe(400);
}, 5000);
});
});
// Helper function to make HTTP requests
function makeHttpRequest(options, data = null) {
return new Promise((resolve, reject) => {
const req = http.request(options, (res) => {
let responseData = '';
res.on('data', (chunk) => {
responseData += chunk;
});
res.on('end', () => {
resolve({
statusCode: res.statusCode,
headers: res.headers,
data: responseData
});
});
});
req.on('error', reject);
if (data) {
req.write(data);
}
req.end();
});
}

View File

@ -0,0 +1,178 @@
const { MCPStreamableHttpCommand } = require('../../lib/commands/MCPStreamableHttpCommand');
describe('MCPStreamableHttpCommand', () => {
let command;
beforeEach(() => {
command = new MCPStreamableHttpCommand();
});
describe('constructor', () => {
it('should initialize with correct name and version', () => {
expect(command.name).toBe('promptx-mcp-streamable-http-server');
expect(command.version).toBe('1.0.0');
});
it('should have default configuration', () => {
expect(command.transport).toBe('http');
expect(command.port).toBe(3000);
expect(command.host).toBe('localhost');
});
});
describe('execute', () => {
it('should throw error when transport type is unsupported', async () => {
await expect(command.execute({ transport: 'unsupported' }))
.rejects
.toThrow('Unsupported transport: unsupported');
});
it('should start Streamable HTTP server with default options', async () => {
const mockStartStreamableHttpServer = jest.fn().mockResolvedValue();
command.startStreamableHttpServer = mockStartStreamableHttpServer;
await command.execute();
expect(mockStartStreamableHttpServer).toHaveBeenCalledWith(3000, 'localhost');
});
it('should start Streamable HTTP server with custom options', async () => {
const mockStartStreamableHttpServer = jest.fn().mockResolvedValue();
command.startStreamableHttpServer = mockStartStreamableHttpServer;
await command.execute({ transport: 'http', port: 4000, host: '0.0.0.0' });
expect(mockStartStreamableHttpServer).toHaveBeenCalledWith(4000, '0.0.0.0');
});
it('should start SSE server when transport is sse', async () => {
const mockStartSSEServer = jest.fn().mockResolvedValue();
command.startSSEServer = mockStartSSEServer;
await command.execute({ transport: 'sse', port: 3001 });
expect(mockStartSSEServer).toHaveBeenCalledWith(3001, 'localhost');
});
});
describe('startStreamableHttpServer', () => {
it('should create Express app and listen on specified port', async () => {
// Mock Express
const mockApp = {
use: jest.fn(),
post: jest.fn(),
get: jest.fn(),
delete: jest.fn(),
listen: jest.fn((port, callback) => callback())
};
const mockExpress = jest.fn(() => mockApp);
mockExpress.json = jest.fn();
// Mock the method to avoid actual server startup
const originalMethod = command.startStreamableHttpServer;
command.startStreamableHttpServer = jest.fn().mockImplementation(async (port, host) => {
expect(port).toBe(3000);
expect(host).toBe('localhost');
return Promise.resolve();
});
await command.startStreamableHttpServer(3000, 'localhost');
expect(command.startStreamableHttpServer).toHaveBeenCalledWith(3000, 'localhost');
});
});
describe('startSSEServer', () => {
it('should create Express app with dual endpoints', async () => {
// Mock the method to avoid actual server startup
command.startSSEServer = jest.fn().mockImplementation(async (port, host) => {
expect(port).toBe(3000);
expect(host).toBe('localhost');
return Promise.resolve();
});
await command.startSSEServer(3000, 'localhost');
expect(command.startSSEServer).toHaveBeenCalledWith(3000, 'localhost');
});
});
describe('setupMCPServer', () => {
it('should create MCP server with correct configuration', () => {
const server = command.setupMCPServer();
expect(server).toBeDefined();
// We'll verify the server has the correct tools in integration tests
});
});
describe('getToolDefinitions', () => {
it('should return all PromptX tools', () => {
const tools = command.getToolDefinitions();
expect(Array.isArray(tools)).toBe(true);
expect(tools.length).toBe(6); // All PromptX tools
const toolNames = tools.map(tool => tool.name);
expect(toolNames).toContain('promptx_init');
expect(toolNames).toContain('promptx_hello');
expect(toolNames).toContain('promptx_action');
expect(toolNames).toContain('promptx_learn');
expect(toolNames).toContain('promptx_recall');
expect(toolNames).toContain('promptx_remember');
});
});
describe('handleMCPRequest', () => {
it('should handle tool calls correctly', async () => {
const mockReq = {
body: {
jsonrpc: '2.0',
method: 'tools/call',
params: {
name: 'promptx_hello',
arguments: {}
},
id: 1
},
headers: {}
};
const mockRes = {
json: jest.fn(),
status: jest.fn().mockReturnThis(),
headersSent: false
};
// Mock CLI execution
const mockCli = {
execute: jest.fn().mockResolvedValue('Hello response')
};
command.cli = mockCli;
command.handleMCPRequest = jest.fn().mockImplementation(async (req, res) => {
expect(req.body.method).toBe('tools/call');
res.json({ result: 'success' });
});
await command.handleMCPRequest(mockReq, mockRes);
expect(command.handleMCPRequest).toHaveBeenCalledWith(mockReq, mockRes);
});
});
describe('configuration validation', () => {
it('should validate port number', () => {
expect(() => command.validatePort(3000)).not.toThrow();
expect(() => command.validatePort('invalid')).toThrow('Port must be a number');
expect(() => command.validatePort(70000)).toThrow('Port must be between 1 and 65535');
});
it('should validate host address', () => {
expect(() => command.validateHost('localhost')).not.toThrow();
expect(() => command.validateHost('0.0.0.0')).not.toThrow();
expect(() => command.validateHost('192.168.1.1')).not.toThrow();
expect(() => command.validateHost('')).toThrow('Host cannot be empty');
});
});
});