From 309439f97f31faf244ff8a3c6c9e58596414071d Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 00:52:11 +0100 Subject: [PATCH 1/8] Support different transports in Client --- README.md | 8 +- .../mcp_simple_auth_client/main.py | 19 +- .../mcp_simple_task_client/main.py | 2 +- .../main.py | 2 +- .../mcp_sse_polling_client/main.py | 2 +- examples/snippets/clients/oauth_client.py | 2 +- examples/snippets/clients/streamable_basic.py | 6 +- src/mcp/client/__init__.py | 3 +- src/mcp/client/_memory.py | 56 +-- src/mcp/client/_transport.py | 20 ++ src/mcp/client/client.py | 116 ++++--- src/mcp/client/session_group.py | 2 +- src/mcp/client/streamable_http.py | 20 +- src/mcp/shared/_httpx_utils.py | 4 +- tests/client/test_http_unicode.py | 4 +- tests/client/test_notification_response.py | 8 +- tests/client/transports/test_memory.py | 6 +- tests/server/mcpserver/test_integration.py | 60 +--- tests/shared/test_streamable_http.py | 328 ++++++++---------- 19 files changed, 290 insertions(+), 378 deletions(-) create mode 100644 src/mcp/client/_transport.py diff --git a/README.md b/README.md index 0f0468a19..c2dec7ac4 100644 --- a/README.md +++ b/README.md @@ -2213,11 +2213,7 @@ from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamable_http_client("http://localhost:8000/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client("http://localhost:8000/mcp") as (read_stream, write_stream): # Create a session using the client streams async with ClientSession(read_stream, write_stream) as session: # Initialize the connection @@ -2395,7 +2391,7 @@ async def main(): ) async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 684222dec..5fac56be5 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -14,7 +14,7 @@ import time import webbrowser from http.server import BaseHTTPRequestHandler, HTTPServer -from typing import Any, Callable +from typing import Any from urllib.parse import parse_qs, urlparse import httpx @@ -223,15 +223,15 @@ async def _default_redirect_handler(authorization_url: str) -> None: auth=oauth_auth, timeout=60.0, ) as (read_stream, write_stream): - await self._run_session(read_stream, write_stream, None) + await self._run_session(read_stream, write_stream) else: print("šŸ“” Opening StreamableHTTP transport connection with auth...") async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: - async with streamable_http_client( - url=self.server_url, - http_client=custom_client, - ) as (read_stream, write_stream, get_session_id): - await self._run_session(read_stream, write_stream, get_session_id) + async with streamable_http_client(url=self.server_url, http_client=custom_client) as ( + read_stream, + write_stream, + ): + await self._run_session(read_stream, write_stream) except Exception as e: print(f"āŒ Failed to connect: {e}") @@ -243,7 +243,6 @@ async def _run_session( self, read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], write_stream: MemoryObjectSendStream[SessionMessage], - get_session_id: Callable[[], str | None] | None = None, ): """Run the MCP session with the given streams.""" print("šŸ¤ Initializing MCP session...") @@ -254,10 +253,6 @@ async def _run_session( print("✨ Session initialization complete!") print(f"\nāœ… Connected to MCP server at {self.server_url}") - if get_session_id: - session_id = get_session_id() - if session_id: - print(f"Session ID: {session_id}") # Run interactive loop await self.interactive_loop() diff --git a/examples/clients/simple-task-client/mcp_simple_task_client/main.py b/examples/clients/simple-task-client/mcp_simple_task_client/main.py index 1e653d58e..f9e555c8e 100644 --- a/examples/clients/simple-task-client/mcp_simple_task_client/main.py +++ b/examples/clients/simple-task-client/mcp_simple_task_client/main.py @@ -9,7 +9,7 @@ async def run(url: str) -> None: - async with streamable_http_client(url) as (read, write, _): + async with streamable_http_client(url) as (read, write): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/clients/simple-task-interactive-client/mcp_simple_task_interactive_client/main.py b/examples/clients/simple-task-interactive-client/mcp_simple_task_interactive_client/main.py index 5f34eb949..d37958f52 100644 --- a/examples/clients/simple-task-interactive-client/mcp_simple_task_interactive_client/main.py +++ b/examples/clients/simple-task-interactive-client/mcp_simple_task_interactive_client/main.py @@ -73,7 +73,7 @@ def get_text(result: CallToolResult) -> str: async def run(url: str) -> None: - async with streamable_http_client(url) as (read, write, _): + async with streamable_http_client(url) as (read, write): async with ClientSession( read, write, diff --git a/examples/clients/sse-polling-client/mcp_sse_polling_client/main.py b/examples/clients/sse-polling-client/mcp_sse_polling_client/main.py index 3b3171205..e91ed9d52 100644 --- a/examples/clients/sse-polling-client/mcp_sse_polling_client/main.py +++ b/examples/clients/sse-polling-client/mcp_sse_polling_client/main.py @@ -31,7 +31,7 @@ async def run_demo(url: str, items: int, checkpoint_every: int) -> None: print(f"Processing {items} items with checkpoints every {checkpoint_every}") print(f"{'=' * 60}\n") - async with streamable_http_client(url) as (read_stream, write_stream, _): + async with streamable_http_client(url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize the connection print("Initializing connection...") diff --git a/examples/snippets/clients/oauth_client.py b/examples/snippets/clients/oauth_client.py index 6d605afa9..3887c5c8c 100644 --- a/examples/snippets/clients/oauth_client.py +++ b/examples/snippets/clients/oauth_client.py @@ -69,7 +69,7 @@ async def main(): ) async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/snippets/clients/streamable_basic.py b/examples/snippets/clients/streamable_basic.py index 87e16f4ba..43bb6396c 100644 --- a/examples/snippets/clients/streamable_basic.py +++ b/examples/snippets/clients/streamable_basic.py @@ -10,11 +10,7 @@ async def main(): # Connect to a streamable HTTP server - async with streamable_http_client("http://localhost:8000/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client("http://localhost:8000/mcp") as (read_stream, write_stream): # Create a session using the client streams async with ClientSession(read_stream, write_stream) as session: # Initialize the connection diff --git a/src/mcp/client/__init__.py b/src/mcp/client/__init__.py index 446d2109b..a1eaf3d7c 100644 --- a/src/mcp/client/__init__.py +++ b/src/mcp/client/__init__.py @@ -1,6 +1,7 @@ """MCP Client module.""" +from mcp.client._transport import Transport from mcp.client.client import Client from mcp.client.session import ClientSession -__all__ = ["Client", "ClientSession"] +__all__ = ["Client", "ClientSession", "Transport"] diff --git a/src/mcp/client/_memory.py b/src/mcp/client/_memory.py index 33a1a186f..27b326e4f 100644 --- a/src/mcp/client/_memory.py +++ b/src/mcp/client/_memory.py @@ -2,17 +2,17 @@ from __future__ import annotations -from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager +from collections.abc import AsyncIterator +from contextlib import AbstractAsyncContextManager, asynccontextmanager +from types import TracebackType from typing import Any import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from mcp.client._transport import TransportStreams from mcp.server import Server from mcp.server.mcpserver import MCPServer from mcp.shared.memory import create_client_server_memory_streams -from mcp.shared.message import SessionMessage class InMemoryTransport: @@ -23,17 +23,17 @@ class InMemoryTransport: stopped when the context manager exits. Example: - server = MCPServer("test") - transport = InMemoryTransport(server) + ```python + from mcp.client import Client, ClientSession + from mcp.server.mcpserver import MCPServer + from mcp.client.memory import InMemoryTransport - async with transport.connect() as (read_stream, write_stream): + server = MCPServer("test") + async with InMemoryTransport(server) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() # Use the session... - - Or more commonly, use with Client: - async with Client(server) as client: - result = await client.call_tool("my_tool", {...}) + ``` """ def __init__(self, server: Server[Any] | MCPServer, *, raise_exceptions: bool = False) -> None: @@ -45,26 +45,15 @@ def __init__(self, server: Server[Any] | MCPServer, *, raise_exceptions: bool = """ self._server = server self._raise_exceptions = raise_exceptions + self._cm: AbstractAsyncContextManager[TransportStreams] | None = None @asynccontextmanager - async def connect( - self, - ) -> AsyncGenerator[ - tuple[ - MemoryObjectReceiveStream[SessionMessage | Exception], - MemoryObjectSendStream[SessionMessage], - ], - None, - ]: - """Connect to the server and return streams for communication. - - Yields: - A tuple of (read_stream, write_stream) for bidirectional communication - """ + async def _connect(self) -> AsyncIterator[TransportStreams]: + """Connect to the server and yield streams for communication.""" # Unwrap MCPServer to get underlying Server - actual_server: Server[Any] if isinstance(self._server, MCPServer): - actual_server = self._server._lowlevel_server # type: ignore[reportPrivateUsage] + # TODO(Marcelo): Make `lowlevel_server` public. + actual_server: Server[Any] = self._server._lowlevel_server # type: ignore[reportPrivateUsage] else: actual_server = self._server @@ -87,3 +76,16 @@ async def connect( yield client_read, client_write finally: tg.cancel_scope.cancel() + + async def __aenter__(self) -> TransportStreams: + """Connect to the server and return streams for communication.""" + self._cm = self._connect() + return await self._cm.__aenter__() + + async def __aexit__( + self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None + ) -> None: + """Close the transport and stop the server.""" + if self._cm is not None: + await self._cm.__aexit__(exc_type, exc_val, exc_tb) + self._cm = None diff --git a/src/mcp/client/_transport.py b/src/mcp/client/_transport.py new file mode 100644 index 000000000..a86362900 --- /dev/null +++ b/src/mcp/client/_transport.py @@ -0,0 +1,20 @@ +"""Transport protocol for MCP clients.""" + +from __future__ import annotations + +from contextlib import AbstractAsyncContextManager +from typing import Protocol + +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream + +from mcp.shared.message import SessionMessage + +TransportStreams = tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]] + + +class Transport(AbstractAsyncContextManager[TransportStreams], Protocol): + """Protocol for MCP transports. + + A transport is an async context manager that yields read and write streams + for bidirectional communication with an MCP server. + """ diff --git a/src/mcp/client/client.py b/src/mcp/client/client.py index 14a230aa3..6add3ddb8 100644 --- a/src/mcp/client/client.py +++ b/src/mcp/client/client.py @@ -3,10 +3,13 @@ from __future__ import annotations from contextlib import AsyncExitStack +from dataclasses import KW_ONLY, dataclass, field from typing import Any from mcp.client._memory import InMemoryTransport +from mcp.client._transport import Transport from mcp.client.session import ClientSession, ElicitationFnT, ListRootsFnT, LoggingFnT, MessageHandlerFnT, SamplingFnT +from mcp.client.streamable_http import streamable_http_client from mcp.server import Server from mcp.server.mcpserver import MCPServer from mcp.shared.session import ProgressFnT @@ -30,6 +33,7 @@ ) +@dataclass class Client: """A high-level MCP client for connecting to MCP servers. @@ -55,52 +59,53 @@ async def main(): ``` """ - # TODO(felixweinberger): Expand to support all transport types: - # - Add ClientTransport base class with connect_session() method - # - Add StreamableHttpTransport, SSETransport, StdioTransport - # - Add infer_transport() to auto-detect transport from input type - # - Accept URL strings, Path objects, config dicts in constructor - # - Add auth support (OAuth, bearer tokens) + server: Server[Any] | MCPServer | Transport | str + """The MCP server to connect to. - def __init__( - self, - server: Server[Any] | MCPServer, - *, - # TODO(Marcelo): When do `raise_exceptions=True` actually raises? - raise_exceptions: bool = False, - read_timeout_seconds: float | None = None, - sampling_callback: SamplingFnT | None = None, - list_roots_callback: ListRootsFnT | None = None, - logging_callback: LoggingFnT | None = None, - message_handler: MessageHandlerFnT | None = None, - client_info: Implementation | None = None, - elicitation_callback: ElicitationFnT | None = None, - ) -> None: - """Initialize the client with a server. + If the server is a `Server` or `MCPServer` instance, it will be wrapped in an `InMemoryTransport`. + If the server is a URL string, it will be used as the URL for a `streamable_http_client` transport. + If the server is a `Transport` instance, it will be used directly. + """ - Args: - server: The MCP server to connect to (Server or MCPServer instance) - raise_exceptions: Whether to raise exceptions from the server - read_timeout_seconds: Timeout for read operations - sampling_callback: Callback for handling sampling requests - list_roots_callback: Callback for handling list roots requests - logging_callback: Callback for handling logging notifications - message_handler: Callback for handling raw messages - client_info: Client implementation info to send to server - elicitation_callback: Callback for handling elicitation requests - """ - self._server = server - self._raise_exceptions = raise_exceptions - self._read_timeout_seconds = read_timeout_seconds - self._sampling_callback = sampling_callback - self._list_roots_callback = list_roots_callback - self._logging_callback = logging_callback - self._message_handler = message_handler - self._client_info = client_info - self._elicitation_callback = elicitation_callback - - self._session: ClientSession | None = None - self._exit_stack: AsyncExitStack | None = None + _: KW_ONLY + + # TODO(Marcelo): When do `raise_exceptions=True` actually raises? + raise_exceptions: bool = False + """Whether to raise exceptions from the server.""" + + read_timeout_seconds: float | None = None + """Timeout for read operations.""" + + sampling_callback: SamplingFnT | None = None + """Callback for handling sampling requests.""" + + list_roots_callback: ListRootsFnT | None = None + """Callback for handling list roots requests.""" + + logging_callback: LoggingFnT | None = None + """Callback for handling logging notifications.""" + + # TODO(Marcelo): Why do we have both "callback" and "handler"? + message_handler: MessageHandlerFnT | None = None + """Callback for handling raw messages.""" + + client_info: Implementation | None = None + """Client implementation info to send to server.""" + + elicitation_callback: ElicitationFnT | None = None + """Callback for handling elicitation requests.""" + + _session: ClientSession | None = None + _exit_stack: AsyncExitStack | None = None + _transport: Transport = field(init=False) + + def __post_init__(self) -> None: + if isinstance(self.server, Server | MCPServer): + self._transport = InMemoryTransport(self.server, raise_exceptions=self.raise_exceptions) + elif isinstance(self.server, str): + self._transport = streamable_http_client(self.server) + else: + self._transport = self.server async def __aenter__(self) -> Client: """Enter the async context manager.""" @@ -108,26 +113,25 @@ async def __aenter__(self) -> Client: raise RuntimeError("Client is already entered; cannot reenter") async with AsyncExitStack() as exit_stack: - # Create transport and connect - transport = InMemoryTransport(self._server, raise_exceptions=self._raise_exceptions) - read_stream, write_stream = await exit_stack.enter_async_context(transport.connect()) + # Transports may return additional values (e.g., streamable_http_client returns a 3-tuple) + # We only need the first two elements (read_stream, write_stream) + streams = await exit_stack.enter_async_context(self._transport) + read_stream, write_stream = streams[0], streams[1] - # Create session self._session = await exit_stack.enter_async_context( ClientSession( read_stream=read_stream, write_stream=write_stream, - read_timeout_seconds=self._read_timeout_seconds, - sampling_callback=self._sampling_callback, - list_roots_callback=self._list_roots_callback, - logging_callback=self._logging_callback, - message_handler=self._message_handler, - client_info=self._client_info, - elicitation_callback=self._elicitation_callback, + read_timeout_seconds=self.read_timeout_seconds, + sampling_callback=self.sampling_callback, + list_roots_callback=self.list_roots_callback, + logging_callback=self.logging_callback, + message_handler=self.message_handler, + client_info=self.client_info, + elicitation_callback=self.elicitation_callback, ) ) - # Initialize the session await self._session.initialize() # Transfer ownership to self for __aexit__ to handle diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index 9b0f80a44..f4e6293b7 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -296,7 +296,7 @@ async def _establish_session( http_client=httpx_client, terminate_on_close=server_params.terminate_on_close, ) - read, write, _ = await session_stack.enter_async_context(client) + read, write = await session_stack.enter_async_context(client) session = await session_stack.enter_async_context( mcp.ClientSession( diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index f6d164574..119bf7d02 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -14,6 +14,7 @@ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse +from mcp.client._transport import TransportStreams from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( @@ -200,8 +201,8 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer: # Stream ended normally (server closed) - reset attempt counter attempt = 0 - except Exception as exc: # pragma: lax no cover - logger.debug(f"GET stream error: {exc}") + except Exception: # pragma: lax no cover + logger.debug("GET stream error", exc_info=True) attempt += 1 if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover @@ -493,20 +494,16 @@ def get_session_id(self) -> str | None: return self.session_id +# TODO(Marcelo): I've dropped the `get_session_id` callback because it breaks the Transport protocol. Is that needed? +# It's a completely wrong abstraction, so removal is a good idea. But if we need the client to find the session ID, +# we should think about a better way to do it. I believe we can achieve it with other means. @asynccontextmanager async def streamable_http_client( url: str, *, http_client: httpx.AsyncClient | None = None, terminate_on_close: bool = True, -) -> AsyncGenerator[ - tuple[ - MemoryObjectReceiveStream[SessionMessage | Exception], - MemoryObjectSendStream[SessionMessage], - GetSessionIdCallback, - ], - None, -]: +) -> AsyncGenerator[TransportStreams, None]: """Client transport for StreamableHTTP. Args: @@ -520,7 +517,6 @@ async def streamable_http_client( Tuple containing: - read_stream: Stream for reading messages from the server - write_stream: Stream for sending messages to the server - - get_session_id_callback: Function to retrieve the current session ID Example: See examples/snippets/clients/ for usage patterns. @@ -561,7 +557,7 @@ def start_get_stream() -> None: ) try: - yield (read_stream, write_stream, transport.get_session_id) + yield read_stream, write_stream finally: if transport.session_id and terminate_on_close: await transport.terminate_session(client) diff --git a/src/mcp/shared/_httpx_utils.py b/src/mcp/shared/_httpx_utils.py index 945ef8095..8cf7bda2a 100644 --- a/src/mcp/shared/_httpx_utils.py +++ b/src/mcp/shared/_httpx_utils.py @@ -66,9 +66,7 @@ def create_mcp_http_client( response = await client.get("/protected-endpoint") """ # Set MCP defaults - kwargs: dict[str, Any] = { - "follow_redirects": True, - } + kwargs: dict[str, Any] = {"follow_redirects": True} # Handle timeout if timeout is None: diff --git a/tests/client/test_http_unicode.py b/tests/client/test_http_unicode.py index f368c3018..fb4ad9408 100644 --- a/tests/client/test_http_unicode.py +++ b/tests/client/test_http_unicode.py @@ -173,7 +173,7 @@ async def test_streamable_http_client_unicode_tool_call(running_unicode_server: base_url = running_unicode_server endpoint_url = f"{base_url}/mcp" - async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id): + async with streamable_http_client(endpoint_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -205,7 +205,7 @@ async def test_streamable_http_client_unicode_prompts(running_unicode_server: st base_url = running_unicode_server endpoint_url = f"{base_url}/mcp" - async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id): + async with streamable_http_client(endpoint_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() diff --git a/tests/client/test_notification_response.py b/tests/client/test_notification_response.py index 83d977097..b36f2fe34 100644 --- a/tests/client/test_notification_response.py +++ b/tests/client/test_notification_response.py @@ -125,12 +125,8 @@ async def message_handler( # pragma: no cover if isinstance(message, Exception): returned_exception = message - async with streamable_http_client(server_url) as (read_stream, write_stream, _): - async with ClientSession( - read_stream, - write_stream, - message_handler=message_handler, - ) as session: + async with streamable_http_client(server_url) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: # Initialize should work normally await session.initialize() diff --git a/tests/client/transports/test_memory.py b/tests/client/transports/test_memory.py index 0336aea0a..30ecb0ac3 100644 --- a/tests/client/transports/test_memory.py +++ b/tests/client/transports/test_memory.py @@ -53,7 +53,7 @@ def test_resource() -> str: # pragma: no cover async def test_with_server(simple_server: Server): """Test creating transport with a Server instance.""" transport = InMemoryTransport(simple_server) - async with transport.connect() as (read_stream, write_stream): + async with transport as (read_stream, write_stream): assert read_stream is not None assert write_stream is not None @@ -61,7 +61,7 @@ async def test_with_server(simple_server: Server): async def test_with_mcpserver(mcpserver_server: MCPServer): """Test creating transport with an MCPServer instance.""" transport = InMemoryTransport(mcpserver_server) - async with transport.connect() as (read_stream, write_stream): + async with transport as (read_stream, write_stream): assert read_stream is not None assert write_stream is not None @@ -93,5 +93,5 @@ async def test_call_tool(mcpserver_server: MCPServer): async def test_raise_exceptions(mcpserver_server: MCPServer): """Test that raise_exceptions parameter is passed through.""" transport = InMemoryTransport(mcpserver_server, raise_exceptions=True) - async with transport.connect() as (read_stream, _write_stream): + async with transport as (read_stream, _write_stream): assert read_stream is not None diff --git a/tests/server/mcpserver/test_integration.py b/tests/server/mcpserver/test_integration.py index 4d624c68e..132427e5e 100644 --- a/tests/server/mcpserver/test_integration.py +++ b/tests/server/mcpserver/test_integration.py @@ -16,7 +16,6 @@ import pytest import uvicorn -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from inline_snapshot import snapshot from examples.snippets.servers import ( @@ -33,9 +32,8 @@ ) from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import GetSessionIdCallback, streamable_http_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.context import RequestContext -from mcp.shared.message import SessionMessage from mcp.shared.session import RequestResponder from mcp.types import ( ClientResult, @@ -185,32 +183,6 @@ def create_client_for_transport(transport: str, server_url: str): raise ValueError(f"Invalid transport: {transport}") -def unpack_streams( - client_streams: tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]] - | tuple[ - MemoryObjectReceiveStream[SessionMessage | Exception], - MemoryObjectSendStream[SessionMessage], - GetSessionIdCallback, - ], -): - """Unpack client streams handling different return values from SSE vs StreamableHTTP. - - SSE client returns (read_stream, write_stream) - StreamableHTTP client returns (read_stream, write_stream, session_id_callback) - - Args: - client_streams: Tuple from client context manager - - Returns: - Tuple of (read_stream, write_stream) - """ - if len(client_streams) == 2: - return client_streams - else: - read_stream, write_stream, _ = client_streams - return read_stream, write_stream - - # Callback functions for testing async def sampling_callback( context: RequestContext[ClientSession, None], params: CreateMessageRequestParams @@ -253,8 +225,7 @@ async def test_basic_tools(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() @@ -290,8 +261,7 @@ async def test_basic_resources(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() @@ -331,8 +301,7 @@ async def test_basic_prompts(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() @@ -391,8 +360,7 @@ async def message_handler(message: RequestResponder[ServerRequest, ClientResult] client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: # Test initialization result = await session.initialize() @@ -441,8 +409,7 @@ async def test_sampling(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, sampling_callback=sampling_callback) as session: # Test initialization result = await session.initialize() @@ -472,8 +439,7 @@ async def test_elicitation(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, elicitation_callback=elicitation_callback) as session: # Test initialization result = await session.initialize() @@ -529,8 +495,7 @@ async def message_handler(message: RequestResponder[ServerRequest, ClientResult] client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: # Test initialization result = await session.initialize() @@ -570,8 +535,7 @@ async def test_completion(server_transport: str, server_url: str) -> None: transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() @@ -623,8 +587,7 @@ async def test_mcpserver_quickstart(server_transport: str, server_url: str) -> N transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() @@ -659,8 +622,7 @@ async def test_structured_output(server_transport: str, server_url: str) -> None transport = server_transport client_cm = create_client_for_transport(transport, server_url) - async with client_cm as client_streams: - read_stream, write_stream = unpack_streams(client_streams) + async with client_cm as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index cd02cacdb..70a9fca40 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -43,7 +43,11 @@ ) from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings -from mcp.shared._httpx_utils import create_mcp_http_client +from mcp.shared._httpx_utils import ( + MCP_DEFAULT_SSE_READ_TIMEOUT, + MCP_DEFAULT_TIMEOUT, + create_mcp_http_client, +) from mcp.shared.context import RequestContext from mcp.shared.message import ClientMessageMetadata, ServerMessageMetadata, SessionMessage from mcp.shared.session import RequestResponder @@ -965,15 +969,8 @@ async def http_client(basic_server: None, basic_server_url: str): # pragma: no @pytest.fixture async def initialized_client_session(basic_server: None, basic_server_url: str): """Create initialized StreamableHTTP client session.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - ) as session: + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: await session.initialize() yield session @@ -981,11 +978,8 @@ async def initialized_client_session(basic_server: None, basic_server_url: str): @pytest.mark.anyio async def test_streamable_http_client_basic_connection(basic_server: None, basic_server_url: str): """Test basic client connection with initialization.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, _): - async with ClientSession( - read_stream, - write_stream, - ) as session: + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: # Test initialization result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1029,15 +1023,8 @@ async def test_streamable_http_client_error_handling(initialized_client_session: @pytest.mark.anyio async def test_streamable_http_client_session_persistence(basic_server: None, basic_server_url: str): """Test that session ID persists across requests.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - ) as session: + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: # Initialize the session result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1057,7 +1044,7 @@ async def test_streamable_http_client_session_persistence(basic_server: None, ba @pytest.mark.anyio async def test_streamable_http_client_json_response(json_response_server: None, json_server_url: str): """Test client with JSON response mode.""" - async with streamable_http_client(f"{json_server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{json_server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize the session result = await session.initialize() @@ -1087,7 +1074,7 @@ async def message_handler( # pragma: no branch if isinstance(message, types.ServerNotification): # pragma: no branch notifications_received.append(message) - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: # Initialize the session - this triggers the GET stream setup result = await session.initialize() @@ -1109,34 +1096,54 @@ async def message_handler( # pragma: no branch assert resource_update_found, "ResourceUpdatedNotification not received via GET stream" +def create_session_id_capturing_client() -> tuple[httpx.AsyncClient, list[str]]: + """Create an httpx client that captures the session ID from responses.""" + captured_ids: list[str] = [] + + async def capture_session_id(response: httpx.Response) -> None: + session_id = response.headers.get(MCP_SESSION_ID_HEADER) + if session_id: + captured_ids.append(session_id) + + client = httpx.AsyncClient( + follow_redirects=True, + timeout=httpx.Timeout(MCP_DEFAULT_TIMEOUT, read=MCP_DEFAULT_SSE_READ_TIMEOUT), + event_hooks={"response": [capture_session_id]}, + ) + return client, captured_ids + + @pytest.mark.anyio async def test_streamable_http_client_session_termination(basic_server: None, basic_server_url: str): """Test client session termination functionality.""" + # Use httpx client with event hooks to capture session ID + httpx_client, captured_ids = create_session_id_capturing_client() - captured_session_id = None - - # Create the streamable_http_client with a custom httpx client to capture headers - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, get_session_id): - async with ClientSession(read_stream, write_stream) as session: - # Initialize the session - result = await session.initialize() - assert isinstance(result, InitializeResult) - captured_session_id = get_session_id() - assert captured_session_id is not None + async with httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + read_stream, + write_stream, + ): + async with ClientSession(read_stream, write_stream) as session: + # Initialize the session + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert len(captured_ids) > 0 + captured_session_id = captured_ids[0] + assert captured_session_id is not None - # Make a request to confirm session is working - tools = await session.list_tools() - assert len(tools.tools) == 10 + # Make a request to confirm session is working + tools = await session.list_tools() + assert len(tools.tools) == 10 headers: dict[str, str] = {} # pragma: lax no cover if captured_session_id: # pragma: lax no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with create_mcp_http_client(headers=headers) as httpx_client: - async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + async with create_mcp_http_client(headers=headers) as httpx_client2: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( read_stream, write_stream, - _, ): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Attempt to make a request after termination @@ -1173,30 +1180,34 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt # Apply the patch to the httpx client monkeypatch.setattr(httpx.AsyncClient, "delete", mock_delete) - captured_session_id = None + # Use httpx client with event hooks to capture session ID + httpx_client, captured_ids = create_session_id_capturing_client() - # Create the streamable_http_client with a custom httpx client to capture headers - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, get_session_id): - async with ClientSession(read_stream, write_stream) as session: - # Initialize the session - result = await session.initialize() - assert isinstance(result, InitializeResult) - captured_session_id = get_session_id() - assert captured_session_id is not None + async with httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + read_stream, + write_stream, + ): + async with ClientSession(read_stream, write_stream) as session: + # Initialize the session + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert len(captured_ids) > 0 + captured_session_id = captured_ids[0] + assert captured_session_id is not None - # Make a request to confirm session is working - tools = await session.list_tools() - assert len(tools.tools) == 10 + # Make a request to confirm session is working + tools = await session.list_tools() + assert len(tools.tools) == 10 headers: dict[str, str] = {} # pragma: lax no cover if captured_session_id: # pragma: lax no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with create_mcp_http_client(headers=headers) as httpx_client: - async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + async with create_mcp_http_client(headers=headers) as httpx_client2: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( read_stream, write_stream, - _, ): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Attempt to make a request after termination @@ -1210,10 +1221,9 @@ async def test_streamable_http_client_resumption(event_server: tuple[SimpleEvent _, server_url = event_server # Variables to track the state - captured_session_id = None - captured_resumption_token = None + captured_resumption_token: str | None = None captured_notifications: list[types.ServerNotification] = [] - captured_protocol_version = None + captured_protocol_version: str | int | None = None first_notification_received = False async def message_handler( # pragma: no branch @@ -1231,50 +1241,56 @@ async def on_resumption_token_update(token: str) -> None: nonlocal captured_resumption_token captured_resumption_token = token - # First, start the client session and begin the tool that waits on lock - async with streamable_http_client(f"{server_url}/mcp", terminate_on_close=False) as ( - read_stream, - write_stream, - get_session_id, - ): - async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: - # Initialize the session - result = await session.initialize() - assert isinstance(result, InitializeResult) - captured_session_id = get_session_id() - assert captured_session_id is not None - # Capture the negotiated protocol version - captured_protocol_version = result.protocol_version - - # Start the tool that will wait on lock in a task - async with anyio.create_task_group() as tg: # pragma: no branch + # Use httpx client with event hooks to capture session ID + httpx_client, captured_ids = create_session_id_capturing_client() - async def run_tool(): - metadata = ClientMessageMetadata( - on_resumption_token_update=on_resumption_token_update, - ) - await session.send_request( - types.CallToolRequest( - params=types.CallToolRequestParams(name="wait_for_lock_with_notification", arguments={}), - ), - types.CallToolResult, - metadata=metadata, - ) + # First, start the client session and begin the tool that waits on lock + async with httpx_client: + async with streamable_http_client(f"{server_url}/mcp", terminate_on_close=False, http_client=httpx_client) as ( + read_stream, + write_stream, + ): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: + # Initialize the session + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert len(captured_ids) > 0 + captured_session_id = captured_ids[0] + assert captured_session_id is not None + # Capture the negotiated protocol version + captured_protocol_version = result.protocol_version + + # Start the tool that will wait on lock in a task + async with anyio.create_task_group() as tg: # pragma: no branch + + async def run_tool(): + metadata = ClientMessageMetadata( + on_resumption_token_update=on_resumption_token_update, + ) + await session.send_request( + types.CallToolRequest( + params=types.CallToolRequestParams( + name="wait_for_lock_with_notification", arguments={} + ), + ), + types.CallToolResult, + metadata=metadata, + ) - tg.start_soon(run_tool) + tg.start_soon(run_tool) - # Wait for the first notification and resumption token - while not first_notification_received or not captured_resumption_token: - await anyio.sleep(0.1) + # Wait for the first notification and resumption token + while not first_notification_received or not captured_resumption_token: + await anyio.sleep(0.1) - # Kill the client session while tool is waiting on lock - tg.cancel_scope.cancel() + # Kill the client session while tool is waiting on lock + tg.cancel_scope.cancel() - # Verify we received exactly one notification (inside ClientSession - # so coverage tracks these on Python 3.11, see PR #1897 for details) - assert len(captured_notifications) == 1 # pragma: lax no cover - assert isinstance(captured_notifications[0], types.LoggingMessageNotification) # pragma: lax no cover - assert captured_notifications[0].params.data == "First notification before lock" # pragma: lax no cover + # Verify we received exactly one notification (inside ClientSession + # so coverage tracks these on Python 3.11, see PR #1897 for details) + assert len(captured_notifications) == 1 # pragma: lax no cover + assert isinstance(captured_notifications[0], types.LoggingMessageNotification) # pragma: lax no cover + assert captured_notifications[0].params.data == "First notification before lock" # pragma: lax no cover # Clear notifications and set up headers for phase 2 (between connections, # not tracked by coverage on Python 3.11 due to cancel scope + sys.settrace bug) @@ -1286,11 +1302,10 @@ async def run_tool(): MCP_PROTOCOL_VERSION_HEADER: captured_protocol_version, } - async with create_mcp_http_client(headers=headers) as httpx_client: - async with streamable_http_client(f"{server_url}/mcp", http_client=httpx_client) as ( + async with create_mcp_http_client(headers=headers) as httpx_client2: + async with streamable_http_client(f"{server_url}/mcp", http_client=httpx_client2) as ( read_stream, write_stream, - _, ): async with ClientSession( read_stream, write_stream, message_handler=message_handler @@ -1349,16 +1364,8 @@ async def sampling_callback( ) # Create client with sampling callback - async with streamable_http_client(f"{basic_server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - sampling_callback=sampling_callback, - ) as session: + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, sampling_callback=sampling_callback) as session: # Initialize the session result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1498,7 +1505,6 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( read_stream, write_stream, - _, ): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch result = await session.initialize() @@ -1536,7 +1542,6 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( read_stream, write_stream, - _, ): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() @@ -1561,11 +1566,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No @pytest.mark.anyio async def test_client_includes_protocol_version_header_after_init(context_aware_server: None, basic_server_url: str): """Test that client includes mcp-protocol-version header after initialization.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize and get the negotiated version init_result = await session.initialize() @@ -1677,7 +1678,7 @@ async def test_client_crash_handled(basic_server: None, basic_server_url: str): # Simulate bad client that crashes after init async def bad_client(): """Client that triggers ClosedResourceError""" - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() raise Exception("client crash") @@ -1691,7 +1692,7 @@ async def bad_client(): await anyio.sleep(0.1) # Try a good client, it should still be able to connect and list tools - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1847,11 +1848,7 @@ async def test_streamable_http_client_receives_priming_event( async def on_resumption_token_update(token: str) -> None: captured_resumption_tokens.append(token) - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -1884,11 +1881,7 @@ async def test_server_close_sse_stream_via_context( """Server tool can call ctx.close_sse_stream() to close connection.""" _, server_url = event_server - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -1921,16 +1914,8 @@ async def message_handler( if isinstance(message, types.LoggingMessageNotification): # pragma: no branch captured_notifications.append(str(message.params.data)) - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - message_handler=message_handler, - ) as session: + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: await session.initialize() # Call tool that: @@ -1956,11 +1941,7 @@ async def test_streamable_http_client_respects_retry_interval( """Client MUST respect retry field, waiting specified ms before reconnecting.""" _, server_url = event_server - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -1997,16 +1978,8 @@ async def message_handler( if isinstance(message, types.LoggingMessageNotification): # pragma: no branch all_notifications.append(str(message.params.data)) - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - message_handler=message_handler, - ) as session: + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: await session.initialize() # Call tool that simulates polling pattern: @@ -2045,16 +2018,8 @@ async def message_handler( if isinstance(message, types.LoggingMessageNotification): # pragma: no branch notification_data.append(str(message.params.data)) - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - message_handler=message_handler, - ) as session: + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: await session.initialize() # Tool sends: notification1, close_stream, notification2, notification3, response @@ -2097,7 +2062,7 @@ async def test_streamable_http_multiple_reconnections( async def on_resumption_token(token: str) -> None: resumption_tokens.append(token) - async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -2128,9 +2093,7 @@ async def on_resumption_token(token: str) -> None: @pytest.mark.anyio -async def test_standalone_get_stream_reconnection( - event_server: tuple[SimpleEventStore, str], -) -> None: +async def test_standalone_get_stream_reconnection(event_server: tuple[SimpleEventStore, str]) -> None: """Test that standalone GET stream automatically reconnects after server closes it. Verifies: @@ -2154,16 +2117,8 @@ async def message_handler( if isinstance(message, types.ResourceUpdatedNotification): # pragma: no branch received_notifications.append(str(message.params.uri)) - async with streamable_http_client(f"{server_url}/mcp") as ( - read_stream, - write_stream, - _, - ): - async with ClientSession( - read_stream, - write_stream, - message_handler=message_handler, - ) as session: + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: await session.initialize() # Call tool that: @@ -2203,7 +2158,6 @@ async def test_streamable_http_client_does_not_mutate_provided_client( async with streamable_http_client(f"{basic_server_url}/mcp", http_client=custom_client) as ( read_stream, write_stream, - _, ): async with ClientSession(read_stream, write_stream) as session: result = await session.initialize() @@ -2233,11 +2187,7 @@ async def test_streamable_http_client_mcp_headers_override_defaults( # Verify client has default accept header assert client.headers.get("accept") == "*/*" - async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() @@ -2268,11 +2218,7 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( } async with httpx.AsyncClient(headers=custom_headers, follow_redirects=True) as client: - async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() From 77d0af00409e4cc976611e6c8989ad499b13f8fb Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 01:03:07 +0100 Subject: [PATCH 2/8] Add migration note --- docs/migration.md | 50 +++++++++++++++++++++++++++++- tests/client/test_session_group.py | 16 ++-------- 2 files changed, 52 insertions(+), 14 deletions(-) diff --git a/docs/migration.md b/docs/migration.md index 6380f5487..b941fb5a1 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -44,10 +44,58 @@ async with http_client: async with streamable_http_client( url="http://localhost:8000/mcp", http_client=http_client, - ) as (read_stream, write_stream, get_session_id): + ) as (read_stream, write_stream): ... ``` +### `get_session_id` callback removed from `streamable_http_client` + +The `get_session_id` callback (third element of the returned tuple) has been removed from `streamable_http_client`. The function now returns a 2-tuple `(read_stream, write_stream)` instead of a 3-tuple. + +If you need to capture the session ID (e.g., for session resumption testing), you can use httpx event hooks to capture it from the response headers: + +**Before (v1):** + +```python +from mcp.client.streamable_http import streamable_http_client + +async with streamable_http_client(url) as (read_stream, write_stream, get_session_id): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + session_id = get_session_id() # Get session ID via callback +``` + +**After (v2):** + +```python +import httpx +from mcp.client.streamable_http import streamable_http_client + +# Option 1: Simply ignore if you don't need the session ID +async with streamable_http_client(url) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + +# Option 2: Capture session ID via httpx event hooks if needed +captured_session_ids: list[str] = [] + +async def capture_session_id(response: httpx.Response) -> None: + session_id = response.headers.get("mcp-session-id") + if session_id: + captured_session_ids.append(session_id) + +http_client = httpx.AsyncClient( + event_hooks={"response": [capture_session_id]}, + follow_redirects=True, +) + +async with http_client: + async with streamable_http_client(url, http_client=http_client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + session_id = captured_session_ids[0] if captured_session_ids else None +``` + ### `StreamableHTTPTransport` parameters removed The `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters have been removed from `StreamableHTTPTransport`. Configure these on the `httpx.AsyncClient` instead (see example above). diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index b480a0cb1..6a58b39f3 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -278,6 +278,7 @@ async def test_client_session_group_disconnect_non_existent_server(): await group.disconnect_from_server(session) +# TODO(Marcelo): This is horrible. We should drop this test. @pytest.mark.anyio @pytest.mark.parametrize( "server_params_instance, client_type_name, patch_target_for_client_func", @@ -310,19 +311,8 @@ async def test_client_session_group_establish_session_parameterized( mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read") mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write") - # streamable_http_client's __aenter__ returns three values - if client_type_name == "streamablehttp": - mock_extra_stream_val = mock.AsyncMock(name="StreamableExtra") - mock_client_cm_instance.__aenter__.return_value = ( - mock_read_stream, - mock_write_stream, - mock_extra_stream_val, - ) - else: - mock_client_cm_instance.__aenter__.return_value = ( - mock_read_stream, - mock_write_stream, - ) + # All client context managers return (read_stream, write_stream) + mock_client_cm_instance.__aenter__.return_value = (mock_read_stream, mock_write_stream) mock_client_cm_instance.__aexit__ = mock.AsyncMock(return_value=None) mock_specific_client_func.return_value = mock_client_cm_instance From 1a577a4eb5da7fcdb92ce8b28ec7fa8d8a9be7e1 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 10:09:43 +0100 Subject: [PATCH 3/8] Proper tests --- CLAUDE.md | 2 +- src/mcp/client/_memory.py | 2 +- src/mcp/client/streamable_http.py | 3 ++- tests/client/test_client.py | 21 +++++++++++++++++++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 93ddf44e9..a4ef16e42 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -17,7 +17,7 @@ This document contains critical information about working with this codebase. Fo - Functions must be focused and small - Follow existing patterns exactly - Line length: 120 chars maximum - - FORBIDDEN: imports inside functions + - FORBIDDEN: imports inside functions. THEY SHOULD BE AT THE TOP OF THE FILE. 3. Testing Requirements - Framework: `uv run --frozen pytest` diff --git a/src/mcp/client/_memory.py b/src/mcp/client/_memory.py index 27b326e4f..e1da4ab61 100644 --- a/src/mcp/client/_memory.py +++ b/src/mcp/client/_memory.py @@ -86,6 +86,6 @@ async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: """Close the transport and stop the server.""" - if self._cm is not None: + if self._cm is not None: # pragma: no branch await self._cm.__aexit__(exc_type, exc_val, exc_tb) self._cm = None diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 119bf7d02..f3be208ca 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -489,9 +489,10 @@ async def terminate_session(self, client: httpx.AsyncClient) -> None: except Exception as exc: # pragma: no cover logger.warning(f"Session termination failed: {exc}") + # TODO(Marcelo): Check the TODO below, and cover this with tests if necessary. def get_session_id(self) -> str | None: """Get the current session ID.""" - return self.session_id + return self.session_id # pragma: no cover # TODO(Marcelo): I've dropped the `get_session_id` callback because it breaks the Transport protocol. Is that needed? diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 44d0a2037..3e6db423b 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -2,11 +2,14 @@ from __future__ import annotations +from unittest.mock import patch + import anyio import pytest from inline_snapshot import snapshot import mcp.types as types +from mcp.client._memory import InMemoryTransport from mcp.client.client import Client from mcp.server import Server from mcp.server.mcpserver import MCPServer @@ -285,3 +288,21 @@ async def test_complete_with_prompt_reference(simple_server: Server): ref = types.PromptReference(type="ref/prompt", name="test_prompt") result = await client.complete(ref=ref, argument={"name": "arg", "value": "test"}) assert result == snapshot(types.CompleteResult(completion=types.Completion(values=[]))) + + +def test_client_with_url_initializes_streamable_http_transport(): + with patch("mcp.client.client.streamable_http_client") as mock: + _ = Client("http://localhost:8000/mcp") + mock.assert_called_once_with("http://localhost:8000/mcp") + + +async def test_client_uses_transport_directly(app: MCPServer): + transport = InMemoryTransport(app) + async with Client(transport) as client: + result = await client.call_tool("greet", {"name": "Transport"}) + assert result == snapshot( + CallToolResult( + content=[TextContent(text="Hello, Transport!")], + structured_content={"result": "Hello, Transport!"}, + ) + ) From 3e9506d306f902a2b6e3bcc8bfafc8a0123f815c Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 10:12:16 +0100 Subject: [PATCH 4/8] fix conformance test --- .github/actions/conformance/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/actions/conformance/client.py b/.github/actions/conformance/client.py index 7ca88110a..3a2ac2802 100644 --- a/.github/actions/conformance/client.py +++ b/.github/actions/conformance/client.py @@ -156,7 +156,7 @@ async def handle_callback(self) -> tuple[str, str | None]: @register("initialize") async def run_initialize(server_url: str) -> None: """Connect, initialize, list tools, close.""" - async with streamable_http_client(url=server_url) as (read_stream, write_stream, _): + async with streamable_http_client(url=server_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() logger.debug("Initialized successfully") @@ -167,7 +167,7 @@ async def run_initialize(server_url: str) -> None: @register("tools_call") async def run_tools_call(server_url: str) -> None: """Connect, initialize, list tools, call add_numbers(a=5, b=3), close.""" - async with streamable_http_client(url=server_url) as (read_stream, write_stream, _): + async with streamable_http_client(url=server_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() await session.list_tools() @@ -178,7 +178,7 @@ async def run_tools_call(server_url: str) -> None: @register("sse-retry") async def run_sse_retry(server_url: str) -> None: """Connect, initialize, list tools, call test_reconnection, close.""" - async with streamable_http_client(url=server_url) as (read_stream, write_stream, _): + async with streamable_http_client(url=server_url) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() await session.list_tools() @@ -209,7 +209,7 @@ async def default_elicitation_callback( @register("elicitation-sep1034-client-defaults") async def run_elicitation_defaults(server_url: str) -> None: """Connect with elicitation callback that applies schema defaults.""" - async with streamable_http_client(url=server_url) as (read_stream, write_stream, _): + async with streamable_http_client(url=server_url) as (read_stream, write_stream): async with ClientSession( read_stream, write_stream, elicitation_callback=default_elicitation_callback ) as session: @@ -296,7 +296,7 @@ async def run_auth_code_client(server_url: str) -> None: async def _run_auth_session(server_url: str, oauth_auth: OAuthClientProvider) -> None: """Common session logic for all OAuth flows.""" client = httpx.AsyncClient(auth=oauth_auth, timeout=30.0) - async with streamable_http_client(url=server_url, http_client=client) as (read_stream, write_stream, _): + async with streamable_http_client(url=server_url, http_client=client) as (read_stream, write_stream): async with ClientSession( read_stream, write_stream, elicitation_callback=default_elicitation_callback ) as session: From c202c1205af6395ce45b3261d2fd19652ff3ce5f Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 10:13:08 +0100 Subject: [PATCH 5/8] updated readme --- README.v2.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/README.v2.md b/README.v2.md index 652f2e730..d34b7832b 100644 --- a/README.v2.md +++ b/README.v2.md @@ -2215,11 +2215,7 @@ from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamable_http_client("http://localhost:8000/mcp") as ( - read_stream, - write_stream, - _, - ): + async with streamable_http_client("http://localhost:8000/mcp") as (read_stream, write_stream): # Create a session using the client streams async with ClientSession(read_stream, write_stream) as session: # Initialize the connection @@ -2397,7 +2393,7 @@ async def main(): ) async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write): async with ClientSession(read, write) as session: await session.initialize() From 530f50cb26aa06c004dfc1a10854a9490c60bf1a Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 10:23:10 +0100 Subject: [PATCH 6/8] old readme --- README.md | 415 +++++++++++++++++++++++++----------------------------- 1 file changed, 188 insertions(+), 227 deletions(-) diff --git a/README.md b/README.md index b551ef6cd..0f0468a19 100644 --- a/README.md +++ b/README.md @@ -13,13 +13,12 @@ - - -> [!NOTE] -> **This README documents v1.x of the MCP Python SDK (the current stable release).** +> [!IMPORTANT] +> **This is the `main` branch which contains v2 of the SDK (currently in development, pre-alpha).** +> +> We anticipate a stable v2 release in Q1 2026. Until then, **v1.x remains the recommended version** for production use. v1.x will continue to receive bug fixes and security updates for at least 6 months after v2 ships to give people time to upgrade. > -> For v1.x code and documentation, see the [`v1.x` branch](https://github.com/modelcontextprotocol/python-sdk/tree/v1.x). -> For the upcoming v2 documentation (pre-alpha, in development on `main`), see [`README.v2.md`](README.v2.md). +> For v1 documentation and code, see the [`v1.x` branch](https://github.com/modelcontextprotocol/python-sdk/tree/v1.x). ## Table of Contents @@ -46,7 +45,7 @@ - [Sampling](#sampling) - [Logging and Notifications](#logging-and-notifications) - [Authentication](#authentication) - - [FastMCP Properties](#fastmcp-properties) + - [MCPServer Properties](#mcpserver-properties) - [Session Properties and Methods](#session-properties-and-methods) - [Request Context Properties](#request-context-properties) - [Running Your Server](#running-your-server) @@ -135,19 +134,18 @@ uv run mcp Let's create a simple MCP server that exposes a calculator tool and some data: - + ```python -""" -FastMCP quickstart example. +"""MCPServer quickstart example. Run from the repository root: - uv run examples/snippets/servers/fastmcp_quickstart.py + uv run examples/snippets/servers/mcpserver_quickstart.py """ -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create an MCP server -mcp = FastMCP("Demo", json_response=True) +mcp = MCPServer("Demo") # Add an addition tool @@ -179,16 +177,16 @@ def greet_user(name: str, style: str = "friendly") -> str: # Run with streamable HTTP transport if __name__ == "__main__": - mcp.run(transport="streamable-http") + mcp.run(transport="streamable-http", json_response=True) ``` -_Full example: [examples/snippets/servers/fastmcp_quickstart.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/fastmcp_quickstart.py)_ +_Full example: [examples/snippets/servers/mcpserver_quickstart.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/mcpserver_quickstart.py)_ You can install this server in [Claude Code](https://docs.claude.com/en/docs/claude-code/mcp) and interact with it right away. First, run the server: ```bash -uv run --with mcp examples/snippets/servers/fastmcp_quickstart.py +uv run --with mcp examples/snippets/servers/mcpserver_quickstart.py ``` Then add it to Claude Code: @@ -218,7 +216,7 @@ The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you bui ### Server -The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing: +The MCPServer server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing: ```python @@ -228,7 +226,7 @@ from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession @@ -258,7 +256,7 @@ class AppContext: @asynccontextmanager -async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: +async def app_lifespan(server: MCPServer) -> AsyncIterator[AppContext]: """Manage application lifecycle with type-safe context.""" # Initialize on startup db = await Database.connect() @@ -270,7 +268,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: # Pass lifespan to server -mcp = FastMCP("My App", lifespan=app_lifespan) +mcp = MCPServer("My App", lifespan=app_lifespan) # Access type-safe lifespan context in tools @@ -290,9 +288,9 @@ Resources are how you expose data to LLMs. They're similar to GET endpoints in a ```python -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -mcp = FastMCP(name="Resource Example") +mcp = MCPServer(name="Resource Example") @mcp.resource("file://documents/{name}") @@ -321,9 +319,9 @@ Tools let LLMs take actions through your server. Unlike resources, tools are exp ```python -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -mcp = FastMCP(name="Tool Example") +mcp = MCPServer(name="Tool Example") @mcp.tool() @@ -342,14 +340,14 @@ def get_weather(city: str, unit: str = "celsius") -> str: _Full example: [examples/snippets/servers/basic_tool.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/basic_tool.py)_ -Tools can optionally receive a Context object by including a parameter with the `Context` type annotation. This context is automatically injected by the FastMCP framework and provides access to MCP capabilities: +Tools can optionally receive a Context object by including a parameter with the `Context` type annotation. This context is automatically injected by the MCPServer framework and provides access to MCP capabilities: ```python -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession -mcp = FastMCP(name="Progress Example") +mcp = MCPServer(name="Progress Example") @mcp.tool() @@ -397,7 +395,7 @@ validated data that clients can easily process. **Note:** For backward compatibility, unstructured results are also returned. Unstructured results are provided for backward compatibility with previous versions of the MCP specification, and are quirks-compatible -with previous versions of FastMCP in the current version of the SDK. +with previous versions of MCPServer in the current version of the SDK. **Note:** In cases where a tool function's return type annotation causes the tool to be classified as structured _and this is undesirable_, @@ -416,10 +414,10 @@ from typing import Annotated from pydantic import BaseModel -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer from mcp.types import CallToolResult, TextContent -mcp = FastMCP("CallToolResult Example") +mcp = MCPServer("CallToolResult Example") class ValidationModel(BaseModel): @@ -443,7 +441,7 @@ def validated_tool() -> Annotated[CallToolResult, ValidationModel]: """Return CallToolResult with structured output validation.""" return CallToolResult( content=[TextContent(type="text", text="Validated response")], - structuredContent={"status": "success", "data": {"result": 42}}, + structured_content={"status": "success", "data": {"result": 42}}, _meta={"internal": "metadata"}, ) @@ -467,9 +465,9 @@ from typing import TypedDict from pydantic import BaseModel, Field -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -mcp = FastMCP("Structured Output Example") +mcp = MCPServer("Structured Output Example") # Using Pydantic models for rich structured data @@ -569,10 +567,10 @@ Prompts are reusable templates that help LLMs interact with your server effectiv ```python -from mcp.server.fastmcp import FastMCP -from mcp.server.fastmcp.prompts import base +from mcp.server.mcpserver import MCPServer +from mcp.server.mcpserver.prompts import base -mcp = FastMCP(name="Prompt Example") +mcp = MCPServer(name="Prompt Example") @mcp.prompt(title="Code Review") @@ -597,7 +595,7 @@ _Full example: [examples/snippets/servers/basic_prompt.py](https://github.com/mo MCP servers can provide icons for UI display. Icons can be added to the server implementation, tools, resources, and prompts: ```python -from mcp.server.fastmcp import FastMCP, Icon +from mcp.server.mcpserver import MCPServer, Icon # Create an icon from a file path or URL icon = Icon( @@ -607,7 +605,7 @@ icon = Icon( ) # Add icons to server -mcp = FastMCP( +mcp = MCPServer( "My Server", website_url="https://example.com", icons=[icon] @@ -625,21 +623,21 @@ def my_resource(): return "content" ``` -_Full example: [examples/fastmcp/icons_demo.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/fastmcp/icons_demo.py)_ +_Full example: [examples/mcpserver/icons_demo.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/mcpserver/icons_demo.py)_ ### Images -FastMCP provides an `Image` class that automatically handles image data: +MCPServer provides an `Image` class that automatically handles image data: ```python -"""Example showing image handling with FastMCP.""" +"""Example showing image handling with MCPServer.""" from PIL import Image as PILImage -from mcp.server.fastmcp import FastMCP, Image +from mcp.server.mcpserver import Image, MCPServer -mcp = FastMCP("Image Example") +mcp = MCPServer("Image Example") @mcp.tool() @@ -662,9 +660,9 @@ The Context object is automatically injected into tool and resource functions th To use context in a tool or resource function, add a parameter with the `Context` type annotation: ```python -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer -mcp = FastMCP(name="Context Example") +mcp = MCPServer(name="Context Example") @mcp.tool() @@ -680,7 +678,7 @@ The Context object provides the following capabilities: - `ctx.request_id` - Unique ID for the current request - `ctx.client_id` - Client ID if available -- `ctx.fastmcp` - Access to the FastMCP server instance (see [FastMCP Properties](#fastmcp-properties)) +- `ctx.mcp_server` - Access to the MCPServer server instance (see [MCPServer Properties](#mcpserver-properties)) - `ctx.session` - Access to the underlying session for advanced communication (see [Session Properties and Methods](#session-properties-and-methods)) - `ctx.request_context` - Access to request-specific data and lifespan resources (see [Request Context Properties](#request-context-properties)) - `await ctx.debug(message)` - Send debug log message @@ -694,10 +692,10 @@ The Context object provides the following capabilities: ```python -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession -mcp = FastMCP(name="Progress Example") +mcp = MCPServer(name="Progress Example") @mcp.tool() @@ -728,9 +726,8 @@ Client usage: ```python -""" -cd to the `examples/snippets` directory and run: - uv run completion-client +"""cd to the `examples/snippets` directory and run: +uv run completion-client """ import asyncio @@ -758,8 +755,8 @@ async def run(): # List available resource templates templates = await session.list_resource_templates() print("Available resource templates:") - for template in templates.resourceTemplates: - print(f" - {template.uriTemplate}") + for template in templates.resource_templates: + print(f" - {template.uri_template}") # List available prompts prompts = await session.list_prompts() @@ -768,20 +765,20 @@ async def run(): print(f" - {prompt.name}") # Complete resource template arguments - if templates.resourceTemplates: - template = templates.resourceTemplates[0] - print(f"\nCompleting arguments for resource template: {template.uriTemplate}") + if templates.resource_templates: + template = templates.resource_templates[0] + print(f"\nCompleting arguments for resource template: {template.uri_template}") # Complete without context result = await session.complete( - ref=ResourceTemplateReference(type="ref/resource", uri=template.uriTemplate), + ref=ResourceTemplateReference(type="ref/resource", uri=template.uri_template), argument={"name": "owner", "value": "model"}, ) print(f"Completions for 'owner' starting with 'model': {result.completion.values}") # Complete with context - repo suggestions based on owner result = await session.complete( - ref=ResourceTemplateReference(type="ref/resource", uri=template.uriTemplate), + ref=ResourceTemplateReference(type="ref/resource", uri=template.uri_template), argument={"name": "repo", "value": ""}, context_arguments={"owner": "modelcontextprotocol"}, ) @@ -827,12 +824,12 @@ import uuid from pydantic import BaseModel, Field -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession from mcp.shared.exceptions import UrlElicitationRequiredError from mcp.types import ElicitRequestURLParams -mcp = FastMCP(name="Elicitation Example") +mcp = MCPServer(name="Elicitation Example") class BookingPreferences(BaseModel): @@ -911,7 +908,7 @@ async def connect_service(service_name: str, ctx: Context[ServerSession, None]) mode="url", message=f"Authorization required to connect to {service_name}", url=f"https://{service_name}.example.com/oauth/authorize?elicit={elicitation_id}", - elicitationId=elicitation_id, + elicitation_id=elicitation_id, ) ] ) @@ -934,11 +931,11 @@ Tools can interact with LLMs through sampling (generating text): ```python -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession from mcp.types import SamplingMessage, TextContent -mcp = FastMCP(name="Sampling Example") +mcp = MCPServer(name="Sampling Example") @mcp.tool() @@ -971,10 +968,10 @@ Tools can send logs and notifications through the context: ```python -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.mcpserver import Context, MCPServer from mcp.server.session import ServerSession -mcp = FastMCP(name="Notifications Example") +mcp = MCPServer(name="Notifications Example") @mcp.tool() @@ -1005,16 +1002,15 @@ MCP servers can use authentication by providing an implementation of the `TokenV ```python -""" -Run from the repository root: - uv run examples/snippets/servers/oauth_server.py +"""Run from the repository root: +uv run examples/snippets/servers/oauth_server.py """ from pydantic import AnyHttpUrl from mcp.server.auth.provider import AccessToken, TokenVerifier from mcp.server.auth.settings import AuthSettings -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer class SimpleTokenVerifier(TokenVerifier): @@ -1024,10 +1020,9 @@ class SimpleTokenVerifier(TokenVerifier): pass # This is where you would implement actual token validation -# Create FastMCP instance as a Resource Server -mcp = FastMCP( +# Create MCPServer instance as a Resource Server +mcp = MCPServer( "Weather Service", - json_response=True, # Token verifier for authentication token_verifier=SimpleTokenVerifier(), # Auth settings for RFC 9728 Protected Resource Metadata @@ -1051,7 +1046,7 @@ async def get_weather(city: str = "London") -> dict[str, str]: if __name__ == "__main__": - mcp.run(transport="streamable-http") + mcp.run(transport="streamable-http", json_response=True) ``` _Full example: [examples/snippets/servers/oauth_server.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/oauth_server.py)_ @@ -1067,19 +1062,19 @@ For a complete example with separate Authorization Server and Resource Server im See [TokenVerifier](src/mcp/server/auth/provider.py) for more details on implementing token validation. -### FastMCP Properties +### MCPServer Properties -The FastMCP server instance accessible via `ctx.fastmcp` provides access to server configuration and metadata: +The MCPServer server instance accessible via `ctx.mcp_server` provides access to server configuration and metadata: -- `ctx.fastmcp.name` - The server's name as defined during initialization -- `ctx.fastmcp.instructions` - Server instructions/description provided to clients -- `ctx.fastmcp.website_url` - Optional website URL for the server -- `ctx.fastmcp.icons` - Optional list of icons for UI display -- `ctx.fastmcp.settings` - Complete server configuration object containing: +- `ctx.mcp_server.name` - The server's name as defined during initialization +- `ctx.mcp_server.instructions` - Server instructions/description provided to clients +- `ctx.mcp_server.website_url` - Optional website URL for the server +- `ctx.mcp_server.icons` - Optional list of icons for UI display +- `ctx.mcp_server.settings` - Complete server configuration object containing: - `debug` - Debug mode flag - `log_level` - Current logging level - `host` and `port` - Server network configuration - - `mount_path`, `sse_path`, `streamable_http_path` - Transport paths + - `sse_path`, `streamable_http_path` - Transport paths - `stateless_http` - Whether the server operates in stateless mode - And other configuration options @@ -1088,12 +1083,12 @@ The FastMCP server instance accessible via `ctx.fastmcp` provides access to serv def server_info(ctx: Context) -> dict: """Get information about the current server.""" return { - "name": ctx.fastmcp.name, - "instructions": ctx.fastmcp.instructions, - "debug_mode": ctx.fastmcp.settings.debug, - "log_level": ctx.fastmcp.settings.log_level, - "host": ctx.fastmcp.settings.host, - "port": ctx.fastmcp.settings.port, + "name": ctx.mcp_server.name, + "instructions": ctx.mcp_server.instructions, + "debug_mode": ctx.mcp_server.settings.debug, + "log_level": ctx.mcp_server.settings.log_level, + "host": ctx.mcp_server.settings.host, + "port": ctx.mcp_server.settings.port, } ``` @@ -1208,9 +1203,9 @@ cd to the `examples/snippets` directory and run: python servers/direct_execution.py """ -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -mcp = FastMCP("My App") +mcp = MCPServer("My App") @mcp.tool() @@ -1239,7 +1234,7 @@ python servers/direct_execution.py uv run mcp run servers/direct_execution.py ``` -Note that `uv run mcp run` or `uv run mcp dev` only supports server using FastMCP and not the low-level server variant. +Note that `uv run mcp run` or `uv run mcp dev` only supports server using MCPServer and not the low-level server variant. ### Streamable HTTP Transport @@ -1247,22 +1242,13 @@ Note that `uv run mcp run` or `uv run mcp dev` only supports server using FastMC ```python +"""Run from the repository root: +uv run examples/snippets/servers/streamable_config.py """ -Run from the repository root: - uv run examples/snippets/servers/streamable_config.py -""" - -from mcp.server.fastmcp import FastMCP -# Stateless server with JSON responses (recommended) -mcp = FastMCP("StatelessServer", stateless_http=True, json_response=True) +from mcp.server.mcpserver import MCPServer -# Other configuration options: -# Stateless server with SSE streaming responses -# mcp = FastMCP("StatelessServer", stateless_http=True) - -# Stateful server with session persistence -# mcp = FastMCP("StatefulServer") +mcp = MCPServer("StatelessServer") # Add a simple tool to demonstrate the server @@ -1273,20 +1259,28 @@ def greet(name: str = "World") -> str: # Run server with streamable_http transport +# Transport-specific options (stateless_http, json_response) are passed to run() if __name__ == "__main__": - mcp.run(transport="streamable-http") + # Stateless server with JSON responses (recommended) + mcp.run(transport="streamable-http", stateless_http=True, json_response=True) + + # Other configuration options: + # Stateless server with SSE streaming responses + # mcp.run(transport="streamable-http", stateless_http=True) + + # Stateful server with session persistence + # mcp.run(transport="streamable-http") ``` _Full example: [examples/snippets/servers/streamable_config.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/streamable_config.py)_ -You can mount multiple FastMCP servers in a Starlette application: +You can mount multiple MCPServer servers in a Starlette application: ```python -""" -Run from the repository root: - uvicorn examples.snippets.servers.streamable_starlette_mount:app --reload +"""Run from the repository root: +uvicorn examples.snippets.servers.streamable_starlette_mount:app --reload """ import contextlib @@ -1294,10 +1288,10 @@ import contextlib from starlette.applications import Starlette from starlette.routing import Mount -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create the Echo server -echo_mcp = FastMCP(name="EchoServer", stateless_http=True, json_response=True) +echo_mcp = MCPServer(name="EchoServer") @echo_mcp.tool() @@ -1307,7 +1301,7 @@ def echo(message: str) -> str: # Create the Math server -math_mcp = FastMCP(name="MathServer", stateless_http=True, json_response=True) +math_mcp = MCPServer(name="MathServer") @math_mcp.tool() @@ -1328,16 +1322,16 @@ async def lifespan(app: Starlette): # Create the Starlette app and mount the MCP servers app = Starlette( routes=[ - Mount("/echo", echo_mcp.streamable_http_app()), - Mount("/math", math_mcp.streamable_http_app()), + Mount("/echo", echo_mcp.streamable_http_app(stateless_http=True, json_response=True)), + Mount("/math", math_mcp.streamable_http_app(stateless_http=True, json_response=True)), ], lifespan=lifespan, ) # Note: Clients connect to http://localhost:8000/echo/mcp and http://localhost:8000/math/mcp # To mount at the root of each path (e.g., /echo instead of /echo/mcp): -# echo_mcp.settings.streamable_http_path = "/" -# math_mcp.settings.streamable_http_path = "/" +# echo_mcp.streamable_http_app(streamable_http_path="/", stateless_http=True, json_response=True) +# math_mcp.streamable_http_app(streamable_http_path="/", stateless_http=True, json_response=True) ``` _Full example: [examples/snippets/servers/streamable_starlette_mount.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/streamable_starlette_mount.py)_ @@ -1395,8 +1389,7 @@ You can mount the StreamableHTTP server to an existing ASGI server using the `st ```python -""" -Basic example showing how to mount StreamableHTTP server in Starlette. +"""Basic example showing how to mount StreamableHTTP server in Starlette. Run from the repository root: uvicorn examples.snippets.servers.streamable_http_basic_mounting:app --reload @@ -1407,10 +1400,10 @@ import contextlib from starlette.applications import Starlette from starlette.routing import Mount -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create MCP server -mcp = FastMCP("My App", json_response=True) +mcp = MCPServer("My App") @mcp.tool() @@ -1427,9 +1420,10 @@ async def lifespan(app: Starlette): # Mount the StreamableHTTP server to the existing ASGI server +# Transport-specific options are passed to streamable_http_app() app = Starlette( routes=[ - Mount("/", app=mcp.streamable_http_app()), + Mount("/", app=mcp.streamable_http_app(json_response=True)), ], lifespan=lifespan, ) @@ -1442,8 +1436,7 @@ _Full example: [examples/snippets/servers/streamable_http_basic_mounting.py](htt ```python -""" -Example showing how to mount StreamableHTTP server using Host-based routing. +"""Example showing how to mount StreamableHTTP server using Host-based routing. Run from the repository root: uvicorn examples.snippets.servers.streamable_http_host_mounting:app --reload @@ -1454,10 +1447,10 @@ import contextlib from starlette.applications import Starlette from starlette.routing import Host -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create MCP server -mcp = FastMCP("MCP Host App", json_response=True) +mcp = MCPServer("MCP Host App") @mcp.tool() @@ -1474,9 +1467,10 @@ async def lifespan(app: Starlette): # Mount using Host-based routing +# Transport-specific options are passed to streamable_http_app() app = Starlette( routes=[ - Host("mcp.acme.corp", app=mcp.streamable_http_app()), + Host("mcp.acme.corp", app=mcp.streamable_http_app(json_response=True)), ], lifespan=lifespan, ) @@ -1489,8 +1483,7 @@ _Full example: [examples/snippets/servers/streamable_http_host_mounting.py](http ```python -""" -Example showing how to mount multiple StreamableHTTP servers with path configuration. +"""Example showing how to mount multiple StreamableHTTP servers with path configuration. Run from the repository root: uvicorn examples.snippets.servers.streamable_http_multiple_servers:app --reload @@ -1501,11 +1494,11 @@ import contextlib from starlette.applications import Starlette from starlette.routing import Mount -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create multiple MCP servers -api_mcp = FastMCP("API Server", json_response=True) -chat_mcp = FastMCP("Chat Server", json_response=True) +api_mcp = MCPServer("API Server") +chat_mcp = MCPServer("Chat Server") @api_mcp.tool() @@ -1520,12 +1513,6 @@ def send_message(message: str) -> str: return f"Message sent: {message}" -# Configure servers to mount at the root of each path -# This means endpoints will be at /api and /chat instead of /api/mcp and /chat/mcp -api_mcp.settings.streamable_http_path = "/" -chat_mcp.settings.streamable_http_path = "/" - - # Create a combined lifespan to manage both session managers @contextlib.asynccontextmanager async def lifespan(app: Starlette): @@ -1535,11 +1522,12 @@ async def lifespan(app: Starlette): yield -# Mount the servers +# Mount the servers with transport-specific options passed to streamable_http_app() +# streamable_http_path="/" means endpoints will be at /api and /chat instead of /api/mcp and /chat/mcp app = Starlette( routes=[ - Mount("/api", app=api_mcp.streamable_http_app()), - Mount("/chat", app=chat_mcp.streamable_http_app()), + Mount("/api", app=api_mcp.streamable_http_app(json_response=True, streamable_http_path="/")), + Mount("/chat", app=chat_mcp.streamable_http_app(json_response=True, streamable_http_path="/")), ], lifespan=lifespan, ) @@ -1552,8 +1540,7 @@ _Full example: [examples/snippets/servers/streamable_http_multiple_servers.py](h ```python -""" -Example showing path configuration during FastMCP initialization. +"""Example showing path configuration when mounting MCPServer. Run from the repository root: uvicorn examples.snippets.servers.streamable_http_path_config:app --reload @@ -1562,15 +1549,10 @@ Run from the repository root: from starlette.applications import Starlette from starlette.routing import Mount -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -# Configure streamable_http_path during initialization -# This server will mount at the root of wherever it's mounted -mcp_at_root = FastMCP( - "My Server", - json_response=True, - streamable_http_path="/", -) +# Create a simple MCPServer server +mcp_at_root = MCPServer("My Server") @mcp_at_root.tool() @@ -1579,10 +1561,14 @@ def process_data(data: str) -> str: return f"Processed: {data}" -# Mount at /process - endpoints will be at /process instead of /process/mcp +# Mount at /process with streamable_http_path="/" so the endpoint is /process (not /process/mcp) +# Transport-specific options like json_response are passed to streamable_http_app() app = Starlette( routes=[ - Mount("/process", app=mcp_at_root.streamable_http_app()), + Mount( + "/process", + app=mcp_at_root.streamable_http_app(json_response=True, streamable_http_path="/"), + ), ] ) ``` @@ -1599,10 +1585,10 @@ You can mount the SSE server to an existing ASGI server using the `sse_app` meth ```python from starlette.applications import Starlette from starlette.routing import Mount, Host -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer -mcp = FastMCP("My App") +mcp = MCPServer("My App") # Mount the SSE server to the existing ASGI server app = Starlette( @@ -1615,41 +1601,28 @@ app = Starlette( app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app())) ``` -When mounting multiple MCP servers under different paths, you can configure the mount path in several ways: +You can also mount multiple MCP servers at different sub-paths. The SSE transport automatically detects the mount path via ASGI's `root_path` mechanism, so message endpoints are correctly routed: ```python from starlette.applications import Starlette from starlette.routing import Mount -from mcp.server.fastmcp import FastMCP +from mcp.server.mcpserver import MCPServer # Create multiple MCP servers -github_mcp = FastMCP("GitHub API") -browser_mcp = FastMCP("Browser") -curl_mcp = FastMCP("Curl") -search_mcp = FastMCP("Search") - -# Method 1: Configure mount paths via settings (recommended for persistent configuration) -github_mcp.settings.mount_path = "/github" -browser_mcp.settings.mount_path = "/browser" - -# Method 2: Pass mount path directly to sse_app (preferred for ad-hoc mounting) -# This approach doesn't modify the server's settings permanently +github_mcp = MCPServer("GitHub API") +browser_mcp = MCPServer("Browser") +search_mcp = MCPServer("Search") -# Create Starlette app with multiple mounted servers +# Mount each server at its own sub-path +# The SSE transport automatically uses ASGI's root_path to construct +# the correct message endpoint (e.g., /github/messages/, /browser/messages/) app = Starlette( routes=[ - # Using settings-based configuration Mount("/github", app=github_mcp.sse_app()), Mount("/browser", app=browser_mcp.sse_app()), - # Using direct mount path parameter - Mount("/curl", app=curl_mcp.sse_app("/curl")), - Mount("/search", app=search_mcp.sse_app("/search")), + Mount("/search", app=search_mcp.sse_app()), ] ) - -# Method 3: For direct execution, you can also pass the mount path to run() -if __name__ == "__main__": - search_mcp.run(transport="sse", mount_path="/search") ``` For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes). @@ -1662,9 +1635,8 @@ For more control, you can use the low-level server implementation directly. This ```python -""" -Run from the repository root: - uv run examples/snippets/servers/lowlevel/lifespan.py +"""Run from the repository root: +uv run examples/snippets/servers/lowlevel/lifespan.py """ from collections.abc import AsyncIterator @@ -1720,7 +1692,7 @@ async def handle_list_tools() -> list[types.Tool]: types.Tool( name="query_db", description="Query the database", - inputSchema={ + input_schema={ "type": "object", "properties": {"query": {"type": "string", "description": "SQL query to execute"}}, "required": ["query"], @@ -1779,8 +1751,7 @@ The lifespan API provides: ```python -""" -Run from the repository root: +"""Run from the repository root: uv run examples/snippets/servers/lowlevel/basic.py """ @@ -1858,9 +1829,8 @@ The low-level server supports structured output for tools, allowing you to retur ```python -""" -Run from the repository root: - uv run examples/snippets/servers/lowlevel/structured_output.py +"""Run from the repository root: +uv run examples/snippets/servers/lowlevel/structured_output.py """ import asyncio @@ -1881,12 +1851,12 @@ async def list_tools() -> list[types.Tool]: types.Tool( name="get_weather", description="Get current weather for a city", - inputSchema={ + input_schema={ "type": "object", "properties": {"city": {"type": "string", "description": "City name"}}, "required": ["city"], }, - outputSchema={ + output_schema={ "type": "object", "properties": { "temperature": {"type": "number", "description": "Temperature in Celsius"}, @@ -1961,9 +1931,8 @@ For full control over the response including the `_meta` field (for passing data ```python -""" -Run from the repository root: - uv run examples/snippets/servers/lowlevel/direct_call_tool_result.py +"""Run from the repository root: +uv run examples/snippets/servers/lowlevel/direct_call_tool_result.py """ import asyncio @@ -1984,7 +1953,7 @@ async def list_tools() -> list[types.Tool]: types.Tool( name="advanced_tool", description="Tool with full control including _meta field", - inputSchema={ + input_schema={ "type": "object", "properties": {"message": {"type": "string"}}, "required": ["message"], @@ -2000,7 +1969,7 @@ async def handle_call_tool(name: str, arguments: dict[str, Any]) -> types.CallTo message = str(arguments.get("message", "")) return types.CallToolResult( content=[types.TextContent(type="text", text=f"Processed: {message}")], - structuredContent={"result": "success", "message": message}, + structured_content={"result": "success", "message": message}, _meta={"hidden": "data for client applications only"}, ) @@ -2041,11 +2010,7 @@ For servers that need to handle large datasets, the low-level server provides pa ```python -""" -Example of implementing pagination with MCP server decorators. -""" - -from pydantic import AnyUrl +"""Example of implementing pagination with MCP server decorators.""" import mcp.types as types from mcp.server.lowlevel import Server @@ -2071,14 +2036,14 @@ async def list_resources_paginated(request: types.ListResourcesRequest) -> types # Get page of resources page_items = [ - types.Resource(uri=AnyUrl(f"resource://items/{item}"), name=item, description=f"Description for {item}") + types.Resource(uri=f"resource://items/{item}", name=item, description=f"Description for {item}") for item in ITEMS[start:end] ] # Determine next cursor next_cursor = str(end) if end < len(ITEMS) else None - return types.ListResourcesResult(resources=page_items, nextCursor=next_cursor) + return types.ListResourcesResult(resources=page_items, next_cursor=next_cursor) ``` _Full example: [examples/snippets/servers/pagination_example.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/pagination_example.py)_ @@ -2088,9 +2053,7 @@ _Full example: [examples/snippets/servers/pagination_example.py](https://github. ```python -""" -Example of consuming paginated MCP endpoints from a client. -""" +"""Example of consuming paginated MCP endpoints from a client.""" import asyncio @@ -2119,8 +2082,8 @@ async def list_all_resources() -> None: print(f"Fetched {len(result.resources)} resources") # Check if there are more pages - if result.nextCursor: - cursor = result.nextCursor + if result.next_cursor: + cursor = result.next_cursor else: break @@ -2149,16 +2112,13 @@ The SDK provides a high-level client interface for connecting to MCP servers usi ```python -""" -cd to the `examples/snippets/clients` directory and run: - uv run client +"""cd to the `examples/snippets/clients` directory and run: +uv run client """ import asyncio import os -from pydantic import AnyUrl - from mcp import ClientSession, StdioServerParameters, types from mcp.client.stdio import stdio_client from mcp.shared.context import RequestContext @@ -2166,7 +2126,7 @@ from mcp.shared.context import RequestContext # Create server parameters for stdio connection server_params = StdioServerParameters( command="uv", # Using uv to run the server - args=["run", "server", "fastmcp_quickstart", "stdio"], # We're already in snippets dir + args=["run", "server", "mcpserver_quickstart", "stdio"], # We're already in snippets dir env={"UV_INDEX": os.environ.get("UV_INDEX", "")}, ) @@ -2183,7 +2143,7 @@ async def handle_sampling_message( text="Hello, world! from model", ), model="gpt-3.5-turbo", - stopReason="endTurn", + stop_reason="endTurn", ) @@ -2197,7 +2157,7 @@ async def run(): prompts = await session.list_prompts() print(f"Available prompts: {[p.name for p in prompts.prompts]}") - # Get a prompt (greet_user prompt from fastmcp_quickstart) + # Get a prompt (greet_user prompt from mcpserver_quickstart) if prompts.prompts: prompt = await session.get_prompt("greet_user", arguments={"name": "Alice", "style": "friendly"}) print(f"Prompt result: {prompt.messages[0].content}") @@ -2210,18 +2170,18 @@ async def run(): tools = await session.list_tools() print(f"Available tools: {[t.name for t in tools.tools]}") - # Read a resource (greeting resource from fastmcp_quickstart) - resource_content = await session.read_resource(AnyUrl("greeting://World")) + # Read a resource (greeting resource from mcpserver_quickstart) + resource_content = await session.read_resource("greeting://World") content_block = resource_content.contents[0] if isinstance(content_block, types.TextContent): print(f"Resource content: {content_block.text}") - # Call a tool (add tool from fastmcp_quickstart) + # Call a tool (add tool from mcpserver_quickstart) result = await session.call_tool("add", arguments={"a": 5, "b": 3}) result_unstructured = result.content[0] if isinstance(result_unstructured, types.TextContent): print(f"Tool result: {result_unstructured.text}") - result_structured = result.structuredContent + result_structured = result.structured_content print(f"Structured tool result: {result_structured}") @@ -2241,9 +2201,8 @@ Clients can also connect using [Streamable HTTP transport](https://modelcontextp ```python -""" -Run from the repository root: - uv run examples/snippets/clients/streamable_basic.py +"""Run from the repository root: +uv run examples/snippets/clients/streamable_basic.py """ import asyncio @@ -2254,7 +2213,11 @@ from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamable_http_client("http://localhost:8000/mcp") as (read_stream, write_stream): + async with streamable_http_client("http://localhost:8000/mcp") as ( + read_stream, + write_stream, + _, + ): # Create a session using the client streams async with ClientSession(read_stream, write_stream) as session: # Initialize the connection @@ -2277,9 +2240,8 @@ When building MCP clients, the SDK provides utilities to help display human-read ```python -""" -cd to the `examples/snippets` directory and run: - uv run display-utilities-client +"""cd to the `examples/snippets` directory and run: +uv run display-utilities-client """ import asyncio @@ -2292,7 +2254,7 @@ from mcp.shared.metadata_utils import get_display_name # Create server parameters for stdio connection server_params = StdioServerParameters( command="uv", # Using uv to run the server - args=["run", "server", "fastmcp_quickstart", "stdio"], + args=["run", "server", "mcpserver_quickstart", "stdio"], env={"UV_INDEX": os.environ.get("UV_INDEX", "")}, ) @@ -2318,7 +2280,7 @@ async def display_resources(session: ClientSession): print(f"Resource: {display_name} ({resource.uri})") templates_response = await session.list_resource_templates() - for template in templates_response.resourceTemplates: + for template in templates_response.resource_templates: display_name = get_display_name(template) print(f"Resource Template: {display_name}") @@ -2362,8 +2324,7 @@ The SDK includes [authorization support](https://modelcontextprotocol.io/specifi ```python -""" -Before running, specify running MCP RS server URL. +"""Before running, specify running MCP RS server URL. To spin up RS server locally, see examples/servers/simple-auth/README.md @@ -2434,7 +2395,7 @@ async def main(): ) async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write): + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() From 8fdfa697198cc524fc627f026b44c9284090c824 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 13:02:49 +0100 Subject: [PATCH 7/8] address comments --- src/mcp/client/client.py | 9 +++------ src/mcp/client/streamable_http.py | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/mcp/client/client.py b/src/mcp/client/client.py index 6add3ddb8..29d4a7035 100644 --- a/src/mcp/client/client.py +++ b/src/mcp/client/client.py @@ -95,8 +95,8 @@ async def main(): elicitation_callback: ElicitationFnT | None = None """Callback for handling elicitation requests.""" - _session: ClientSession | None = None - _exit_stack: AsyncExitStack | None = None + _session: ClientSession | None = field(init=False, default=None) + _exit_stack: AsyncExitStack | None = field(init=False, default=None) _transport: Transport = field(init=False) def __post_init__(self) -> None: @@ -113,10 +113,7 @@ async def __aenter__(self) -> Client: raise RuntimeError("Client is already entered; cannot reenter") async with AsyncExitStack() as exit_stack: - # Transports may return additional values (e.g., streamable_http_client returns a 3-tuple) - # We only need the first two elements (read_stream, write_stream) - streams = await exit_stack.enter_async_context(self._transport) - read_stream, write_stream = streams[0], streams[1] + read_stream, write_stream = await exit_stack.enter_async_context(self._transport) self._session = await exit_stack.enter_async_context( ClientSession( diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index f3be208ca..cbb611419 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -32,10 +32,10 @@ logger = logging.getLogger(__name__) +# TODO(Marcelo): Put the TransportStreams in a module under shared, so we can import here. SessionMessageOrError = SessionMessage | Exception StreamWriter = MemoryObjectSendStream[SessionMessageOrError] StreamReader = MemoryObjectReceiveStream[SessionMessage] -GetSessionIdCallback = Callable[[], str | None] MCP_SESSION_ID = "mcp-session-id" MCP_PROTOCOL_VERSION = "mcp-protocol-version" From afc2aef26d5b06975dcd5fb694ed3ae7d68a45e4 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 30 Jan 2026 13:03:51 +0100 Subject: [PATCH 8/8] Drop code example from InMemoryTransport --- src/mcp/client/_memory.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/mcp/client/_memory.py b/src/mcp/client/_memory.py index e1da4ab61..e6e938673 100644 --- a/src/mcp/client/_memory.py +++ b/src/mcp/client/_memory.py @@ -21,19 +21,6 @@ class InMemoryTransport: This transport starts the server in a background task and provides streams for client-side communication. The server is automatically stopped when the context manager exits. - - Example: - ```python - from mcp.client import Client, ClientSession - from mcp.server.mcpserver import MCPServer - from mcp.client.memory import InMemoryTransport - - server = MCPServer("test") - async with InMemoryTransport(server) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() - # Use the session... - ``` """ def __init__(self, server: Server[Any] | MCPServer, *, raise_exceptions: bool = False) -> None: