Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 77 additions & 13 deletions Sources/MCP/Base/Transports/NetworkTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ import Logging

// Track connection state for continuations
private var connectionContinuationResumed = false
private var connectContinuationID: UUID?

// Connection is marked nonisolated(unsafe) to allow access from closures
private nonisolated(unsafe) var connection: NetworkConnectionProtocol
Expand Down Expand Up @@ -319,6 +320,8 @@ import Logging

// Reset continuation state
connectionContinuationResumed = false
let continuationID = UUID()
connectContinuationID = continuationID

// Wait for connection to be ready
try await withCheckedThrowingContinuation {
Expand All @@ -334,12 +337,21 @@ import Logging
Task { @MainActor in
switch state {
case .ready:
await self.handleConnectionReady(continuation: continuation)
await self.handleConnectionReady(
continuation: continuation,
continuationID: continuationID
)
case .failed(let error):
await self.handleConnectionFailed(
error: error, continuation: continuation)
error: error,
continuation: continuation,
continuationID: continuationID
)
case .cancelled:
await self.handleConnectionCancelled(continuation: continuation)
await self.handleConnectionCancelled(
continuation: continuation,
continuationID: continuationID
)
case .waiting(let error):
self.logger.debug("Connection waiting: \(error)")
case .preparing:
Expand All @@ -356,20 +368,47 @@ import Logging
}
}

private func isCurrentConnectAttempt(_ continuationID: UUID) -> Bool {
connectContinuationID == continuationID
}

private func resumeConnectContinuation(
_ continuation: CheckedContinuation<Void, Swift.Error>,
with result: Result<Void, Swift.Error>,
continuationID: UUID
) {
guard isCurrentConnectAttempt(continuationID) else { return }
connectContinuationID = nil
switch result {
case .success:
continuation.resume()
case .failure(let error):
continuation.resume(throwing: error)
}
}

/// Handles when the connection reaches the ready state
///
/// - Parameter continuation: The continuation to resume when connection is ready
private func handleConnectionReady(continuation: CheckedContinuation<Void, Swift.Error>)
private func handleConnectionReady(
continuation: CheckedContinuation<Void, Swift.Error>,
continuationID: UUID
)
async
{
guard isCurrentConnectAttempt(continuationID) else { return }
if !connectionContinuationResumed {
connectionContinuationResumed = true
isConnected = true

// Reset reconnect attempt counter on successful connection
reconnectAttempt = 0
logger.debug("Network transport connected successfully")
continuation.resume()
resumeConnectContinuation(
continuation,
with: .success(()),
continuationID: continuationID
)

// Start the receive loop after connection is established
Task { await self.receiveLoop() }
Expand Down Expand Up @@ -449,34 +488,43 @@ import Logging
/// - error: The error that caused the connection to fail
/// - continuation: The continuation to resume with the error
private func handleConnectionFailed(
error: Swift.Error, continuation: CheckedContinuation<Void, Swift.Error>
error: Swift.Error,
continuation: CheckedContinuation<Void, Swift.Error>,
continuationID: UUID
) async {
guard isCurrentConnectAttempt(continuationID) else { return }
if !connectionContinuationResumed {
connectionContinuationResumed = true
logger.error("Connection failed: \(error)")

await handleReconnection(
error: error,
continuation: continuation,
context: "failure"
context: "failure",
continuationID: continuationID
)
}
}

/// Handles connection cancellation
///
/// - Parameter continuation: The continuation to resume with cancellation error
private func handleConnectionCancelled(continuation: CheckedContinuation<Void, Swift.Error>)
private func handleConnectionCancelled(
continuation: CheckedContinuation<Void, Swift.Error>,
continuationID: UUID
)
async
{
guard isCurrentConnectAttempt(continuationID) else { return }
if !connectionContinuationResumed {
connectionContinuationResumed = true
logger.warning("Connection cancelled")

await handleReconnection(
error: MCPError.internalError("Connection cancelled"),
continuation: continuation,
context: "cancellation"
context: "cancellation",
continuationID: continuationID
)
}
}
Expand All @@ -490,8 +538,10 @@ import Logging
private func handleReconnection(
error: Swift.Error,
continuation: CheckedContinuation<Void, Swift.Error>,
context: String
context: String,
continuationID: UUID
) async {
guard isCurrentConnectAttempt(continuationID) else { return }
if !isStopping,
reconnectionConfig.enabled,
reconnectAttempt < reconnectionConfig.maxAttempts
Expand All @@ -508,19 +558,33 @@ import Logging
// Schedule reconnection attempt after delay
Task {
try? await Task.sleep(for: .seconds(delay))
guard self.isCurrentConnectAttempt(continuationID) else { return }
if !isStopping {
// Cancel the current connection before attempting to reconnect.
self.connection.cancel()
// Resume original continuation with error; outer logic or a new call to connect() will handle retry.
continuation.resume(throwing: error)
self.resumeConnectContinuation(
continuation,
with: .failure(error),
continuationID: continuationID
)
} else {
continuation.resume(throwing: error) // Stopping, so fail.
self.resumeConnectContinuation(
continuation,
with: .failure(error),
continuationID: continuationID
) // Stopping, so fail.
}
}
} else {
guard isCurrentConnectAttempt(continuationID) else { return }
// Not configured to reconnect, exceeded max attempts, or stopping
self.connection.cancel() // Ensure connection is cancelled
continuation.resume(throwing: error)
resumeConnectContinuation(
continuation,
with: .failure(error),
continuationID: continuationID
)
}
}

Expand Down