# streamable-mcp-client

Real-time streaming of MCP tools

Surface MCP “notifications/message” events while the tool is still running.

streamable-mcp-client extends the OpenAI Agents SDK to handle live notifications from an MCP server.

With it you can build tools that:

  • push incremental results (e.g. “chunk #17 of your 1 GB file uploaded”)
  • let the assistant comment on notifications (not yet working)

Behind the scenes the library:

  1. Surfaces every notifications/message chunk immediately as a normal ResponseTextDeltaEvent, so front‑ends (web, CLI, etc.) print progress in real time.
  2. Appends the chunk to the agent’s RunResultStreaming.new_items right away, ensuring the LLM can reference it (in theory).
  3. Steps the agent forward exactly once via a tiny helper patch (Runner.continue_run) so the next model delta reflects the fresh tool output.
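
Reduced to plain asyncio, that relay loop has roughly the following shape. This is a toy model; every name below is an illustrative stand-in, not the library's API:

```python
# Toy model of the three-step relay described above; names are
# illustrative stand-ins, not the library's actual API.
import asyncio


async def relay(notifications: asyncio.Queue) -> None:
    run_items: list[str] = []      # stands in for RunResultStreaming.new_items
    while (chunk := await notifications.get()) is not None:
        print(chunk, flush=True)   # 1. surface the chunk immediately
        run_items.append(chunk)    # 2. record it on the in-flight run
        # 3. the real library now calls Runner.continue_run() to pull
        #    exactly one fresh event from the still-running agent


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(1, 4):
        queue.put_nowait(f"chunk #{i} of your 1 GB file uploaded")
    queue.put_nowait(None)         # stream_end sentinel
    await relay(queue)


asyncio.run(main())
```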

## Reference servers: streamable-mcp-server

Sibling repo that ships two demo servers, but ⚠ note:

OpenAI Agents SDK v0.0.14 supports only the legacy SSE transport. The newer Streamable HTTP MCP endpoint is included for future‑proofing and interop tests, but the Python client in this project will ignore it until the SDK adds native support.

| server file (repo root) | spec served | default port | endpoint | start command |
|-------------------------|-------------|--------------|----------|---------------|
| sseServer.ts | SSE MCP (legacy – supported by openai-agents) | 3000 | http://localhost:3000/sse | `npm run sse` |
| mcpServer.ts | Streamable HTTP MCP (latest spec – not yet supported by openai-agents) | 3000 | http://localhost:3000/mcp | `npm run mcp` |

Quick start:

  1. `git clone https://github.com/josephbharrison/streamable-mcp-server`
  2. `cd streamable-mcp-server && npm ci`
  3. `npm run sse` (launches the compatible SSE server)
  4. In this repo, set the client mode to `MCPServerMode.TYPESCRIPT_SSE` (the default) and run `python src/main.py`.
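
On the Python side, a minimal driver in the spirit of the repo's src/main.py might look like the sketch below. The class names come from this README, but the constructor signatures are assumptions; treat src/main.py as authoritative.

```python
# Hedged sketch of a driver like src/main.py; class names come from this
# repo, while the exact constructor arguments are assumptions.
import asyncio

from agents import Agent
from mcp_extensions.server_with_notifications import MCPServerSseWithNotifications
from mcp_extensions.streamable_agent import StreamableAgent


async def main() -> None:
    server = MCPServerSseWithNotifications(
        params={"url": "http://localhost:3000/sse"},  # SSE server from step 3
    )
    async with server:
        agent = Agent(
            name="streamer",
            instructions="Use the MCP tools you are given.",
            mcp_servers=[server],
        )
        stream = StreamableAgent(agent, server).run_streamed("stream up to 10 numbers")
        async for event in stream.stream_events():
            print(type(event).__name__, flush=True)


asyncio.run(main())
```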

## Call flow

```mermaid
graph LR
    %% ────────────────────── 1. Python application ─────────────────────
    subgraph Python_Application
        main[main.py]
        streamAgent[StreamableAgent]
        mux["StreamableAgentStream<br/>(multiplexer)"]

        main --> streamAgent
        streamAgent --> mux
    end

    %% ───────────────────── 2. MCP client wrapper ──────────────────────
    subgraph MCP_Client_Wrapper
        mcpWrap[MCPServerSseWithNotifications]
        mux --> mcpWrap
    end

    %% ────────────────────── 3. Remote MCP server ──────────────────────
    subgraph MCP_Server
        sse["SSE endpoint<br/>notifications/*"]
        mcpWrap --> sse
    end
```

  1. main.py creates a normal OpenAI Agents Agent but wires in our custom MCPServerSseWithNotifications. It then calls StreamableAgent.run_streamed() with the long-running tool request "stream up to 10 numbers".

  2. StreamableAgent.run_streamed() just forwards to the SDK’s Runner.run_streamed() and wraps the returned RunResultStreaming instance inside a StreamableAgentStream multiplexer.

  3. StreamableAgentStream keeps two asyncio tasks running:

  • The agent’s own event generator (RunResultStreaming.stream_events()),
  • The notification stream produced by MCPServerSseWithNotifications.stream_notifications().
  It emits a single, unified async stream that merges items from both.

  4. Every time an SSE notification chunk arrives, the multiplexer:

  • Exposes it immediately as a ResponseTextDeltaEvent (so the UI can print 1 2 3… in real time),
  • Copies the text into RunResultStreaming.new_items right away,
  • Uses our patched helper Runner.continue_run() to step the outer agent forward once.
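
Merging the two event sources into one stream is the heart of the multiplexer. A generic, self-contained version of that pattern (not the repo's actual code) looks like this:

```python
# Generic two-source multiplexer illustrating the pattern
# StreamableAgentStream uses; not the repo's actual code.
import asyncio
from typing import AsyncIterator


async def merge(*sources: AsyncIterator[str]) -> AsyncIterator[str]:
    queue: asyncio.Queue = asyncio.Queue()
    done = object()                # sentinel marking an exhausted source

    async def pump(source: AsyncIterator[str]) -> None:
        async for item in source:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in sources]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item


async def agent_events() -> AsyncIterator[str]:
    for delta in ("model", "delta"):
        yield f"agent: {delta}"
        await asyncio.sleep(0.02)


async def notifications() -> AsyncIterator[str]:
    for i in (1, 2, 3):
        yield f"notification: {i}"
        await asyncio.sleep(0.01)


async def main() -> None:
    async for event in merge(agent_events(), notifications()):
        print(event)


asyncio.run(main())
```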

## OpenAI Agents Extensions

| file | what it adds |
|------|--------------|
| mcp_extensions/server_with_notifications.py | Sub-class of the SDK's MCPServerSse that 1. opens a second in-memory stream for logging notifications, and 2. exposes a single async generator stream_notifications() that yields both tool notifications (from the SSE endpoint) and logging notifications injected by the server. |
| mcp_extensions/streamable_agent_stream.py | The realtime relay.<br/>• Multiplexes the agent-event task and the notification task.<br/>• Converts each text chunk into the minimal set of UI events (ItemAdded → ContentPartAdded → TextDelta → ContentPartDone).<br/>• Appends a completed MessageOutputItem to run.new_items so the LLM can reference it.<br/>• Calls Runner.continue_run() (our SDK patch) to pull exactly one semantic event from the still-running agent, then yields it downstream. |
| mcp_extensions/streamable_agent.py | Tiny convenience wrapper: given an Agent and an MCP server, it returns a StreamableAgentStream each time you need a streamed call. |
| main.py | Diagnostic demo.<br/>• Shows how to spin up the SSE server.<br/>• Prints both raw model deltas and relay-injected deltas in the console (see the two if branches in the loop). |
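
The core of stream_notifications() can be pictured as one in-memory queue that both notification sources feed. The class below is an illustrative reconstruction, not the repo's code:

```python
# Illustrative reconstruction of the stream_notifications() idea: tool
# notifications (from the SSE endpoint) and logging notifications share
# one in-memory queue and come back out as a single async stream.
import asyncio
from typing import Any, AsyncIterator


class NotificationRelay:
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def inject(self, payload: dict[str, Any]) -> None:
        # called by the SSE reader (tool notifications) and by the
        # logging hook (logging notifications)
        self._queue.put_nowait(payload)

    def close(self) -> None:
        self._queue.put_nowait(None)   # stream_end sentinel

    async def stream_notifications(self) -> AsyncIterator[dict[str, Any]]:
        while (payload := await self._queue.get()) is not None:
            yield payload
```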


## Sequence Diagram

```mermaid
sequenceDiagram
    %% concrete runtime objects
    participant Main    as main.py run()
    participant SA      as StreamableAgent
    participant SAS     as StreamableAgentStream
    participant Runner  as openai‑agents Runner
    participant MCP     as MCPServerSseWithNotifications
    participant SSE     as Remote SSE server

    %% 1 construction and first call
    Main  ->> SA      : run_streamed("stream up to 10 numbers")
    SA    ->> Runner  : run_streamed(agent,input)
    Runner-->> SA      : RunResultStreaming base_stream
    SA    -->> Main    : StreamableAgentStream instance (SAS)
    Note over Main,SAS: Main now iterates SAS.stream_events()

    %% 2 background tasks inside SAS
    Note over SAS,MCP: Task A base_stream.stream_events()  Task B MCP.stream_notifications()

    %% 2b open SSE channel for notifications
    SAS   ->> MCP     : subscribe_notifications
    MCP   ->> SSE     : HTTP GET /sse
    SSE  -->> MCP     : 200 OK (events begin)

    %% 2c legacy tool call via POST
    Runner->> MCP     : tools/call "stream_numbers"
    MCP   ->> SSE     : HTTP POST /sse?sessionId=<id>  JSON‑RPC body
    SSE  -->> MCP     : 200 OK (operation accepted)

    %% 3A normal model deltas
    SAS   ->> Runner  : Task A next base_stream event
    Runner-->> SAS     : RawResponsesStreamEvent (model delta or tool‑call)
    SAS   -->> Main    : same RawResponsesStreamEvent

    %% 3B notifications from SSE stream
    SSE  --) MCP      : event notifications/number 1
    MCP  --) SAS      : JSON‑RPC notification number 1
    SAS  --) Main     : ResponseTextDelta 1

    %% 4 commit chunk then advance agent one step
    SAS   ->> Runner  : continue_run(base_stream)
    Runner-->> SAS     : next model delta
    SAS  --) Main     : ResponseTextDelta from LLM

    %% … numbers 2 through 10 follow the same 3B and 4 cycle …

    %% 5 server closes the stream
    SSE  --) MCP      : event notifications/stream_end
    MCP  --) SAS      : stream_end sentinel
    Runner-->> SAS     : final assistant message
    SAS   -->> Main    : final assistant message
```

## Key methods in StreamableAgentStream

| method | purpose |
|--------|---------|
| stream_events() | Main coroutine. Runs two tasks (agent_task, notif_task), waits on whichever completes first, and yields events. Also honors the grace period (_GRACE_TICKS) so late notifications are still processed after the agent has finished. |
| _handle_notification() | For one notifications/* payload:<br/>1. converts its text into delta events for the UI;<br/>2. creates a completed assistant MessageOutputItem and appends it to the in-flight run's new_items;<br/>3. calls Runner.continue_run() once and yields that single event (usually a model delta or the final answer). |
| _extract_text_chunks() | Tiny helper that supports both the assistant-style `{"content": [... {"type": "text"} ...]}` payload and the flat `{"data": {"type": "text", "text": "..."}}` shape. |
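
From that description, _extract_text_chunks() can be reconstructed roughly as follows (a reimplementation based on the table above; the repo's actual code may differ in detail):

```python
# Reimplementation of _extract_text_chunks() from its description above.
from typing import Any


def extract_text_chunks(payload: dict[str, Any]) -> list[str]:
    chunks: list[str] = []
    # assistant-style shape: {"content": [..., {"type": "text", "text": ...}, ...]}
    for part in payload.get("content", []):
        if isinstance(part, dict) and part.get("type") == "text":
            chunks.append(part.get("text", ""))
    # flat shape: {"data": {"type": "text", "text": "..."}}
    data = payload.get("data")
    if isinstance(data, dict) and data.get("type") == "text":
        chunks.append(data.get("text", ""))
    return chunks


assert extract_text_chunks({"data": {"type": "text", "text": "1"}}) == ["1"]
```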


## Why patch the OpenAI Agents SDK?

### Runner.continue_run

  • The public SDK lets you start a streamed run and then iterate: `async for evt in run.stream_events(): ...`
  • But you can't say "give me just the next event, then pause".
  • After each notification chunk, the realtime relay must:
    1. wake the agent,
    2. wait for one event,
    3. go back to waiting for the next notification.
  • continue_run() is therefore a minimal, ~20-line helper that peeks one item from the internal queue, taking care to propagate errors and to notice when the background task has already finished.
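
For orientation, a helper with that contract could look roughly like the sketch below. The internal attribute names (_event_queue, is_complete, _stored_exception) are assumptions about RunResultStreaming's internals; patches/continue_run.patch is the authoritative version.

```python
# Hedged reconstruction of a continue_run()-style helper; the attribute
# names used on `run` are assumptions, not the SDK's documented API.
async def continue_run(run):
    """Pull exactly one pending event from a streamed run, or return
    None when the background task has already finished and drained."""
    if getattr(run, "_stored_exception", None):
        raise run._stored_exception            # propagate background errors
    if run.is_complete and run._event_queue.empty():
        return None
    event = await run._event_queue.get()
    run._event_queue.task_done()
    return event
```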

When the SDK one day exposes an official step() / poll() API the patch can be dropped.


## Extending / modifying

  • Richer notification payloads (e.g. images, JSON)

    Extend _extract_text_chunks() and the UI‑event construction logic.

  • Longer grace period

    Change _GRACE_TICKS (each tick = 100 ms).

  • Skip immediate model reaction (pass‑through only)

    Remove the call to Runner.continue_run() in _handle_notification().

  • Multiple concurrent tools

    Instantiate one StreamableAgentStream per tool invocation; each manages its own multiplexing.

## Patching the openai-agents SDK

This repo relies on a small helper (Runner.continue_run) that is not yet upstreamed to openai-agents. We ship that change as a standard git-apply patch.

|  | path |
|--|------|
| patch file | patches/continue_run.patch |
| target file | `<venv-site-pkgs>/agents/runner.py` |
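
To resolve the `<venv-site-pkgs>` placeholder on your machine, ask Python where the module lives:

```bash
python -c "import agents.runner; print(agents.runner.__file__)"
```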

### Apply the patch

```bash
# from the repository root
git apply patches/continue_run.patch
```

or, if you prefer `patch`:

```bash
patch -p1 < patches/continue_run.patch
```

Tip 📦 If you vendor the SDK in ./libs/openai-agents/, run the same command inside that folder.

### Verify

```bash
python - <<'PY'
from agents.runner import Runner
assert hasattr(Runner, "continue_run"), "patch did not apply!"
print("continue_run helper is present")
PY
```

### Revert / re-apply after upgrades

```bash
git apply -R patches/continue_run.patch   # ← undo
pip install --upgrade openai-agents       # upgrade SDK
git apply patches/continue_run.patch      # ← redo
```

## Quick setup

Installation guide for this server.

### Install the package (if needed)

```bash
uvx streamable-mcp-client
```

### Cursor configuration (mcp.json)

```json
{
  "mcpServers": {
    "josephbharrison-streamable-mcp-client": {
      "command": "uvx",
      "args": ["streamable-mcp-client"]
    }
  }
}
```