MCP Swift SDK
官方Swift SDK 模型上下文协议 (MCP)。
概述
模型上下文协议(MCP)定义了一种标准化的方法 用于应用程序与AI和ML模型通信。 这个Swift SDK实现了客户端和服务器端组件 根据 2025-11-25 (最新)版本 MCP规范。
目录
-
需求
-
安装
-
客户端使用情况
-
基本客户端设置
-
客户的运输选择
-
工具
-
资源
-
提示
-
补全
-
采样
-
引出
-
根
-
日志记录
-
错误处理
-
取消
-
进度跟踪
-
高级客户端功能
-
服务器使用情况
-
基本服务器设置
-
工具
-
资源
-
提示
-
补全
-
采样
-
精英
-
根
-
日志记录
-
进度跟踪
-
初始化钩子
-
优雅地关闭
-
运输
-
认证
-
客户端:客户端凭据流
-
客户端:授权码流
-
客户端:自定义令牌提供程序
-
客户端:自定义令牌存储
-
客户端:private_key_jwt身份验证
-
客户端:端点覆盖
-
服务器:提供受保护的资源元数据
-
服务器:验证承载令牌
-
平台可用性
-
调试和记录
-
其他资源
-
更新日志
-
许可证
需求
- Swift 6.0+(Xcode 16+)
看 平台可用性 下节 针对特定平台的要求。
安装
Swift包管理器
将以下内容添加到您的 Package.swift 文件:
SWIFT``` 1 2 3dependencies: [ .package(url: “https://github.com/modelcontextprotocol/swift-sdk.git”, from: “0.11.0”) ]
然后将依赖关系添加到目标:
SWIFT```
1
2
3
4
5
6.target(
name: "YourTarget",
dependencies: [
.product(name: "MCP", package: "swift-sdk")
]
)
客户端使用情况
客户端组件允许您的应用程序连接到MCP服务器。
基本客户端设置
SWIFT``` 1 2 3 4 5 6 7 8 9 10import MCP // Initialize the client let client = Client(name: “MyApp”, version: “1.0.0”) // Create a transport and connect let transport = StdioTransport() let result = try await client.connect(transport: transport) // Check server capabilities if result.capabilities.tools != nil { // Server supports tools (implicitly including tool calling if the ‘tools’ capability object is present) }
> [!注意]
> 这 `Client.connect(transport:)` 方法返回初始化结果。
> 该返回值是可丢弃的,
> 因此,如果不需要检查服务器功能,可以忽略它。
### 客户的运输选择
#### 标准运输
对于本地子流程通信:
SWIFT```
1
2
3// Create a stdio transport (simplest option)
let transport = StdioTransport()
try await client.connect(transport: transport)
HTTP传输
对于远程服务器通信:
SWIFT``` 1 2 3 4 5 6// Create a streaming HTTP transport let transport = HTTPClientTransport( endpoint: URL(string: “http://localhost:8080”)!, streaming: true // Enable Server-Sent Events for real-time updates ) try await client.connect(transport: transport)
### 工具
工具表示客户端可以调用的函数:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// List available tools
let (tools, cursor) = try await client.listTools()
print("Available tools: \(tools.map { $0.name }.joined(separator: ", "))")
// Call a tool with arguments and get the result
let (content, isError) = try await client.callTool(
name: "image-generator",
arguments: [
"prompt": "A serene mountain landscape at sunset",
"style": "photorealistic",
"width": 1024,
"height": 768
]
)
// Handle tool content
for item in content {
switch item {
case .text(let text):
print("Generated text: \(text)")
case .image(let data, let mimeType, let metadata):
if let width = metadata?["width"] as? Int,
let height = metadata?["height"] as? Int {
print("Generated \(width)x\(height) image of type \(mimeType)")
// Save or display the image data
}
case .audio(let data, let mimeType):
print("Received audio data of type \(mimeType)")
case .resource(let resource, _, _):
print("Received embedded resource: \(resource)")
case .resourceLink(let uri, let name, _, _, let mimeType, _):
print("Resource link: \(name) at \(uri), type: \(mimeType ?? "unknown")")
}
}
资源
资源表示可以访问和潜在订阅的数据:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// List available resources let (resources, nextCursor) = try await client.listResources() print(“Available resources: (resources.map { $0.uri }.joined(separator: ”, ”))”) // Read a resource let contents = try await client.readResource(uri: “resource://example”) print(“Resource content: (contents)”) // Subscribe to resource updates if supported if result.capabilities.resources?.subscribe == true { try await client.subscribeToResource(uri: “resource://example”) // Register notification handler await client.onNotification(ResourceUpdatedNotification.self) { message in let uri = message.params.uri print(“Resource (uri) updated with new content”) // Fetch the updated resource content let updatedContents = try await client.readResource(uri: uri) print(“Updated resource content received”) } }
### 提示
提示代表模板化的对话开始者:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// List available prompts
let (prompts, nextCursor) = try await client.listPrompts()
print("Available prompts: \(prompts.map { $0.name }.joined(separator: ", "))")
// Get a prompt with arguments
let (description, messages) = try await client.getPrompt(
name: "customer-service",
arguments: [
"customerName": "Alice",
"orderNumber": "ORD-12345",
"issue": "delivery delay"
]
)
// Use the prompt messages in your application
print("Prompt description: \(description)")
for message in messages {
if case .text(text: let text) = message.content {
print("\(message.role): \(text)")
}
}
补全
补全允许服务器在用户键入时为提示和资源模板参数提供自动补全建议:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13// Request completions for a prompt argument let completion = try await client.complete( promptName: “code_review”, argumentName: “language”, argumentValue: “py” ) // Display suggestions to the user for value in completion.values { print(“Suggestion: (value)”) } if completion.hasMore == true { print(“More suggestions available (total: (completion.total ?? 0))”) }
您还可以提供已解析参数的上下文:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// First, user selects a language
let languageCompletion = try await client.complete(
promptName: "code_review",
argumentName: "language",
argumentValue: "py"
)
// User selects "python"
// Then get framework suggestions based on the selected language
let frameworkCompletion = try await client.complete(
promptName: "code_review",
argumentName: "framework",
argumentValue: "fla",
context: ["language": .string("python")]
)
// Returns: ["flask"]
完成资源模板的工作:
SWIFT``` 1 2 3 4 5 6 7// Get path completions for a resource URI template let pathCompletion = try await client.complete( resourceURI: “file:///{path}”, argumentName: “path”, argumentValue: “/usr/” ) // Returns: [“/usr/bin”, “/usr/lib”, “/usr/local”]
### 采样
采样允许服务器通过客户端请求LLM完成,
实现代理行为,同时保持人类在环控制。
客户端注册一个处理程序来处理来自服务器的传入采样请求。
> [!提示]
> 采样请求来自 **服务器到客户端**,
> 不是客户端到服务器。
> 这使服务器能够请求AI协助
> 而客户端保持对模型访问和用户批准的控制。
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// Register a sampling handler in the client
await client.withSamplingHandler { parameters in
// Review the sampling request (human-in-the-loop step 1)
print("Server requests completion for: \(parameters.messages)")
// Optionally modify the request based on user input
var messages = parameters.messages
if let systemPrompt = parameters.systemPrompt {
print("System prompt: \(systemPrompt)")
}
// Sample from your LLM (this is where you'd call your AI service)
let completion = try await callYourLLMService(
messages: messages,
maxTokens: parameters.maxTokens,
temperature: parameters.temperature
)
// Review the completion (human-in-the-loop step 2)
print("LLM generated: \(completion)")
// User can approve, modify, or reject the completion here
// Return the result to the server
return CreateSamplingMessage.Result(
model: "your-model-name",
stopReason: .endTurn,
role: .assistant,
content: .text(completion)
)
}
引出
Elicitation允许服务器通过客户端直接向用户请求结构化信息。 当服务器需要原始请求中未提供的用户输入时,这很有用, 例如凭据、配置选择或敏感操作的批准。
[!提示] 征集请求来自 服务器到客户端, 类似于采样。 客户端必须注册一个处理程序来响应来自服务器的启发请求。
客户端:处理激励请求
注册一个启发处理程序来响应服务器请求:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30// Register an elicitation handler in the client await client.withElicitationHandler { parameters in switch parameters { case .form(let form): // Display the request to the user print(“Server requests: (form.message)”) // If a schema was provided, inspect it if let schema = form.requestedSchema { print(“Required fields: (schema.required ?? [])”) print(“Schema: (schema.properties)”) } // Present UI to collect user input let userResponse = presentElicitationUI(form) // Return the user’s response if userResponse.accepted { return CreateElicitation.Result( action: .accept, content: userResponse.data ) } else if userResponse.canceled { return CreateElicitation.Result(action: .cancel) } else { return CreateElicitation.Result(action: .decline) } case .url(let url): // Direct the user to an external URL (e.g., for OAuth) openURL(url.url) return CreateElicitation.Result(action: .accept) } }
启发的常见用例:
- **认证**:在需要时申请证书,而不是预先申请
- **确认**:在进行敏感操作之前,请获得用户批准
- **配置**:在操作过程中收集偏好或设置
- **缺失信息**:请求最初未提供的其他详细信息
### 根
根定义了客户端向服务器公开的文件系统边界。服务器通过发送 `roots/list` 向客户提出请求;当列表更改时,客户端会通知服务器。
> [!提示]
> 要使用根,请声明 `roots` 创建客户端时的能力。
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16let client = Client(
name: "MyApp",
version: "1.0.0",
capabilities: .init(
roots: .init(listChanged: true)
)
)
// Register a handler for roots/list requests from servers
await client.withRootsHandler {
return [
Root(uri: "file:///Users/user/projects", name: "Projects"),
Root(uri: "file:///Users/user/documents", name: "Documents")
]
}
// Notify connected servers whenever roots change
try await client.notifyRootsChanged()
日志记录
客户端可以控制服务器日志级别并接收结构化日志消息:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17// Set the minimum logging level try await client.setLoggingLevel(.warning) // Register a handler for log messages from the server await client.onNotification(LogMessageNotification.self) { message in let level = message.params.level // LogLevel (debug, info, warning, etc.) let logger = message.params.logger // Optional logger name let data = message.params.data // Arbitrary JSON data // Display log message based on level switch level { case .error, .critical, .alert, .emergency: print(”❌ [(logger ?? “server”)] (data)”) case .warning: print(“⚠️ [(logger ?? “server”)] (data)”) default: print(“ℹ️ [(logger ?? “server”)] (data)”) } }
日志级别遵循标准系统日志严重性级别(RFC 5424):
- **调试**:详细的调试信息
- **信息**:一般信息性消息
- **通知**:正常但重要的事件
- **警告**:警告条件
- **错误**:错误条件
- **关键的**:临界条件
- **警报**:必须立即采取行动
- **紧急情况**:系统不可用
### 错误处理
处理常见的客户端错误:
SWIFT```
1
2
3
4
5
6
7
8do {
try await client.connect(transport: transport)
// Success
} catch let error as MCPError {
print("MCP Error: \(error.localizedDescription)")
} catch {
print("Unexpected error: \(error)")
}
取消
任何一方都可以取消正在进行的请求,并优雅地处理传入的取消:
选项1:具有请求上下文过载的便利方法
对于工具调用等常见操作,使用重载方法返回 RequestContext:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14// Call a tool and get a context for cancellation let context = try client.callTool( name: “long-running-analysis”, arguments: [“data”: largeDataset] ) // You can cancel the request at any time try await client.cancelRequest(context.requestID, reason: “User cancelled”) // Await the result (will throw CancellationError if cancelled) do { let result = try await context.value print(“Result: (result.content)”) } catch is CancellationError { print(“Request was cancelled”) }
#### 选项2:直接发送()以获得最大的灵活性
对于完全控制或自定义请求,请使用 `send()` 直接:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13// Create any request type
let request = CallTool.request(.init(
name: "long-running-analysis",
arguments: ["data": largeDataset]
))
// Send and get a context for cancellation tracking
let context: RequestContext<CallTool.Result> = try client.send(request)
// Cancel when needed
try await client.cancelRequest(context.requestID, reason: "Timeout")
// Get the result
let result = try await context.value
let content = result.content
let isError = result.isError
进度跟踪
客户端可以将进度令牌附加到请求中,并接收长时间运行的操作的增量进度更新:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19// Call a tool with progress tracking let progressToken = ProgressToken.unique() // Register a notification handler to receive progress updates await client.onNotification(ProgressNotification.self) { message in let params = message.params // Filter by your progress token if params.progressToken == progressToken { print(“Progress: (params.progress)/(params.total ?? 0)”) if let message = params.message { print(“Status: (message)”) } } } // Make the request with the progress token let (content, isError) = try await client.callTool( name: “long-running-tool”, arguments: [“input”: “value”], meta: Metadata(progressToken: progressToken) )
### 高级客户端功能
#### 严格配置与非严格配置
配置客户端行为以进行能力检查:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// Strict configuration - fail fast if a capability is missing
let strictClient = Client(
name: "StrictClient",
version: "1.0.0",
configuration: .strict
)
// With strict configuration, calling a method for an unsupported capability
// will throw an error immediately without sending a request
do {
// This will throw an error if resources.list capability is not available
let resources = try await strictClient.listResources()
} catch let error as MCPError {
print("Capability not available: \(error.localizedDescription)")
}
// Default (non-strict) configuration - attempt the request anyway
let client = Client(
name: "FlexibleClient",
version: "1.0.0",
configuration: .default
)
// With default configuration, the client will attempt the request
// even if the capability wasn't advertised by the server
do {
let resources = try await client.listResources()
} catch let error as MCPError {
// Still handle the error if the server rejects the request
print("Server rejected request: \(error.localizedDescription)")
}
请求批处理
通过在单个批中发送多个请求来提高性能:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23// Array to hold tool call tasks var toolTasks: [Task<CallTool.Result, Swift.Error>] = [] // Send a batch of requests try await client.withBatch { batch in // Add multiple tool calls to the batch for i in 0..<10 { toolTasks.append( try await batch.addRequest( CallTool.request(.init(name: “square”, arguments: [“n”: Value(i)])) ) ) } } // Process results after the batch is sent print(“Processing (toolTasks.count) tool results…”) for (index, task) in toolTasks.enumerated() { do { let result = try await task.value print(“(index): (result.content)”) } catch { print(“(index) failed: (error)”) } }
您还可以批量处理不同类型的请求:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// Declare task variables
var pingTask: Task<Ping.Result, Error>?
var promptTask: Task<GetPrompt.Result, Error>?
// Send a batch with different request types
try await client.withBatch { batch in
pingTask = try await batch.addRequest(Ping.request())
promptTask = try await batch.addRequest(
GetPrompt.request(.init(name: "greeting"))
)
}
// Process individual results
do {
if let pingTask = pingTask {
try await pingTask.value
print("Ping successful")
}
if let promptTask = promptTask {
let promptResult = try await promptTask.value
print("Prompt: \(promptResult.description ?? "None")")
}
} catch {
print("Error processing batch results: \(error)")
}
[!注意]
Server自动处理来自MCP客户端的批处理请求。
服务器使用情况
服务器组件允许您的应用程序承载模型功能并响应客户端请求。
基本服务器设置
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17import MCP // Create a server with given capabilities let server = Server( name: “MyModelServer”, version: “1.0.0”, capabilities: .init( completions: .init(), logging: .init(), prompts: .init(listChanged: true), resources: .init(subscribe: true, listChanged: true), tools: .init(listChanged: true) ) ) // Create transport and start server let transport = StdioTransport() try await server.start(transport: transport) // Now register handlers for the capabilities you’ve enabled
### 工具
注册工具处理程序以响应客户端工具调用:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// Register a tool list handler
await server.withMethodHandler(ListTools.self) { _ in
let tools = [
Tool(
name: "weather",
description: "Get current weather for a location",
inputSchema: .object([
"properties": .object([
"location": .string("City name or coordinates"),
"units": .string("Units of measurement, e.g., metric, imperial")
])
])
),
Tool(
name: "calculator",
description: "Perform calculations",
inputSchema: .object([
"properties": .object([
"expression": .string("Mathematical expression to evaluate")
])
])
)
]
return .init(tools: tools)
}
// Register a tool call handler
await server.withMethodHandler(CallTool.self) { params in
switch params.name {
case "weather":
let location = params.arguments?["location"]?.stringValue ?? "Unknown"
let units = params.arguments?["units"]?.stringValue ?? "metric"
let weatherData = getWeatherData(location: location, units: units) // Your implementation
return .init(
content: [.text("Weather for \(location): \(weatherData.temperature)°, \(weatherData.conditions)")],
isError: false
)
case "calculator":
if let expression = params.arguments?["expression"]?.stringValue {
let result = evaluateExpression(expression) // Your implementation
return .init(content: [.text("\(result)")], isError: false)
} else {
return .init(content: [.text("Missing expression parameter")], isError: true)
}
default:
return .init(content: [.text("Unknown tool")], isError: true)
}
}
资源
实现数据访问的资源处理程序:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48// Register a resource list handler await server.withMethodHandler(ListResources.self) { params in let resources = [ Resource( name: “Knowledge Base Articles”, uri: “resource://knowledge-base/articles”, description: “Collection of support articles and documentation” ), Resource( name: “System Status”, uri: “resource://system/status”, description: “Current system operational status” ) ] return .init(resources: resources, nextCursor: nil) } // Register a resource read handler await server.withMethodHandler(ReadResource.self) { params in switch params.uri { case “resource://knowledge-base/articles”: return .init(contents: [Resource.Content.text(”# Knowledge Base\n\nThis is the content of the knowledge base…”, uri: params.uri)]) case “resource://system/status”: let status = getCurrentSystemStatus() // Your implementation let statusJson = """ { “status”: “(status.overall)”, “components”: { “database”: “(status.database)”, “api”: “(status.api)”, “model”: “(status.model)” }, “lastUpdated”: “(status.timestamp)” } """ return .init(contents: [Resource.Content.text(statusJson, uri: params.uri, mimeType: “application/json”)]) default: throw MCPError.invalidParams(“Unknown resource URI: (params.uri)”) } } // Register a resource subscribe handler await server.withMethodHandler(ResourceSubscribe.self) { params in // Store subscription for later notifications. // Client identity for multi-client scenarios needs to be managed by the server application, // potentially using information from the initialize handshake if the server handles one client post-init. // addSubscription(clientID: /* some_client_identifier */, uri: params.uri) print(“Client subscribed to (params.uri). Server needs to implement logic to track this subscription.”) return .init() }
### 提示
实现提示处理程序:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// Register a prompt list handler
await server.withMethodHandler(ListPrompts.self) { params in
let prompts = [
Prompt(
name: "interview",
description: "Job interview conversation starter",
arguments: [
.init(name: "position", description: "Job position", required: true),
.init(name: "company", description: "Company name", required: true),
.init(name: "interviewee", description: "Candidate name")
]
),
Prompt(
name: "customer-support",
description: "Customer support conversation starter",
arguments: [
.init(name: "issue", description: "Customer issue", required: true),
.init(name: "product", description: "Product name", required: true)
]
)
]
return .init(prompts: prompts, nextCursor: nil)
}
// Register a prompt get handler
await server.withMethodHandler(GetPrompt.self) { params in
switch params.name {
case "interview":
let position = params.arguments?["position"]?.stringValue ?? "Software Engineer"
let company = params.arguments?["company"]?.stringValue ?? "Acme Corp"
let interviewee = params.arguments?["interviewee"]?.stringValue ?? "Candidate"
let description = "Job interview for \(position) position at \(company)"
let messages: [Prompt.Message] = [
.user(.text(text: "You are an interviewer for the \(position) position at \(company).")),
.user(.text(text: "Hello, I'm \(interviewee) and I'm here for the \(position) interview.")),
.assistant(.text(text: "Hi \(interviewee), welcome to \(company)! I'd like to start by asking about your background and experience."))
]
return .init(description: description, messages: messages)
case "customer-support":
// Similar implementation for customer support prompt
default:
throw MCPError.invalidParams("Unknown prompt name: \(params.name)")
}
}
补全
服务器可以为提示和资源模板参数提供自动补全建议:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47// Enable completions capability let server = Server( name: “MyServer”, version: “1.0.0”, capabilities: .init( completions: .init(), prompts: .init(listChanged: true) ) ) // Register a completion handler await server.withMethodHandler(Complete.self) { params in // Get the argument being completed let argumentName = params.argument.name let currentValue = params.argument.value // Check which prompt or resource is being completed switch params.ref { case .prompt(let promptRef): // Provide completions for a prompt argument if promptRef.name == “code_review” && argumentName == “language” { // Simple prefix matching let allLanguages = [“python”, “perl”, “php”, “javascript”, “java”, “swift”] let matches = allLanguages.filter { $0.hasPrefix(currentValue.lowercased()) } return .init( completion: .init( values: Array(matches.prefix(100)), // Max 100 items total: matches.count, hasMore: matches.count > 100 ) ) } case .resource(let resourceRef): // Provide completions for a resource template argument if resourceRef.uri == “file:///{path}” && argumentName == “path” { // Return directory suggestions let suggestions = try getDirectoryCompletions(for: currentValue) return .init( completion: .init( values: suggestions, total: suggestions.count, hasMore: false ) ) } } // No completions available return .init(completion: .init(values: [], total: 0, hasMore: false)) }
您还可以使用已解析参数的上下文:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17await server.withMethodHandler(Complete.self) { params in
// Access context from previous argument completions
if let context = params.context,
let language = context.arguments["language"]?.stringValue {
// Provide framework suggestions based on selected language
if language == "python" {
let frameworks = ["flask", "django", "fastapi", "tornado"]
let matches = frameworks.filter {
$0.hasPrefix(params.argument.value.lowercased())
}
return .init(
completion: .init(values: matches, total: matches.count, hasMore: false)
)
}
}
return .init(completion: .init(values: [], total: 0, hasMore: false))
}
采样
服务器可以通过采样向客户端请求LLM完成。这使得代理行为成为可能,服务器可以在保持人工监督的同时请求人工智能的帮助。
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24// Enable sampling capability in server let server = Server( name: “MyModelServer”, version: “1.0.0”, capabilities: .init( sampling: .init(), // Enable sampling capability tools: .init(listChanged: true) ) ) // Request sampling from the connected client do { let result = try await server.requestSampling( messages: [ .user(“Analyze this data and suggest next steps”) ], systemPrompt: “You are a helpful data analyst”, temperature: 0.7, maxTokens: 150 ) // Use the LLM completion in your server logic print(“LLM suggested: (result.content)”) } catch { print(“Sampling request failed: (error)”) }
采样支持强大的代理工作流程:
- **决策**:要求法学硕士在选项之间做出选择
- **内容生成**:请求草稿供用户批准
- **数据分析**:获取复杂数据的人工智能见解
- **多步推理**:通过工具调用实现链式AI完成
### 精英
服务器可以通过启发向用户请求信息:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// Ask the user to provide some additional information
let schema = Elicitation.RequestSchema(
title: "Additional Information Required",
description: "Please provide the following details to continue",
properties: [
"name": .object([
"type": .string("string"),
"description": .string("Your full name")
]),
"confirmed": .object([
"type": .string("boolean"),
"description": .string("Do you confirm the provided information?")
])
],
required: ["name", "confirmed"]
)
let result = try await server.requestElicitation(
message: "Some details are needed before proceeding",
requestedSchema: schema
)
switch result.action {
case .accept:
if let content = result.content {
let name = content["name"]?.stringValue
let confirmed = content["confirmed"]?.boolValue
// Use the collected data...
}
case .decline:
throw MCPError.invalidRequest("User declined to provide information")
case .cancel:
throw MCPError.invalidRequest("Operation canceled by user")
}
对于基于URL的启发(例如OAuth流),使用URL重载:
SWIFT``` 1 2 3 4 5let result = try await server.requestElicitation( message: “Please sign in to continue”, url: “https://example.com/oauth/authorize?client_id=…”, elicitationId: UUID().uuidString )
### 根
服务器可以请求客户端公开的文件系统根列表:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11// Request roots from the connected client
// (requires the client to declare the roots capability)
let roots = try await server.listRoots()
for root in roots {
print("Root: \(root.name ?? root.uri) at \(root.uri)")
}
// React to root list changes
await server.onNotification(RootsListChangedNotification.self) { _ in
let updatedRoots = try await server.listRoots()
print("Roots updated: \(updatedRoots.map { $0.uri })")
}
日志记录
服务器可以向客户端发送结构化日志消息:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40// Enable logging capability let server = Server( name: “MyServer”, version: “1.0.0”, capabilities: .init( logging: .init(), tools: .init(listChanged: true) ) ) // Send log messages at different severity levels try await server.log( level: .info, logger: “database”, data: Value.object([ “message”: .string(“Database connected successfully”), “host”: .string(“localhost”), “port”: .int(5432) ]) ) try await server.log( level: .error, logger: “api”, data: Value.object([ “message”: .string(“Request failed”), “statusCode”: .int(500), “error”: .string(“Internal server error”) ]) ) // You can also use codable types directly struct ErrorLog: Codable { let message: String let code: Int let timestamp: String } let errorLog = ErrorLog( message: “Operation failed”, code: 500, timestamp: ISO8601DateFormatter().string(from: Date()) ) try await server.log(level: .error, logger: “operations”, data: errorLog)
客户端可以控制接收哪些日志级别:
SWIFT```
1
2
3
4
5
6
7
8// Register a handler for client's logging level preferences
await server.withMethodHandler(SetLoggingLevel.self) { params in
let minimumLevel = params.level
// Store the client's preference and filter log messages accordingly
// (Implementation depends on your server architecture)
storeLogLevel(minimumLevel)
return Empty()
}
进度跟踪
服务器可以在长时间运行的工具调用期间通过读取 progressToken 从请求元数据和发送 ProgressNotification 信息:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25await server.withMethodHandler(CallTool.self) { params in // Read the progress token from request metadata guard let token = params._meta?.progressToken else { // No progress token provided — run without reporting progress return .init(content: [.text(“Done”)], isError: false) } // Report initial progress let started = ProgressNotification.message( .init(progressToken: token, progress: 0, total: 100) ) try await server.notify(started) // … do work … // Report intermediate progress let halfway = ProgressNotification.message( .init(progressToken: token, progress: 50, total: 100, message: “Halfway there”) ) try await server.notify(halfway) // … do more work … // Report completion let done = ProgressNotification.message( .init(progressToken: token, progress: 100, total: 100, message: “Complete”) ) try await server.notify(done) return .init(content: [.text(“Done”)], isError: false) }
#### 初始化钩子
使用初始化挂钩控制客户端连接:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14// Start the server with an initialize hook
try await server.start(transport: transport) { clientInfo, clientCapabilities in
// Validate client info
guard clientInfo.name != "BlockedClient" else {
throw MCPError.invalidRequest("This client is not allowed")
}
// You can also inspect client capabilities
if clientCapabilities.sampling == nil {
print("Client does not support sampling")
}
// Perform any server-side setup based on client info
print("Client \(clientInfo.name) v\(clientInfo.version) connected")
// If the hook completes without throwing, initialization succeeds
}
优雅地关闭
我们建议使用 Swift服务生命周期 用于管理服务的启动和关闭。
首先,将依赖关系添加到您的 Package.swift:
SWIFT``` 1.package(url: “https://github.com/swift-server/swift-service-lifecycle.git”, from: “2.3.0”),
然后将MCP服务器实现为 `Service`:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import MCP
import ServiceLifecycle
import Logging
struct MCPService: Service {
let server: Server
let transport: Transport
init(server: Server, transport: Transport) {
self.server = server
self.transport = transport
}
func run() async throws {
// Start the server
try await server.start(transport: transport)
// Keep running until external cancellation
try await Task.sleep(for: .days(365 * 100)) // Effectively forever
}
func shutdown() async throws {
// Gracefully shutdown the server
await server.stop()
}
}
然后在您的应用程序中使用它:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39import MCP import ServiceLifecycle import Logging let logger = Logger(label: “com.example.mcp-server”) // Create the MCP server let server = Server( name: “MyModelServer”, version: “1.0.0”, capabilities: .init( prompts: .init(listChanged: true), resources: .init(subscribe: true, listChanged: true), tools: .init(listChanged: true) ) ) // Add handlers directly to the server await server.withMethodHandler(ListTools.self) { _ in // Your implementation return .init(tools: [ Tool(name: “example”, description: “An example tool”) ]) } await server.withMethodHandler(CallTool.self) { params in // Your implementation return .init(content: [.text(“Tool result”)], isError: false) } // Create MCP service and other services let transport = StdioTransport(logger: logger) let mcpService = MCPService(server: server, transport: transport) let databaseService = DatabaseService() // Your other services // Create service group with signal handling let serviceGroup = ServiceGroup( services: [mcpService, databaseService], configuration: .init( gracefulShutdownSignals: [.sigterm, .sigint] ), logger: logger ) // Run the service group - this blocks until shutdown try await serviceGroup.run()
这种方法有几个好处:
- **信号处理**:
自动捕获信号情报、信号终端并触发优雅关机
- **优雅关闭**:
正确关闭MCP服务器和其他服务
- **基于超时的关机**:
可配置的关机超时,以防止挂起进程
- **高级服务管理**:
`[ServiceLifecycle](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/documentation/servicelifecycle)`
还支持服务依赖性、条件服务等,
以及其他有用的功能。
## 运输
MCP的传输层处理客户端和服务器之间的通信。
Swift SDK提供了多种内置传输:
| 交通 | 描述 | 平台 | 最适合 |
| --- | --- | --- | --- |
| StdioTransport | 机具 [stdio传输](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#stdio) 使用标准输入/输出流 | 苹果平台、带glibc的Linux | 本地子流程、CLI工具 |
| HTTPClientTransport | 机具 [可流式HTTP传输](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#streamable-http) 使用Foundation的URL加载系统 | 所有有Foundation的平台 | 远程服务器、web应用程序 |
| StatelessHTTPServerTransport | 具有简单请求-响应语义的HTTP服务器传输;无会话管理或SSE流媒体 | 所有具有Foundation的平台 | 简单的HTTP服务器、无服务器/边缘功能 |
| StatefulHTTPServerTransport | 具有完整会话管理和服务器发起消息的SSE流式传输的HTTP服务器传输 | 所有具有Foundation的平台 | 功能齐全的HTTP服务器,流式通知 |
| InMemoryTransport | 用于同一进程内直接通信的自定义内存传输 | 所有平台 | 测试、调试、同一进程客户端-服务器通信 |
| NetworkTransport | 使用Apple的网络框架进行TCP/UDP连接的自定义传输 | 仅限Apple平台 | 低级网络,自定义协议 |
### 自定义运输实施
您可以通过符合以下要求来实现自定义传输 `Transport` 协议:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import MCP
import Foundation
public actor MyCustomTransport: Transport {
public nonisolated let logger: Logger
private var isConnected = false
private let messageStream: AsyncThrowingStream<Data, any Swift.Error>
private let messageContinuation: AsyncThrowingStream<Data, any Swift.Error>.Continuation
public init(logger: Logger? = nil) {
self.logger = logger ?? Logger(label: "my.custom.transport")
var continuation: AsyncThrowingStream<Data, any Swift.Error>.Continuation!
self.messageStream = AsyncThrowingStream { continuation = $0 }
self.messageContinuation = continuation
}
public func connect() async throws {
// Implement your connection logic
isConnected = true
}
public func disconnect() async {
// Implement your disconnection logic
isConnected = false
messageContinuation.finish()
}
public func send(_ data: Data) async throws {
// Implement your message sending logic
}
public func receive() -> AsyncThrowingStream<Data, any Swift.Error> {
return messageStream
}
}
认证
HTTPClientTransport 支持OAuth 2.1承载令牌授权
MCP授权规范.
当服务器返回时 401 Unauthorized 或 403 Forbidden,运输自动:
- 在以下位置发现受保护的资源元数据(RFC 9728)
/.well-known/oauth-protected-resource - 发现授权服务器元数据(RFC 8414/OIDC发现1.0)
- 如果需要,动态注册客户端(RFC 7591)
- 使用配置的授权流(强制执行PKCE)获取承载令牌
- 返回附有令牌的原始请求
默认情况下,授权是选择加入和禁用的。
通过A OAuthAuthorizer 到 HTTPClientTransport(authorizer:) 以启用它。
客户端:客户端凭据流
使用预先共享的客户端密钥进行机器对机器身份验证:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11let config = OAuthConfiguration( grantType: .clientCredentials, authentication: .clientSecretBasic(clientID: “my-app”, clientSecret: “s3cr3t”) ) let authorizer = OAuthAuthorizer(configuration: config) let transport = HTTPClientTransport( endpoint: URL(string: “https://api.example.com/mcp”)!, authorizer: authorizer ) let client = Client(name: “MyClient”, version: “1.0.0”) try await client.connect(transport: transport)
### 客户端:授权码流
使用PKCE进行交互式、基于浏览器的身份验证。
实施 `OAuthAuthorizationDelegate` 打开授权URL并捕获重定向:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17struct MyAuthDelegate: OAuthAuthorizationDelegate {
func presentAuthorizationURL(_ url: URL) async throws -> URL {
// Open the URL in a browser/webview and wait for the callback redirect URI.
// The returned URL must include the authorization code and state parameters.
return try await openBrowserAndWaitForCallback(url)
}
}
let config = OAuthConfiguration(
grantType: .authorizationCode,
authentication: .none(clientID: "my-app"),
authorizationDelegate: MyAuthDelegate()
)
let authorizer = OAuthAuthorizer(configuration: config)
let transport = HTTPClientTransport(
endpoint: URL(string: "https://api.example.com/mcp")!,
authorizer: authorizer
)
客户端:自定义令牌提供程序
通过以下方式提供外部获取的令牌(例如,从系统凭据存储中) accessTokenProvider.
SDK在发现完成后调用此闭包。返回 nil 退回到配置的授权流:
SWIFT``` 1 2 3 4 5 6 7 8let config = OAuthConfiguration( grantType: .clientCredentials, authentication: .none(clientID: “my-app”), accessTokenProvider: { context, session in // context contains the discovered resource URI, token endpoint, scopes, etc. return try await KeychainTokenStore.shared.loadToken(for: context.resource) } )
### 客户端:自定义令牌存储
默认情况下,令牌存储在内存中,并在进程退出时丢失。
要在会话之间持久化令牌,请实现 `TokenStorage` 并将其传递给 `OAuthAuthorizer`:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16final class KeychainTokenStorage: TokenStorage {
func save(_ token: OAuthAccessToken) {
// Encode and store token.value in the system Keychain
}
func load() -> OAuthAccessToken? {
// Load and decode token from the Keychain
return nil
}
func clear() {
// Delete from the Keychain
}
}
let authorizer = OAuthAuthorizer(
configuration: config,
tokenStorage: KeychainTokenStorage()
)
客户: private_key_jwt 认证
使用非对称密钥对令牌端点进行身份验证(RFC 7523)。 SDK为P-256密钥提供了一个内置的ES256助手:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13let config = OAuthConfiguration( grantType: .clientCredentials, authentication: .privateKeyJWT( clientID: “my-app”, assertionFactory: { tokenEndpoint, clientID in try OAuthConfiguration.makePrivateKeyJWTAssertion( clientID: clientID, tokenEndpoint: tokenEndpoint, privateKeyPEM: myEC256PrivateKeyPEM // PEM-encoded P-256 private key ) } ) )
### 客户端:端点覆盖
通过提供显式端点URL跳过自动发现。
当服务器不发布众所周知的元数据文档时很有用:
SWIFT```
1
2
3
4
5
6
7let config = OAuthConfiguration(
grantType: .clientCredentials,
authentication: .clientSecretBasic(clientID: "app", clientSecret: "secret"),
endpointOverrides: OAuthConfiguration.EndpointOverrides(
tokenEndpoint: URL(string: "https://auth.example.com/oauth/token")!
)
)
服务器:提供受保护的资源元数据
根据MCP授权规范,服务器 必须 提供受保护的资源元数据
在 /.well-known/oauth-protected-resource 因此客户端可以发现授权服务器端点。
使用 ProtectedResourceMetadataValidator 作为流程中的第一个验证器,以便
在承载令牌检查之前处理未经身份验证的发现请求:
SWIFT``` 1 2 3 4 5 6let metadata = OAuthProtectedResourceServerMetadata( resource: “https://api.example.com”, authorizationServers: [URL(string: “https://auth.example.com”)!], scopesSupported: [“read”, “write”] ) let metadataValidator = ProtectedResourceMetadataValidator(metadata: metadata)
### 服务器:验证承载令牌
使用 `BearerTokenValidator` 以验证传入的请求。
你的 `tokenValidator` 闭包 **必须** 验证令牌的 `aud` 声称防止
令牌替换攻击,其中针对您的服务器重放用于另一个资源的令牌:
SWIFT```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23let resourceIdentifier = URL(string: "https://api.example.com")!
let bearerValidator = BearerTokenValidator(
resourceMetadataURL: URL(string: "https://api.example.com/.well-known/oauth-protected-resource")!,
resourceIdentifier: resourceIdentifier,
tokenValidator: { token, request, context in
guard let claims = try? verifyAndDecodeJWT(token) else {
return .invalidToken(errorDescription: "Token verification failed")
}
// Pass audience and expiry to BearerTokenInfo; the SDK validates the
// audience claim against resourceIdentifier automatically.
return .valid(BearerTokenInfo(
audience: claims.audience,
expiresAt: claims.expiresAt
))
}
)
let pipeline = StandardValidationPipeline(validators: [
metadataValidator, // serves /.well-known/oauth-protected-resource unauthenticated
bearerValidator, // validates Bearer tokens on all other requests
AcceptHeaderValidator(mode: .sseRequired),
ContentTypeValidator(),
SessionValidator(),
])
平台可用性
Swift SDK有以下平台要求:
| 平台 | 最低版本 |
|---|---|
| macOS | 13.0+ |
| iOS/Mac Catalyst | 16.0+ |
| watchOS | 9.0+ |
| tvOS | 16.0+ |
| visionOS | 1.0+ |
| Linux | 发行版 glibc 或 musl,包括Ubuntu、Debian、Fedora和Alpine Linux |
虽然核心库可以在任何支持Swift 6的平台上运行 (包括Linux),运行客户端或服务器需要兼容的传输。
调试和记录
启用日志记录以帮助解决问题:
SWIFT``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14import Logging import MCP // Configure Logger LoggingSystem.bootstrap { label in var handler = StreamLogHandler.standardOutput(label: label) handler.logLevel = .debug return handler } // Create logger let logger = Logger(label: “com.example.mcp”) // Pass to client/server let client = Client(name: “MyApp”, version: “1.0.0”) // Pass to transport let transport = StdioTransport(logger: logger)
## 其他资源
- [MCP规范](https://modelcontextprotocol.io/specification/2025-11-25)
- [协议文件](https://modelcontextprotocol.io)
- [GitHub 仓库](https://github.com/modelcontextprotocol/swift-sdk)
## 更新日志
该项目如下 [语义版本控制](https://semver.org/).
对于1.0之前的版本,
次要版本增量(0.X.0)可能包含破坏性更改。
有关每个版本中更改的详细信息,
看 [GitHub发布页面](https://github.com/modelcontextprotocol/swift-sdk/releases).
## 许可证
该项目在Apache 2.0下获得了新贡献的许可,现有代码在MIT下。看 许可证 文件以获取详细信息。