diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index b34af418..5543086e 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -244,6 +244,7 @@ import Logging // Track connection state for continuations private var connectionContinuationResumed = false + private var connectContinuationID: UUID? // Connection is marked nonisolated(unsafe) to allow access from closures private nonisolated(unsafe) var connection: NetworkConnectionProtocol @@ -319,6 +320,8 @@ import Logging // Reset continuation state connectionContinuationResumed = false + let continuationID = UUID() + connectContinuationID = continuationID // Wait for connection to be ready try await withCheckedThrowingContinuation { @@ -334,12 +337,21 @@ import Logging Task { @MainActor in switch state { case .ready: - await self.handleConnectionReady(continuation: continuation) + await self.handleConnectionReady( + continuation: continuation, + continuationID: continuationID + ) case .failed(let error): await self.handleConnectionFailed( - error: error, continuation: continuation) + error: error, + continuation: continuation, + continuationID: continuationID + ) case .cancelled: - await self.handleConnectionCancelled(continuation: continuation) + await self.handleConnectionCancelled( + continuation: continuation, + continuationID: continuationID + ) case .waiting(let error): self.logger.debug("Connection waiting: \(error)") case .preparing: @@ -356,12 +368,35 @@ import Logging } } + private func isCurrentConnectAttempt(_ continuationID: UUID) -> Bool { + connectContinuationID == continuationID + } + + private func resumeConnectContinuation( + _ continuation: CheckedContinuation, + with result: Result, + continuationID: UUID + ) { + guard isCurrentConnectAttempt(continuationID) else { return } + connectContinuationID = nil + switch result { + case .success: + continuation.resume() + case .failure(let error): + continuation.resume(throwing: error) + } + } + /// Handles when the connection reaches the ready state /// /// - Parameter continuation: The continuation to resume when connection is ready - private func handleConnectionReady(continuation: CheckedContinuation) + private func handleConnectionReady( + continuation: CheckedContinuation, + continuationID: UUID + ) async { + guard isCurrentConnectAttempt(continuationID) else { return } if !connectionContinuationResumed { connectionContinuationResumed = true isConnected = true @@ -369,7 +404,11 @@ import Logging // Reset reconnect attempt counter on successful connection reconnectAttempt = 0 logger.debug("Network transport connected successfully") - continuation.resume() + resumeConnectContinuation( + continuation, + with: .success(()), + continuationID: continuationID + ) // Start the receive loop after connection is established Task { await self.receiveLoop() } @@ -449,8 +488,11 @@ import Logging /// - error: The error that caused the connection to fail /// - continuation: The continuation to resume with the error private func handleConnectionFailed( - error: Swift.Error, continuation: CheckedContinuation + error: Swift.Error, + continuation: CheckedContinuation, + continuationID: UUID ) async { + guard isCurrentConnectAttempt(continuationID) else { return } if !connectionContinuationResumed { connectionContinuationResumed = true logger.error("Connection failed: \(error)") @@ -458,7 +500,8 @@ import Logging await handleReconnection( error: error, continuation: continuation, - context: "failure" + context: "failure", + continuationID: continuationID ) } } @@ -466,9 +509,13 @@ import Logging /// Handles connection cancellation /// /// - Parameter continuation: The continuation to resume with cancellation error - private func handleConnectionCancelled(continuation: CheckedContinuation) + private func handleConnectionCancelled( + continuation: CheckedContinuation, + continuationID: UUID + ) async { + guard isCurrentConnectAttempt(continuationID) else { return } if !connectionContinuationResumed { connectionContinuationResumed = true logger.warning("Connection cancelled") @@ -476,7 +523,8 @@ import Logging await handleReconnection( error: MCPError.internalError("Connection cancelled"), continuation: continuation, - context: "cancellation" + context: "cancellation", + continuationID: continuationID ) } } @@ -490,8 +538,10 @@ import Logging private func handleReconnection( error: Swift.Error, continuation: CheckedContinuation, - context: String + context: String, + continuationID: UUID ) async { + guard isCurrentConnectAttempt(continuationID) else { return } if !isStopping, reconnectionConfig.enabled, reconnectAttempt < reconnectionConfig.maxAttempts @@ -508,19 +558,33 @@ import Logging // Schedule reconnection attempt after delay Task { try? await Task.sleep(for: .seconds(delay)) + guard self.isCurrentConnectAttempt(continuationID) else { return } if !isStopping { // Cancel the current connection before attempting to reconnect. self.connection.cancel() // Resume original continuation with error; outer logic or a new call to connect() will handle retry. - continuation.resume(throwing: error) + self.resumeConnectContinuation( + continuation, + with: .failure(error), + continuationID: continuationID + ) } else { - continuation.resume(throwing: error) // Stopping, so fail. + self.resumeConnectContinuation( + continuation, + with: .failure(error), + continuationID: continuationID + ) // Stopping, so fail. } } } else { + guard isCurrentConnectAttempt(continuationID) else { return } // Not configured to reconnect, exceeded max attempts, or stopping self.connection.cancel() // Ensure connection is cancelled - continuation.resume(throwing: error) + resumeConnectContinuation( + continuation, + with: .failure(error), + continuationID: continuationID + ) } }