mcp服务器必要知识-TypeScript

mcp现在有好几种开发包,python/TypeScript/Java/Kotlin

本文以TypeScript的服务器为例,相对来说构建和部署过程更为简洁直观。

mcp自建服务器基本要涉及一些代码,需要一点点基础(我也只有一点点。。。)

ts相关的一些工具

mcp ts 官方sdk:https://github.com/modelcontextprotocol/typescript-sdk

mcp ts 快速开发框架:https://github.com/wong2/litemcp

mcp cli 测试服务器:https://github.com/wong2/mcp-cli

通过 see运行mcp:https://github.com/supercorp-ai/supergateway

基本结构

官方的sdk示例代码

一个简单的结构,大致案例为下面,感觉没什么好讲解的,就是基本的js/ts结构代码,可以做http请求什么的

  • 引用依赖

  • 创建mcp服务器

  • 添加工具

    • 可以添加多个

  • 启动服务器

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

// 创建MCP服务器
const server = new McpServer({
    name: "example-server",
    version: "1.0.0"
});

// 注册第一个工具
server.tool("tool1", "工具1描述", {...}, async (params) => {...});

// 注册第二个工具
server.tool("tool2", "工具2描述", {...}, async (params) => {...});

// 启动服务器
const transport = new StdioServerTransport();
await server.connect(transport);

示例1:网络搜索工具

并不能直接使用,仅供参考语法

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import fetch from "node-fetch";

const server = new McpServer({
    name: "web-search-server",
    version: "1.0.0"
});

server.tool(
    "search_web",
    "搜索网络获取最新信息",
    {
        query: z.string().describe("搜索查询"),
        maxResults: z.number().optional().describe("返回结果的最大数量,默认为5")
    },
    async ({ query, maxResults = 5 }) => {
        try {
            const apiKey = process.env.SEARCH_API_KEY;
            if (!apiKey) {
                throw new Error("未配置搜索API密钥");
            }

            const response = await fetch(`https://api.search.com/v1/search?q=${encodeURIComponent(query)}&limit=${maxResults}`, {
                headers: { "Authorization": `Bearer ${apiKey}` }
            });

            if (!response.ok) {
                throw new Error(`搜索请求失败: ${response.statusText}`);
            }

            const data = await response.json();
            const results = data.results.map(item => `- ${item.title}: ${item.snippet}`).join("\n");

            return {
                content: [
                    {
                        type: "text",
                        text: `搜索结果:\n${results}`
                    }
                ]
            };
        } catch (error) {
            const errorMessage = error instanceof Error ? error.message : '未知错误';
            return {
                content: [
                    {
                        type: "text",
                        text: `搜索执行出错: ${errorMessage}`
                    }
                ]
            };
        }
    }
);

async function main() {
    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error("搜索服务器已启动");
}

main().catch((error) => {
    console.error("服务器启动失败:", error);
    process.exit(1);
});

示例2:数据库查询工具

并不能直接使用,仅供参考语法

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import { Pool } from "pg";

// 创建数据库连接池
const pool = new Pool({
    user: process.env.DB_USER,
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    password: process.env.DB_PASSWORD,
    port: parseInt(process.env.DB_PORT || "5432")
});

const server = new McpServer({
    name: "database-server",
    version: "1.0.0"
});

server.tool(
    "query_database",
    "执行SQL查询并返回结果",
    {
        query: z.string().describe("SQL查询语句"),
        params: z.array(z.any()).optional().describe("查询参数数组")
    },
    async ({ query, params = [] }) => {
        try {
            // 安全检查,防止危险的查询
            if (/DROP|DELETE|UPDATE|INSERT|ALTER/i.test(query)) {
                throw new Error("仅允许SELECT查询");
            }

            const result = await pool.query(query, params);
            const rows = result.rows;

            if (rows.length === 0) {
                return {
                    content: [
                        { type: "text", text: "查询未返回结果" }
                    ]
                };
            }

            // 创建表格形式的结果
            const headers = Object.keys(rows[0]);
            const tableHeader = `| ${headers.join(' | ')} |`;
            const separator = `| ${headers.map(() => '---').join(' | ')} |`;
            const tableRows = rows.map(row => 
                `| ${headers.map(h => String(row[h] || '')).join(' | ')} |`
            ).join('\n');

            const tableResult = `${tableHeader}\n${separator}\n${tableRows}`;

            return {
                content: [
                    { type: "text", text: `查询结果:\n\n${tableResult}` }
                ]
            };
        } catch (error) {
            const errorMessage = error instanceof Error ? error.message : '未知错误';
            return {
                content: [
                    { type: "text", text: `数据库查询出错: ${errorMessage}` }
                ]
            };
        }
    }
);

async function main() {
    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error("数据库服务器已启动");
}

main().catch((error) => {
    console.error("服务器启动失败:", error);
    process.exit(1);
});

示例3:dify chat调用工具

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

// 定义 Dify API 配置类型
type DifyConfig = {
    apiKey: string;
    apiEndpoint: string;
};

// Dify API 配置
const difyConfig: DifyConfig = {
    apiKey: 'app-OozuuCxxxxxxxxxG',
    apiEndpoint: 'https://api.dify.ai/v1',
};

// 创建 MCP 服务器
const server = new McpServer({
    name: "dify-diffy-server",
    version: "1.0.0"
});

// 注册 Dify 对话工具
server.tool(
    "generate_dify",
    "使用dify调用halo相关技术文档",
    {
        message: z.string().describe("查询halo相关技术文档"),
    },
    async ({ message }) => {
        try {
            const response = await fetch(`${difyConfig.apiEndpoint}/chat-messages`, {
                method: 'POST',
                headers: {
                    'Authorization': `Bearer ${difyConfig.apiKey}`,
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    inputs: {},
                    query: message,
                    user: "user",
                    stream: false,
                    conversation_id: null,
                })
            });

            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status} ${response.statusText} ${response.body}`);
            }

            const data = await response.json();
            return {
                content: [
                    {
                        type: "text",
                        text: data.answer
                    }
                ]
            };
        } catch (error: unknown) {
            const errorMessage = error instanceof Error ? error.message : '未知错误';
            return {
                content: [
                    {
                        type: "text",
                        text: `调用 Dify API 时发生错误: ${errorMessage}`
                    }
                ]
            };
        }
    }
);

// 启动服务器
async function main() {
    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error("服务器已启动");
}

main().catch((error) => {
    console.error("服务器启动失败:", error);
    process.exit(1);
});

lite mcp 快速开发框架

wong2/litemcp 是一个构建于官方SDK之上的高级框架,旨在简化MCP服务器开发过程,提供更优雅的API接口。

核心特性

  • 简化API:提供更简洁、语义化的API,降低开发难度

  • 全面集成:工具、资源、提示定义更加直观

  • 内置功能:包含日志记录、错误处理、CLI工具等实用功能

  • TypeScript支持:完整的TypeScript类型定义

  • SSE传输支持:内置对SSE传输协议的支持

示例代码

import { LiteMCP } from "litemcp";
import { z } from "zod";

const server = new LiteMCP("demo", "1.0.0");

// 添加工具更加简洁
server.addTool({
  name: "add",
  description: "Add two numbers",
  parameters: z.object({
    a: z.number(),
    b: z.number(),
  }),
  execute: async (args) => {
    return args.a + args.b;
  },
});

// 添加资源
server.addResource({
  uri: "file:///logs/app.log",
  name: "Application Logs",
  mimeType: "text/plain",
  async load() {
    return { text: "Example log content" };
  },
});

server.start();

litemcp和官方sdk应该选择哪一个?

工具没有好坏,挑你会用的即可。市面上大部分还是官方原版sdk更多,新手学习建议从官方版入手,

一个同样功能做一下对比

// 基于官方SDK开发自定义工具
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
    name: "custom-analyzer",
    version: "1.0.0"
});

server.tool(
    "analyze_code",
    "分析代码复杂度",
    {
        code: z.string().describe("要分析的代码"),
        language: z.string().describe("代码语言")
    },
    async ({ code, language }) => {
        // 实现代码分析逻辑
        return { content: [{ type: "text", text: "分析结果..." }] };
    }
);

async function main() {
    const transport = new StdioServerTransport();
    await server.connect(transport);
}

main();
// 使用LiteMCP框架快速开发工具
import { LiteMCP } from "litemcp";
import { z } from "zod";

const server = new LiteMCP("code-helper", "1.0.0");

server.addTool({
  name: "generate_unit_test",
  description: "为给定代码生成单元测试",
  parameters: z.object({
    code: z.string(),
    framework: z.enum(["jest", "mocha", "pytest"]),
  }),
  execute: async (args) => {
    // 单元测试生成逻辑
    return `生成的单元测试代码...`;
  },
});

server.start();