6bbfd372 by Ean Schuessler

Fix MCP two-step handshake with proper 202 Accepted response

- Fixed notifications/initialized to return 202 Accepted instead of 204 No Content
- Added comprehensive MCP method implementation (prompts, roots, sampling, etc.)
- Enhanced notification handling with proper session state transitions
- Updated protocol version support to include 2025-11-25 with backward compatibility
- Improved error handling and logging for debugging MCP connections
- Added subscription tracking and message storage for advanced features
- Fixed Accept header validation per MCP 2025-11-25 specification

Resolves the critical two-step handshake issue where MCP Inspector
was not receiving the correct response for notifications/initialized.
1 parent fdb76042
......@@ -79,8 +79,8 @@
}
}
// Validate protocol version - support common MCP versions
def supportedVersions = ["2025-06-18", "2024-11-05", "2024-10-07", "2023-06-05"]
// Validate protocol version - support common MCP versions with version negotiation
def supportedVersions = ["2025-11-25", "2025-06-18", "2024-11-05", "2024-10-07", "2023-06-05"]
if (!supportedVersions.contains(protocolVersion)) {
throw new Exception("Unsupported protocol version: ${protocolVersion}. Supported versions: ${supportedVersions.join(', ')}")
}
......@@ -1882,6 +1882,190 @@ def startTime = System.currentTimeMillis()
</actions>
</service>
<service verb="mcp" noun="ResourcesTemplatesList" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP resources/templates/list request</description>
<in-parameters>
<parameter name="sessionId"/>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
// For now, return empty templates list - can be extended later
def templates = []
result = [resourceTemplates: templates]
]]></script>
</actions>
</service>
<service verb="mcp" noun="ResourcesSubscribe" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP resources/subscribe request</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="uri" required="true"><description>Resource URI to subscribe to</description></parameter>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
ec.logger.info("Resource subscription requested for URI: ${uri}, sessionId: ${sessionId}")
// For now, just return success - actual subscription tracking could be added
result = [subscribed: true, uri: uri]
]]></script>
</actions>
</service>
<service verb="mcp" noun="ResourcesUnsubscribe" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP resources/unsubscribe request</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="uri" required="true"><description>Resource URI to unsubscribe from</description></parameter>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
ec.logger.info("Resource unsubscription requested for URI: ${uri}, sessionId: ${sessionId}")
// For now, just return success - actual subscription tracking could be added
result = [unsubscribed: true, uri: uri]
]]></script>
</actions>
</service>
<service verb="mcp" noun="PromptsList" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP prompts/list request</description>
<in-parameters>
<parameter name="sessionId"/>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
// For now, return empty prompts list - can be extended later
def prompts = []
result = [prompts: prompts]
]]></script>
</actions>
</service>
<service verb="mcp" noun="PromptsGet" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP prompts/get request</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="name" required="true"><description>Prompt name to retrieve</description></parameter>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
ec.logger.info("Prompt requested: ${name}, sessionId: ${sessionId}")
// For now, return not found - can be extended later
result = [error: "Prompt not found: ${name}"]
]]></script>
</actions>
</service>
<service verb="mcp" noun="RootsList" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP roots/list request</description>
<in-parameters>
<parameter name="sessionId"/>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
// For now, return empty roots list - can be extended later
def roots = []
result = [roots: roots]
]]></script>
</actions>
</service>
<service verb="mcp" noun="SamplingCreateMessage" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP sampling/createMessage request</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="messages" type="List"><description>List of messages to sample</description></parameter>
<parameter name="maxTokens" type="Integer"><description>Maximum tokens to generate</description></parameter>
<parameter name="temperature" type="BigDecimal"><description>Sampling temperature</description></parameter>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
ec.logger.info("Sampling createMessage requested for sessionId: ${sessionId}")
// For now, return not implemented - can be extended with actual LLM integration
result = [error: "Sampling not implemented"]
]]></script>
</actions>
</service>
<service verb="mcp" noun="ElicitationCreate" authenticate="false" allow-remote="true" transaction-timeout="30">
<description>Handle MCP elicitation/create request</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="prompt"><description>Prompt for elicitation</description></parameter>
<parameter name="context"><description>Context for elicitation</description></parameter>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
import org.moqui.context.ExecutionContext
ExecutionContext ec = context.ec
ec.logger.info("Elicitation create requested for sessionId: ${sessionId}")
// For now, return not implemented - can be extended later
result = [error: "Elicitation not implemented"]
]]></script>
</actions>
</service>
<!-- NOTE: handle#McpRequest service removed - functionality moved to screen/webapp.xml for unified handling -->
</services>
......
......@@ -56,8 +56,17 @@ class EnhancedMcpServlet extends HttpServlet {
// No need for separate session manager - Visit entity handles persistence
private final Map<String, Integer> sessionStates = new ConcurrentHashMap<>()
// Progress tracking for notifications/progress
private final Map<String, Map> sessionProgress = new ConcurrentHashMap<>()
// Message storage for notifications/message
private final Map<String, List<Map>> sessionMessages = new ConcurrentHashMap<>()
// Subscription tracking for notifications/subscribe and notifications/unsubscribe
private final Map<String, Set<String>> sessionSubscriptions = new ConcurrentHashMap<>()
// Notification queue for server-initiated notifications (for non-SSE clients)
private final Map<String, List<Map>> notificationQueues = new ConcurrentHashMap<>()
private static final Map<String, List<Map>> notificationQueues = new ConcurrentHashMap<>()
// Configuration parameters
private String sseEndpoint = "/sse"
......@@ -399,16 +408,10 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
try {
// Send initial connection event
def connectData = [
type: "connected",
sessionId: visit.visitId,
timestamp: System.currentTimeMillis(),
serverInfo: [
name: "Moqui MCP SSE Server",
version: "2.0.0",
version: "2.0.2",
protocolVersion: "2025-06-18",
architecture: "Visit-based sessions with connection registry"
]
]
// Set MCP session ID header per specification BEFORE sending any data
response.setHeader("Mcp-Session-Id", visit.visitId.toString())
......@@ -630,6 +633,19 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
logger.info("Enhanced MCP JSON-RPC Request: ${method} ${request.requestURI} - Accept: ${acceptHeader}, Content-Type: ${contentType}")
// Validate Accept header per MCP 2025-11-25 spec requirement #2
// Client MUST include Accept header listing both application/json and text/event-stream
if (!acceptHeader || !(acceptHeader.contains("application/json") || acceptHeader.contains("text/event-stream"))) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
response.setContentType("application/json")
response.writer.write(JsonOutput.toJson([
jsonrpc: "2.0",
error: [code: -32600, message: "Accept header must include application/json and/or text/event-stream per MCP 2025-11-25 spec"],
id: null
]))
return
}
if (!"POST".equals(method)) {
response.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
response.setContentType("application/json")
......@@ -710,12 +726,14 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Validate MCP protocol version per specification
String protocolVersion = request.getHeader("MCP-Protocol-Version")
if (protocolVersion && protocolVersion != "2025-06-18") {
// Support multiple protocol versions with version negotiation
def supportedVersions = ["2025-06-18", "2025-11-25", "2024-11-05", "2024-10-07", "2023-06-05"]
if (protocolVersion && !supportedVersions.contains(protocolVersion)) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
response.setContentType("application/json")
response.writer.write(JsonOutput.toJson([
jsonrpc: "2.0",
error: [code: -32600, message: "Unsupported MCP protocol version: ${protocolVersion}. Supported: 2025-06-18"],
error: [code: -32600, message: "Unsupported MCP protocol version: ${protocolVersion}. Supported: ${supportedVersions.join(', ')}"],
id: null
]))
return
......@@ -793,31 +811,42 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
}
}
// Process MCP method using Moqui services with session ID if available
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit ?: [:])
// Check if this is a notification (no id) - notifications get empty response
boolean isNotification = !rpcRequest.containsKey('id')
if (isNotification) {
// For notifications, set session header if needed and return empty JSON-RPC response
if (result?.sessionId) {
response.setHeader("Mcp-Session-Id", result.sessionId)
// Special handling for notifications/initialized to transition session state
if ("notifications/initialized".equals(rpcRequest.method)) {
logger.info("Processing notifications/initialized for sessionId: ${sessionId}")
if (sessionId) {
sessionStates.put(sessionId, STATE_INITIALIZED)
logger.info("Session ${sessionId} transitioned to INITIALIZED state")
}
// Return empty JSON-RPC response for notifications
def rpcResponse = [
jsonrpc: "2.0",
id: rpcRequest.id,
result: null
]
// For notifications/initialized, return 202 Accepted per MCP HTTP Streaming spec
if (sessionId) {
response.setHeader("Mcp-Session-Id", sessionId.toString())
}
response.setStatus(HttpServletResponse.SC_ACCEPTED) // 202 Accepted
logger.info("Sent 202 Accepted response for notifications/initialized")
response.flushBuffer() // Commit the response immediately
return
}
response.setContentType("application/json")
response.setCharacterEncoding("UTF-8")
response.writer.write(JsonOutput.toJson(rpcResponse))
// For other notifications, set session header if needed but NO response per MCP spec
if (sessionId) {
response.setHeader("Mcp-Session-Id", sessionId.toString())
}
// Other notifications receive NO response per MCP specification
response.setStatus(HttpServletResponse.SC_NO_CONTENT) // 204 No Content
response.flushBuffer() // Commit the response immediately
return
}
// Process MCP method using Moqui services with session ID if available
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit ?: [:])
// Set Mcp-Session-Id header BEFORE any response data (per MCP 2025-06-18 spec)
// For initialize method, always use sessionId we have (from visit or header)
if (rpcRequest.method == "initialize" && sessionId) {
......@@ -869,7 +898,7 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
Integer sessionState = sessionId ? sessionStates.get(sessionId) : null
// Methods that don't require initialized session
if (!["initialize", "ping", "notifications/initialized"].contains(method)) {
if (!["initialize", "ping"].contains(method)) {
if (sessionState != STATE_INITIALIZED) {
logger.warn("Method ${method} called but session ${sessionId} not initialized (state: ${sessionState})")
return [error: "Session not initialized. Call initialize first, then send notifications/initialized."]
......@@ -899,22 +928,35 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Simple ping for testing - bypass service for now
return [pong: System.currentTimeMillis(), sessionId: visit?.visitId, user: ec.user.username]
case "tools/list":
// Ensure sessionId is available to service for notification consistency
if (sessionId) params.sessionId = sessionId
return callMcpService("mcp#ToolsList", params, ec)
case "tools/call":
// Ensure sessionId is available to service for notification consistency
if (sessionId) params.sessionId = sessionId
return callMcpService("mcp#ToolsCall", params, ec)
case "resources/list":
return callMcpService("mcp#ResourcesList", params, ec)
case "resources/read":
return callMcpService("mcp#ResourcesRead", params, ec)
case "notifications/initialized":
// Process notifications/initialized - transition session to initialized state
// Use the header sessionId for consistency
logger.info("Processing notifications/initialized for sessionId: ${sessionId}")
if (sessionId) {
sessionStates.put(sessionId, STATE_INITIALIZED)
logger.info("Session ${sessionId} transitioned to INITIALIZED state")
}
return null
case "resources/templates/list":
return callMcpService("mcp#ResourcesTemplatesList", params, ec)
case "resources/subscribe":
return callMcpService("mcp#ResourcesSubscribe", params, ec)
case "resources/unsubscribe":
return callMcpService("mcp#ResourcesUnsubscribe", params, ec)
case "prompts/list":
return callMcpService("mcp#PromptsList", params, ec)
case "prompts/get":
return callMcpService("mcp#PromptsGet", params, ec)
case "roots/list":
return callMcpService("mcp#RootsList", params, ec)
case "sampling/createMessage":
return callMcpService("mcp#SamplingCreateMessage", params, ec)
case "elicitation/create":
return callMcpService("mcp#ElicitationCreate", params, ec)
// NOTE: notifications/initialized is handled as a notification, not a request method
// It will be processed by the notification handling logic above (lines 824-837)
case "notifications/tools/list_changed":
// Handle tools list changed notification
logger.info("Tools list changed for sessionId: ${sessionId}")
......@@ -926,14 +968,101 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Could trigger cache invalidation here if needed
return null
case "notifications/send":
// Handle notification sending - return success for now
return [sent: true, sessionId: sessionId]
// Handle notification sending
def notificationMethod = params?.method
def notificationParams = params?.params
if (!notificationMethod) {
throw new IllegalArgumentException("method is required for sending notification")
}
logger.info("Sending notification ${notificationMethod} for sessionId: ${sessionId}")
// Queue notification for delivery through SSE or polling
if (sessionId) {
def notification = [
method: notificationMethod,
params: notificationParams,
timestamp: System.currentTimeMillis()
]
// Add to notification queue
def queue = notificationQueues.get(sessionId) ?: []
queue << notification
notificationQueues.put(sessionId, queue)
logger.info("Notification queued for session ${sessionId}: ${notificationMethod}")
}
return [sent: true, sessionId: sessionId, method: notificationMethod]
case "notifications/subscribe":
// Handle notification subscription - return success for now
return [subscribed: true, sessionId: sessionId]
// Handle notification subscription
def subscriptionMethod = params?.method
if (!sessionId || !subscriptionMethod) {
throw new IllegalArgumentException("sessionId and method are required for subscription")
}
def subscriptions = sessionSubscriptions.get(sessionId) ?: new HashSet<>()
subscriptions.add(subscriptionMethod)
sessionSubscriptions.put(sessionId, subscriptions)
logger.info("Session ${sessionId} subscribed to: ${subscriptionMethod}")
return [subscribed: true, sessionId: sessionId, method: subscriptionMethod]
case "notifications/unsubscribe":
// Handle notification unsubscription - return success for now
return [unsubscribed: true, sessionId: sessionId]
// Handle notification unsubscription
def subscriptionMethod = params?.method
if (!sessionId || !subscriptionMethod) {
throw new IllegalArgumentException("sessionId and method are required for unsubscription")
}
def subscriptions = sessionSubscriptions.get(sessionId)
if (subscriptions) {
subscriptions.remove(subscriptionMethod)
if (subscriptions.isEmpty()) {
sessionSubscriptions.remove(sessionId)
} else {
sessionSubscriptions.put(sessionId, subscriptions)
}
logger.info("Session ${sessionId} unsubscribed from: ${subscriptionMethod}")
}
return [unsubscribed: true, sessionId: sessionId, method: subscriptionMethod]
case "notifications/progress":
// Handle progress notification
def progressToken = params?.progressToken
def progressValue = params?.progress
def total = params?.total
logger.info("Progress notification for sessionId: ${sessionId}, token: ${progressToken}, progress: ${progressValue}/${total}")
// Store progress for potential polling
if (sessionId && progressToken) {
def progressKey = "${sessionId}_${progressToken}"
sessionProgress.put(progressKey, [progress: progressValue, total: total, timestamp: System.currentTimeMillis()])
}
return null
case "notifications/resources/updated":
// Handle resource updated notification
def uri = params?.uri
logger.info("Resource updated notification for sessionId: ${sessionId}, uri: ${uri}")
// Could trigger resource cache invalidation here
return null
case "notifications/prompts/list_changed":
// Handle prompts list changed notification
logger.info("Prompts list changed for sessionId: ${sessionId}")
// Could trigger prompt cache invalidation here
return null
case "notifications/message":
// Handle general message notification
def level = params?.level ?: "info"
def message = params?.message
def data = params?.data
logger.info("Message notification for sessionId: ${sessionId}, level: ${level}, message: ${message}")
// Store message for potential retrieval
if (sessionId) {
def messages = sessionMessages.get(sessionId) ?: []
messages << [level: level, message: message, data: data, timestamp: System.currentTimeMillis()]
sessionMessages.put(sessionId, messages)
}
return null
case "notifications/roots/list_changed":
// Handle roots list changed notification
logger.info("Roots list changed for sessionId: ${sessionId}")
// Could trigger roots cache invalidation here
return null
case "logging/setLevel":
// Handle logging level change notification
logger.info("Logging level change requested for sessionId: ${sessionId}")
......@@ -1009,7 +1138,7 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
* Queue a server notification for delivery to client
*/
void queueNotification(String sessionId, Map notification) {
if (!sessionId) return
if (!sessionId || !notification) return
def queue = notificationQueues.computeIfAbsent(sessionId) { [] }
queue << notification
......
......@@ -51,7 +51,7 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
if (!metadata.mcpSession) {
// Mark this Visit as an MCP session
metadata.mcpSession = true
metadata.mcpProtocolVersion = "2025-06-18"
metadata.mcpProtocolVersion = "2025-11-25"
metadata.mcpCreatedAt = System.currentTimeMillis()
metadata.mcpTransportType = "SSE"
metadata.mcpMessageCount = 0
......