3a39f97b by Ean Schuessler

Refactor MCP servlet to adapter architecture

Introduce clean adapter layer between Moqui infrastructure and MCP protocol:

- transport/MoquiMcpTransport: Interface abstracting transport concerns
- transport/SseTransport: SSE implementation with session management
- adapter/McpSessionAdapter: Maps Moqui Visit to MCP sessions
- adapter/McpToolAdapter: Maps MCP tools/methods to Moqui services
- adapter/MoquiNotificationMcpBridge: Bridges Moqui notifications to MCP

Simplify EnhancedMcpServlet to orchestrator role, removing inline session
management, SSE logic, and tool dispatch. Remove redundant session
validation in Initialize service (MoquiAuthFilter handles auth).

Delete obsolete files:
- VisitBasedMcpSession.groovy (replaced by McpSessionAdapter)
- JsonRpcMessage.groovy (using plain Maps)
- MoquiMcpTransport.groovy (replaced by new interface)
1 parent 9aefba5e
...@@ -37,46 +37,11 @@ ...@@ -37,46 +37,11 @@
37 // Permissions are handled by Moqui's artifact authorization system 37 // Permissions are handled by Moqui's artifact authorization system
38 // Users must be in appropriate groups (McpUser, MCP_BUSINESS) with access to McpServices artifact group 38 // Users must be in appropriate groups (McpUser, MCP_BUSINESS) with access to McpServices artifact group
39 39
40 // Disable authz to prevent automatic Visit updates during MCP operations 40 // Authentication is handled by MoquiAuthFilter - user context is already set
41 // No need to re-validate session ownership here
41 ec.artifactExecution.disableAuthz() 42 ec.artifactExecution.disableAuthz()
42 43
43 // Get Visit (session) created by servlet and validate access 44 ec.logger.info("MCP Initialize for session ${sessionId}, user ${ec.user.userId}")
44 def visit = ec.entity.find("moqui.server.Visit")
45 .condition("visitId", sessionId)
46 .one()
47
48 if (!visit) {
49 throw new Exception("Invalid session: ${sessionId}")
50 }
51
52 if (visit.userId != ec.user.userId) {
53 throw new Exception("Access denied for session: ${sessionId}")
54 }
55
56 // Update Visit with MCP initialization data
57 UserInfo adminUserInfo = null
58 try {
59 adminUserInfo = ec.user.pushUser("ADMIN")
60 def metadata = [:]
61 try {
62 metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
63 } catch (Exception e) {
64 ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
65 }
66
67 metadata.mcpInitialized = true
68 metadata.mcpProtocolVersion = protocolVersion
69 metadata.mcpCapabilities = capabilities
70 metadata.mcpClientInfo = clientInfo
71 metadata.mcpInitializedAt = System.currentTimeMillis()
72
73 // Session metadata stored in memory only - no Visit updates to prevent lock contention
74 ec.logger.info("SESSIONID: ${sessionId} - metadata stored in memory")
75 } finally {
76 if (adminUserInfo != null) {
77 ec.user.popUser()
78 }
79 }
80 45
81 // Validate protocol version - support common MCP versions with version negotiation 46 // Validate protocol version - support common MCP versions with version negotiation
82 def supportedVersions = ["2025-11-25", "2025-06-18", "2024-11-05", "2024-10-07", "2023-06-05"] 47 def supportedVersions = ["2025-11-25", "2025-06-18", "2024-11-05", "2024-10-07", "2023-06-05"]
......
1 /* 1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a 2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License. 3 * Grant of Patent License.
4 * 4 *
5 * To the extent possible under law, author(s) have dedicated all 5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the 6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any 7 * public domain worldwide. This software is distributed without any
8 * warranty. 8 * warranty.
9 * 9 *
10 * You should have received a copy of the CC0 Public Domain Dedication 10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see 11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>. 12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
...@@ -14,14 +14,17 @@ ...@@ -14,14 +14,17 @@
14 package org.moqui.mcp 14 package org.moqui.mcp
15 15
16 import groovy.json.JsonSlurper 16 import groovy.json.JsonSlurper
17 import org.moqui.impl.context.ExecutionContextFactoryImpl
18 import groovy.json.JsonBuilder
19 import groovy.json.JsonOutput 17 import groovy.json.JsonOutput
18 import org.moqui.impl.context.ExecutionContextFactoryImpl
20 import org.moqui.context.ArtifactAuthorizationException 19 import org.moqui.context.ArtifactAuthorizationException
21 import org.moqui.context.ArtifactTarpitException 20 import org.moqui.context.ArtifactTarpitException
22 import org.moqui.impl.context.ExecutionContextImpl 21 import org.moqui.impl.context.ExecutionContextImpl
23 import org.moqui.entity.EntityValue 22 import org.moqui.entity.EntityValue
24 import org.moqui.context.ExecutionContext 23 import org.moqui.mcp.adapter.McpSessionAdapter
24 import org.moqui.mcp.adapter.McpSession
25 import org.moqui.mcp.adapter.McpToolAdapter
26 import org.moqui.mcp.adapter.MoquiNotificationMcpBridge
27 import org.moqui.mcp.transport.SseTransport
25 import org.slf4j.Logger 28 import org.slf4j.Logger
26 import org.slf4j.LoggerFactory 29 import org.slf4j.LoggerFactory
27 30
...@@ -30,102 +33,93 @@ import jakarta.servlet.ServletException ...@@ -30,102 +33,93 @@ import jakarta.servlet.ServletException
30 import jakarta.servlet.http.HttpServlet 33 import jakarta.servlet.http.HttpServlet
31 import jakarta.servlet.http.HttpServletRequest 34 import jakarta.servlet.http.HttpServletRequest
32 import jakarta.servlet.http.HttpServletResponse 35 import jakarta.servlet.http.HttpServletResponse
33 import java.sql.Timestamp
34 import java.util.concurrent.ConcurrentHashMap
35 import java.util.concurrent.atomic.AtomicBoolean
36 import java.util.UUID
37
38
39 36
40 /** 37 /**
41 * Enhanced MCP Servlet with proper SSE handling inspired by HttpServletSseServerTransportProvider 38 * Enhanced MCP Servlet with adapter-based architecture.
42 * This implementation provides better SSE support and session management. 39 * Uses adapters for session management, tool dispatch, and notifications.
40 * This servlet acts as an orchestrator, delegating to specialized adapters.
43 */ 41 */
44 class EnhancedMcpServlet extends HttpServlet { 42 class EnhancedMcpServlet extends HttpServlet {
45 protected final static Logger logger = LoggerFactory.getLogger(EnhancedMcpServlet.class) 43 protected final static Logger logger = LoggerFactory.getLogger(EnhancedMcpServlet.class)
46 44
47 private JsonSlurper jsonSlurper = new JsonSlurper() 45 private JsonSlurper jsonSlurper = new JsonSlurper()
48 46
49 // Session state constants 47 // Adapter instances
50 private static final int STATE_UNINITIALIZED = 0 48 private McpSessionAdapter sessionAdapter
51 private static final int STATE_INITIALIZING = 1 49 private McpToolAdapter toolAdapter
52 private static final int STATE_INITIALIZED = 2 50 private SseTransport transport
53 51 private MoquiNotificationMcpBridge notificationBridge
54 // Simple registry for active connections only (transient HTTP connections) 52
55 private final Map<String, PrintWriter> activeConnections = new ConcurrentHashMap<>()
56
57 // Session management using Moqui's Visit system directly
58 // No need for separate session manager - Visit entity handles persistence
59 private final Map<String, Integer> sessionStates = new ConcurrentHashMap<>()
60
61 // Message storage for notifications/subscribe and notifications/unsubscribe
62 private final Map<String, List<Map>> sessionMessages = new ConcurrentHashMap<>()
63
64 // In-memory session tracking to avoid database access for read operations
65 private final Map<String, String> sessionUsers = new ConcurrentHashMap<>()
66
67 // Progress tracking for notifications/progress
68 private final Map<String, Map> sessionProgress = new ConcurrentHashMap<>()
69
70 // Visit cache to reduce database access and prevent lock contention 53 // Visit cache to reduce database access and prevent lock contention
71 private final Map<String, EntityValue> visitCache = new ConcurrentHashMap<>() 54 private final Map<String, EntityValue> visitCache = new java.util.concurrent.ConcurrentHashMap<>()
72 55
73 // Notification queue for server-initiated notifications (for non-SSE clients) 56 // Throttled session activity tracking
74 private static final Map<String, List<Map>> notificationQueues = new ConcurrentHashMap<>() 57 private final Map<String, Long> lastActivityUpdate = new java.util.concurrent.ConcurrentHashMap<>()
75
76 // Throttled session activity tracking to prevent database lock contention
77 private final Map<String, Long> lastActivityUpdate = new ConcurrentHashMap<>()
78 private static final long ACTIVITY_UPDATE_INTERVAL_MS = 30000 // 30 seconds 58 private static final long ACTIVITY_UPDATE_INTERVAL_MS = 30000 // 30 seconds
79 59
80 // Session-specific locks to avoid sessionId.intern() deadlocks
81 private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>()
82
83 // Configuration parameters 60 // Configuration parameters
84 private String sseEndpoint = "/sse" 61 private String sseEndpoint = "/sse"
85 private String messageEndpoint = "/message" 62 private String messageEndpoint = "/message"
86 private int keepAliveIntervalSeconds = 30 63 private int keepAliveIntervalSeconds = 30
87 private int maxConnections = 100 64 private int maxConnections = 100
88 65
89 @Override 66 @Override
90 void init(ServletConfig config) throws ServletException { 67 void init(ServletConfig config) throws ServletException {
91 super.init(config) 68 super.init(config)
92 69
70 // Initialize adapters
71 sessionAdapter = new McpSessionAdapter()
72 toolAdapter = new McpToolAdapter()
73 transport = new SseTransport(sessionAdapter)
74
75 // Initialize notification bridge
76 notificationBridge = new MoquiNotificationMcpBridge()
77
93 // Read configuration from servlet init parameters 78 // Read configuration from servlet init parameters
94 sseEndpoint = config.getInitParameter("sseEndpoint") ?: sseEndpoint 79 sseEndpoint = config.getInitParameter("sseEndpoint") ?: sseEndpoint
95 messageEndpoint = config.getInitParameter("messageEndpoint") ?: messageEndpoint 80 messageEndpoint = config.getInitParameter("messageEndpoint") ?: messageEndpoint
96 keepAliveIntervalSeconds = config.getInitParameter("keepAliveIntervalSeconds")?.toInteger() ?: keepAliveIntervalSeconds 81 keepAliveIntervalSeconds = config.getInitParameter("keepAliveIntervalSeconds")?.toInteger() ?: keepAliveIntervalSeconds
97 maxConnections = config.getInitParameter("maxConnections")?.toInteger() ?: maxConnections 82 maxConnections = config.getInitParameter("maxConnections")?.toInteger() ?: maxConnections
98 83
99 String webappName = config.getInitParameter("moqui-name") ?: 84 String webappName = config.getInitParameter("moqui-name") ?:
100 config.getServletContext().getInitParameter("moqui-name") 85 config.getServletContext().getInitParameter("moqui-name")
101 86
102 // Register servlet instance in context for service access 87 // Register servlet instance in context for service access
103 config.getServletContext().setAttribute("enhancedMcpServlet", this) 88 config.getServletContext().setAttribute("enhancedMcpServlet", this)
104 89
105 logger.info("EnhancedMcpServlet initialized for webapp ${webappName}") 90 // Get ECF and register notification bridge
91 ExecutionContextFactoryImpl ecfi =
92 (ExecutionContextFactoryImpl) config.getServletContext().getAttribute("executionContextFactory")
93 if (ecfi) {
94 notificationBridge.init(ecfi)
95 notificationBridge.setTransport(transport)
96 ecfi.registerNotificationMessageListener(notificationBridge)
97 logger.info("Registered MoquiNotificationMcpBridge with ECF")
98 }
99
100 logger.info("EnhancedMcpServlet initialized with adapter architecture for webapp ${webappName}")
106 logger.info("SSE endpoint: ${sseEndpoint}, Message endpoint: ${messageEndpoint}") 101 logger.info("SSE endpoint: ${sseEndpoint}, Message endpoint: ${messageEndpoint}")
107 logger.info("Keep-alive interval: ${keepAliveIntervalSeconds}s, Max connections: ${maxConnections}") 102 logger.info("Keep-alive interval: ${keepAliveIntervalSeconds}s, Max connections: ${maxConnections}")
108 logger.info("Servlet instance registered in context as 'enhancedMcpServlet'")
109 } 103 }
110 104
111 @Override 105 @Override
112 void service(HttpServletRequest request, HttpServletResponse response) 106 void service(HttpServletRequest request, HttpServletResponse response)
113 throws ServletException, IOException { 107 throws ServletException, IOException {
114 108
115 ExecutionContextFactoryImpl ecfi = 109 ExecutionContextFactoryImpl ecfi =
116 (ExecutionContextFactoryImpl) getServletContext().getAttribute("executionContextFactory") 110 (ExecutionContextFactoryImpl) getServletContext().getAttribute("executionContextFactory")
117 String webappName = getInitParameter("moqui-name") ?: 111 String webappName = getInitParameter("moqui-name") ?:
118 getServletContext().getInitParameter("moqui-name") 112 getServletContext().getInitParameter("moqui-name")
119 113
120 if (ecfi == null || webappName == null) { 114 if (ecfi == null || webappName == null) {
121 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 115 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
122 "System is initializing, try again soon.") 116 "System is initializing, try again soon.")
123 return 117 return
124 } 118 }
125 119
126 // Handle CORS 120 // Handle CORS
127 if (handleCors(request, response, webappName, ecfi)) return 121 if (handleCors(request, response)) return
128 122
129 long startTime = System.currentTimeMillis() 123 long startTime = System.currentTimeMillis()
130 124
131 if (logger.traceEnabled) { 125 if (logger.traceEnabled) {
...@@ -137,9 +131,9 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -137,9 +131,9 @@ class EnhancedMcpServlet extends HttpServlet {
137 logger.warn("No ExecutionContext found from MoquiAuthFilter, creating new one") 131 logger.warn("No ExecutionContext found from MoquiAuthFilter, creating new one")
138 ec = ecfi.getEci() 132 ec = ecfi.getEci()
139 } 133 }
140 134
141 try { 135 try {
142 // Read request body VERY early before any other processing can consume it 136 // Read request body early before any other processing can consume it
143 String requestBody = null 137 String requestBody = null
144 if ("POST".equals(request.getMethod())) { 138 if ("POST".equals(request.getMethod())) {
145 try { 139 try {
...@@ -178,7 +172,7 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -178,7 +172,7 @@ class EnhancedMcpServlet extends HttpServlet {
178 ])) 172 ]))
179 return 173 return
180 } 174 }
181 175
182 // Get Visit created by web facade 176 // Get Visit created by web facade
183 def visit = ec.user.getVisit() 177 def visit = ec.user.getVisit()
184 if (!visit) { 178 if (!visit) {
...@@ -186,7 +180,7 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -186,7 +180,7 @@ class EnhancedMcpServlet extends HttpServlet {
186 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create Visit") 180 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create Visit")
187 return 181 return
188 } 182 }
189 183
190 // Route based on request method and path 184 // Route based on request method and path
191 String requestURI = request.getRequestURI() 185 String requestURI = request.getRequestURI()
192 String method = request.getMethod() 186 String method = request.getMethod()
...@@ -197,17 +191,14 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -197,17 +191,14 @@ class EnhancedMcpServlet extends HttpServlet {
197 } else if ("POST".equals(method) && requestURI.endsWith("/message")) { 191 } else if ("POST".equals(method) && requestURI.endsWith("/message")) {
198 handleMessage(request, response, ec, requestBody) 192 handleMessage(request, response, ec, requestBody)
199 } else if ("POST".equals(method) && (requestURI.equals("/mcp") || requestURI.endsWith("/mcp"))) { 193 } else if ("POST".equals(method) && (requestURI.equals("/mcp") || requestURI.endsWith("/mcp"))) {
200 // Handle POST requests to /mcp for JSON-RPC
201 logger.debug("About to call handleJsonRpc with visit: ${visit?.visitId}")
202 handleJsonRpc(request, response, ec, webappName, requestBody, visit) 194 handleJsonRpc(request, response, ec, webappName, requestBody, visit)
203 } else if ("GET".equals(method) && (requestURI.equals("/mcp") || requestURI.endsWith("/mcp"))) { 195 } else if ("GET".equals(method) && (requestURI.equals("/mcp") || requestURI.endsWith("/mcp"))) {
204 // Handle GET requests to /mcp - SSE connection for streaming
205 handleSseConnection(request, response, ec, webappName) 196 handleSseConnection(request, response, ec, webappName)
206 } else { 197 } else {
207 // Fallback to JSON-RPC handling 198 // Fallback to JSON-RPC handling
208 handleJsonRpc(request, response, ec, webappName, requestBody, visit) 199 handleJsonRpc(request, response, ec, webappName, requestBody, visit)
209 } 200 }
210 201
211 } catch (ArtifactAuthorizationException e) { 202 } catch (ArtifactAuthorizationException e) {
212 logger.warn("Enhanced MCP Access Forbidden (no authz): " + e.message) 203 logger.warn("Enhanced MCP Access Forbidden (no authz): " + e.message)
213 response.setStatus(HttpServletResponse.SC_FORBIDDEN) 204 response.setStatus(HttpServletResponse.SC_FORBIDDEN)
...@@ -230,207 +221,158 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -230,207 +221,158 @@ class EnhancedMcpServlet extends HttpServlet {
230 logger.error("Error in Enhanced MCP request", t) 221 logger.error("Error in Enhanced MCP request", t)
231 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR) 222 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
232 response.setContentType("application/json") 223 response.setContentType("application/json")
233 // Use simple JSON string to avoid Groovy JSON library issues
234 def errorMsg = t.message?.toString() ?: "Unknown error" 224 def errorMsg = t.message?.toString() ?: "Unknown error"
235 response.writer.write("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32603,\"message\":\"Internal error: ${errorMsg.replace("\"", "\\\"")}\"},\"id\":null}") 225 response.writer.write("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32603,\"message\":\"Internal error: ${errorMsg.replace("\"", "\\\"")}\"},\"id\":null}")
236 } 226 }
237 } 227 }
238 228
239 private void handleSseConnection(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String webappName) 229 private void handleSseConnection(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String webappName)
240 throws IOException { 230 throws IOException {
241 231
242 logger.debug("Handling Enhanced SSE connection from ${request.remoteAddr}") 232 logger.debug("Handling Enhanced SSE connection from ${request.remoteAddr}")
243 233
244 // Check for existing session ID first 234 // Check for existing session ID
245 String sessionId = request.getHeader("Mcp-Session-Id") 235 String sessionId = request.getHeader("Mcp-Session-Id")
246 def visit = null 236 def visit = null
247 237 String userId = ec.user.userId?.toString()
248 // If we have a session ID, validate using in-memory tracking 238
239 // If we have a session ID, validate it
249 if (sessionId) { 240 if (sessionId) {
250 try { 241 def session = sessionAdapter.getSession(sessionId)
251 String sessionUser = sessionUsers.get(sessionId) 242 if (session) {
252 243 // Verify user has access
253 if (sessionUser) { 244 if (session.userId != userId) {
254 // Verify user has access to this session using in-memory data 245 logger.warn("Session userId ${session.userId} doesn't match current user ${userId} - access denied")
255 if (!ec.user.userId || sessionUser != ec.user.userId.toString()) { 246 response.sendError(HttpServletResponse.SC_FORBIDDEN, "Access denied for session: " + sessionId)
256 logger.warn("Session userId ${sessionUser} doesn't match current user userId ${ec.user.userId} - access denied")
257 response.sendError(HttpServletResponse.SC_FORBIDDEN, "Access denied for session: " + sessionId)
258 return
259 }
260 // Get Visit from cache for activity updates (but not for validation)
261 visit = getCachedVisit(ec, sessionId)
262 } else {
263 logger.warn("Session not found in memory: ${sessionId}")
264 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Session not found: " + sessionId)
265 return 247 return
266 } 248 }
267 } catch (Exception e) { 249 visit = getCachedVisit(ec, sessionId)
268 logger.error("Error validating session: ${e.message}", e) 250 } else {
269 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Session validation error") 251 logger.warn("Session not found: ${sessionId}")
252 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Session not found: " + sessionId)
270 return 253 return
271 } 254 }
272 } 255 }
273 256
274 // Only create new Visit if we didn't find an existing one 257 // Create new Visit/session if needed
275 if (!visit) { 258 if (!visit) {
276 // Initialize web facade for Visit creation, but avoid screen resolution 259 try {
277 // Modify request path to avoid ScreenResourceNotFoundException 260 ec.initWebFacade(webappName, request, response)
278 String originalRequestURI = request.getRequestURI() 261 visit = ec.user.getVisit()
279 String originalPathInfo = request.getPathInfo() 262 if (!visit) {
280 request.setAttribute("jakarta.servlet.include.request_uri", "/mcp") 263 throw new Exception("Web facade succeeded but no Visit created")
281 request.setAttribute("jakarta.servlet.include.path_info", "") 264 }
282 265
283 try { 266 // Create session in adapter with authenticated userId
284 ec.initWebFacade(webappName, request, response) 267 sessionId = visit.visitId?.toString()
285 // Web facade should always create a Visit - if it doesn't, that's a system error 268 sessionAdapter.createSession(sessionId, ec.user.userId?.toString())
286 visit = ec.user.getVisit() 269 logger.info("Created new session ${sessionId} for user ${ec.user.username}")
287 if (!visit) { 270
288 logger.error("Web facade succeeded but no Visit created - this is a system configuration error") 271 } catch (Exception e) {
289 throw new Exception("Web facade succeeded but no Visit created - check Moqui configuration") 272 logger.error("Failed to create session: ${e.message}", e)
273 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create session")
274 return
290 } 275 }
291 logger.debug("Web facade created Visit ${visit.visitId} for user ${ec.user.username}")
292 // Store user mapping in memory for fast validation
293 sessionUsers.put(visit.visitId.toString(), ec.user.userId.toString())
294 logger.info("Created new Visit ${visit.visitId} for user ${ec.user.username}")
295 } catch (Exception e) {
296 logger.error("Web facade initialization failed - this is a system configuration error: ${e.message}", e)
297 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "System configuration error: Web facade failed to initialize. Check Moqui logs for details.")
298 return
299 }
300 } 276 }
301 277
302 // Final check that we have a Visit
303 if (!visit) {
304 response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create Visit")
305 return
306 }
307
308 // Enable async support for SSE 278 // Enable async support for SSE
309 if (request.isAsyncSupported()) { 279 if (request.isAsyncSupported()) {
310 request.startAsync() 280 request.startAsync()
311 } 281 }
312 282
313 // Set SSE headers 283 // Set SSE headers
314 response.setContentType("text/event-stream") 284 response.setContentType("text/event-stream")
315 response.setCharacterEncoding("UTF-8") 285 response.setCharacterEncoding("UTF-8")
316 response.setHeader("Cache-Control", "no-cache") 286 response.setHeader("Cache-Control", "no-cache")
317 response.setHeader("Connection", "keep-alive") 287 response.setHeader("Connection", "keep-alive")
318 response.setHeader("Access-Control-Allow-Origin", "*") 288 response.setHeader("Access-Control-Allow-Origin", "*")
319 response.setHeader("X-Accel-Buffering", "no") // Disable nginx buffering 289 response.setHeader("X-Accel-Buffering", "no")
320 290 response.setHeader("Mcp-Session-Id", sessionId)
321 // Register active connection (transient HTTP connection) 291
322 activeConnections.put(visit.visitId, response.writer) 292 // Register SSE writer with transport
323 293 transport.registerSseWriter(sessionId, response.writer)
324 // Create Visit-based session transport (for persistence) 294
325 VisitBasedMcpSession session = new VisitBasedMcpSession(visit, response.writer, ec)
326
327 try { 295 try {
328 // Check if this is old HTTP+SSE transport (no session ID, no prior initialization) 296 // Send endpoint event for backwards compatibility
329 // Send endpoint event first for backwards compatibility
330 if (!request.getHeader("Mcp-Session-Id")) { 297 if (!request.getHeader("Mcp-Session-Id")) {
331 logger.debug("No Mcp-Session-Id header detected, assuming old HTTP+SSE transport") 298 transport.sendSseEventWithId(response.writer, "endpoint", "/mcp", 0)
332 sendSseEvent(response.writer, "endpoint", "/mcp", 0)
333 } 299 }
334 300
335 // Send initial connection event for new transport 301 // Send connect event
336 def connectData = [ 302 def connectData = [
337 version: "2.0.2", 303 version: "2.0.2",
338 protocolVersion: "2025-06-18", 304 protocolVersion: "2025-06-18",
339 architecture: "Visit-based sessions with connection registry" 305 architecture: "Adapter-based MCP with session registry"
340 ] 306 ]
341 307 transport.sendSseEventWithId(response.writer, "connect", JsonOutput.toJson(connectData), 1)
342 // Set MCP session ID header per specification BEFORE sending any data 308
343 response.setHeader("Mcp-Session-Id", visit.visitId.toString()) 309 // Deliver any queued notifications
344 logger.debug("Set Mcp-Session-Id header to ${visit.visitId} for SSE connection") 310 transport.deliverQueuedNotifications(sessionId)
345 311
346 sendSseEvent(response.writer, "connect", JsonOutput.toJson(connectData), 1)
347
348 // Keep connection alive with periodic pings 312 // Keep connection alive with periodic pings
349 int pingCount = 0 313 int pingCount = 0
350 while (!response.isCommitted() && pingCount < 60) { // 5 minutes max 314 while (!response.isCommitted() && pingCount < 60) {
351 Thread.sleep(5000) // Wait 5 seconds 315 Thread.sleep(5000)
352 316
353 if (!response.isCommitted()) { 317 if (!response.isCommitted()) {
354 def pingData = [ 318 if (!transport.sendPing(sessionId)) {
355 type: "ping", 319 logger.debug("Ping failed for session ${sessionId}, ending SSE loop")
356 timestamp: System.currentTimeMillis(), 320 break
357 sessionId: visit.visitId, 321 }
358 architecture: "Visit-based sessions"
359 ]
360 sendSseEvent(response.writer, "ping", JsonOutput.toJson(pingData), pingCount + 2)
361 pingCount++ 322 pingCount++
362 323
363 // Update session activity throttled (every 6th ping = every 30 seconds) 324 // Update session activity throttled
364 if (pingCount % 6 == 0) { 325 if (pingCount % 6 == 0) {
365 updateSessionActivityThrottled(visit.visitId.toString()) 326 updateSessionActivityThrottled(sessionId)
366 } 327 }
367 } 328 }
368 } 329 }
369 330
370 } catch (InterruptedException e) { 331 } catch (InterruptedException e) {
371 logger.info("SSE connection interrupted for session ${visit.visitId}") 332 logger.info("SSE connection interrupted for session ${sessionId}")
372 Thread.currentThread().interrupt() 333 Thread.currentThread().interrupt()
373 } catch (Exception e) { 334 } catch (Exception e) {
374 logger.warn("Enhanced SSE connection error: ${e.message}", e) 335 logger.warn("Enhanced SSE connection error: ${e.message}", e)
375 } finally { 336 } finally {
376 // Clean up session - Visit persistence handles cleanup automatically 337 // Clean up
338 transport.unregisterSseWriter(sessionId)
339
340 // Complete async context if available
341 if (request.isAsyncStarted()) {
377 try { 342 try {
378 def closeData = [ 343 request.getAsyncContext().complete()
379 type: "disconnected",
380 sessionId: visit.visitId,
381 timestamp: System.currentTimeMillis()
382 ]
383 sendSseEvent(response.writer, "disconnect", JsonOutput.toJson(closeData), -1)
384 } catch (Exception e) { 344 } catch (Exception e) {
385 // Ignore errors during cleanup 345 logger.debug("Error completing async context: ${e.message}")
386 }
387
388 // Remove from active connections registry
389 activeConnections.remove(visit.visitId)
390
391 // Complete async context if available
392 if (request.isAsyncStarted()) {
393 try {
394 request.getAsyncContext().complete()
395 } catch (Exception e) {
396 logger.debug("Error completing async context: ${e.message}")
397 } 346 }
398 } 347 }
399 } 348 }
400 } 349 }
401 350
402 private void handleMessage(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String requestBody) 351 private void handleMessage(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String requestBody)
403 throws IOException { 352 throws IOException {
404 353
405 String sessionId = request.getHeader("Mcp-Session-Id") 354 String sessionId = request.getHeader("Mcp-Session-Id")
406 def visit = getCachedVisit(ec, sessionId) 355 def session = sessionAdapter.getSession(sessionId)
407 if (!visit) { 356
357 if (!session) {
408 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Session not found: " + sessionId) 358 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Session not found: " + sessionId)
409 return 359 return
410 } 360 }
411 361
412 // Verify user has access to this Visit - rely on Moqui security 362 // Verify user has access
413 logger.debug("Session validation: visit.userId=${visit.userId}, ec.user.userId=${ec.user.userId}, ec.user.username=${ec.user.username}") 363 if (session.userId != ec.user.userId?.toString()) {
414 if (visit.userId && ec.user.userId && visit.userId.toString() != ec.user.userId.toString()) {
415 logger.warn("Visit userId ${visit.userId} doesn't match current user userId ${ec.user.userId} - access denied")
416 response.setContentType("application/json")
417 response.setCharacterEncoding("UTF-8")
418 response.setStatus(HttpServletResponse.SC_FORBIDDEN) 364 response.setStatus(HttpServletResponse.SC_FORBIDDEN)
365 response.setContentType("application/json")
419 response.writer.write(JsonOutput.toJson([ 366 response.writer.write(JsonOutput.toJson([
420 error: "Access denied for session: " + sessionId + " (visit.userId=${visit.userId}, ec.user.userId=${ec.user.userId})", 367 error: "Access denied for session: " + sessionId
421 architecture: "Visit-based sessions"
422 ])) 368 ]))
423 return 369 return
424 } 370 }
425 371
426 // Create session wrapper for this Visit
427 VisitBasedMcpSession session = new VisitBasedMcpSession(visit, response.writer, ec)
428
429 try { 372 try {
430 if (!requestBody || !requestBody.trim()) { 373 if (!requestBody || !requestBody.trim()) {
431 response.setContentType("application/json")
432 response.setCharacterEncoding("UTF-8")
433 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 374 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
375 response.setContentType("application/json")
434 response.writer.write(JsonOutput.toJson([ 376 response.writer.write(JsonOutput.toJson([
435 jsonrpc: "2.0", 377 jsonrpc: "2.0",
436 error: [code: -32602, message: "Empty request body"], 378 error: [code: -32602, message: "Empty request body"],
...@@ -438,16 +380,14 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -438,16 +380,14 @@ class EnhancedMcpServlet extends HttpServlet {
438 ])) 380 ]))
439 return 381 return
440 } 382 }
441 383
442 // Parse JSON-RPC message 384 // Parse JSON-RPC message
443 def rpcRequest 385 def rpcRequest
444 try { 386 try {
445 rpcRequest = jsonSlurper.parseText(requestBody) 387 rpcRequest = jsonSlurper.parseText(requestBody)
446 } catch (Exception e) { 388 } catch (Exception e) {
447 logger.error("Failed to parse JSON-RPC message: ${e.message}")
448 response.setContentType("application/json")
449 response.setCharacterEncoding("UTF-8")
450 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 389 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
390 response.setContentType("application/json")
451 response.writer.write(JsonOutput.toJson([ 391 response.writer.write(JsonOutput.toJson([
452 jsonrpc: "2.0", 392 jsonrpc: "2.0",
453 error: [code: -32700, message: "Invalid JSON: " + e.message], 393 error: [code: -32700, message: "Invalid JSON: " + e.message],
...@@ -455,12 +395,11 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -455,12 +395,11 @@ class EnhancedMcpServlet extends HttpServlet {
455 ])) 395 ]))
456 return 396 return
457 } 397 }
458 398
459 // Validate JSON-RPC 2.0 structure 399 // Validate JSON-RPC 2.0 structure
460 if (!rpcRequest?.jsonrpc || rpcRequest.jsonrpc != "2.0" || !rpcRequest?.method) { 400 if (!rpcRequest?.jsonrpc || rpcRequest.jsonrpc != "2.0" || !rpcRequest?.method) {
461 response.setContentType("application/json")
462 response.setCharacterEncoding("UTF-8")
463 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 401 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
402 response.setContentType("application/json")
464 response.writer.write(JsonOutput.toJson([ 403 response.writer.write(JsonOutput.toJson([
465 jsonrpc: "2.0", 404 jsonrpc: "2.0",
466 error: [code: -32600, message: "Invalid JSON-RPC 2.0 request"], 405 error: [code: -32600, message: "Invalid JSON-RPC 2.0 request"],
...@@ -468,31 +407,25 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -468,31 +407,25 @@ class EnhancedMcpServlet extends HttpServlet {
468 ])) 407 ]))
469 return 408 return
470 } 409 }
471 410
472 // Process method with session context 411 // Process method with session context
473 def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId) 412 def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, null)
474 413
475 // Send response via MCP transport to the specific session
476 def responseMessage = new JsonRpcResponse(result, rpcRequest.id)
477 session.sendMessage(responseMessage)
478
479 response.setContentType("application/json") 414 response.setContentType("application/json")
480 response.setCharacterEncoding("UTF-8") 415 response.setCharacterEncoding("UTF-8")
481 response.setStatus(HttpServletResponse.SC_OK) 416 response.setStatus(HttpServletResponse.SC_OK)
482 417
483 // Extract actual result from service response (same as regular handler)
484 def actualResult = result?.result ?: result 418 def actualResult = result?.result ?: result
485 response.writer.write(JsonOutput.toJson([ 419 response.writer.write(JsonOutput.toJson([
486 jsonrpc: "2.0", 420 jsonrpc: "2.0",
487 id: rpcRequest.id, 421 id: rpcRequest.id,
488 result: actualResult 422 result: actualResult
489 ])) 423 ]))
490 424
491 } catch (Exception e) { 425 } catch (Exception e) {
492 logger.error("Error processing message for session ${sessionId}: ${e.message}", e) 426 logger.error("Error processing message for session ${sessionId}: ${e.message}", e)
493 response.setContentType("application/json")
494 response.setCharacterEncoding("UTF-8")
495 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR) 427 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
428 response.setContentType("application/json")
496 response.writer.write(JsonOutput.toJson([ 429 response.writer.write(JsonOutput.toJson([
497 jsonrpc: "2.0", 430 jsonrpc: "2.0",
498 error: [code: -32603, message: "Internal error: " + e.message], 431 error: [code: -32603, message: "Internal error: " + e.message],
...@@ -500,46 +433,14 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -500,46 +433,14 @@ class EnhancedMcpServlet extends HttpServlet {
500 ])) 433 ]))
501 } 434 }
502 } 435 }
503 436
504 private void handleJsonRpc(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String webappName, String requestBody, def visit) 437 private void handleJsonRpc(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String webappName, String requestBody, def visit)
505 throws IOException { 438 throws IOException {
506 439
507 // Initialize web facade for proper session management
508 try {
509 // If we have a visit, use it directly (don't create new one)
510 visit = ec.user.getVisit()
511 if (visit) {
512 request.getSession().setAttribute("moqui.visitId", visit.visitId)
513 logger.debug("JSON-RPC web facade initialized for user: ${ec.user?.username} with visit: ${visit.visitId}")
514 } else {
515 // No visit exists, need to create one
516 logger.info("Creating new Visit record for user: ${ec.user?.username}")
517 visit = ec.entity.makeValue("moqui.server.Visit")
518 visit.visitId = ec.userFacade.getVisitId(visit)
519 visit.userId = ec.user.userId
520 visit.sessionId = visit.sessionId
521 visit.userAccountId = ec.user.userAccount?.userAccountId
522 visit.sessionCreatedDate = ec.user.nowTimestamp
523 visit.visitStatus = null
524 visit.lastActiveDate = ec.user.nowTimestamp
525 visit.visitDeletedDate = null
526 ec.entity.create(visit)
527 logger.info("Visit ${visit.visitId} created for user: ${ec.user?.username}")
528 }
529 ec.initWebFacade(webappName, request, response)
530 logger.debug("JSON-RPC web facade initialized for user: ${ec.user?.username} with visit: ${visit.visitId}")
531 } catch (Exception e) {
532 logger.warn("Web facade initialization warning: ${e.message}")
533 // Continue anyway - we may still have basic user context from auth
534 }
535
536 String method = request.getMethod() 440 String method = request.getMethod()
537 String acceptHeader = request.getHeader("Accept") 441 String acceptHeader = request.getHeader("Accept")
538 442
539 logger.debug("Enhanced MCP JSON-RPC Request: ${method} ${request.requestURI} - Accept: ${acceptHeader}") 443 // Validate Accept header per MCP spec
540
541 // Validate Accept header per MCP 2025-11-25 spec requirement #2
542 // Client MUST include Accept header with at least one of: application/json or text/event-stream
543 if (!acceptHeader || !(acceptHeader.contains("application/json") || acceptHeader.contains("text/event-stream"))) { 444 if (!acceptHeader || !(acceptHeader.contains("application/json") || acceptHeader.contains("text/event-stream"))) {
544 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 445 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
545 response.setContentType("application/json") 446 response.setContentType("application/json")
...@@ -550,7 +451,7 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -550,7 +451,7 @@ class EnhancedMcpServlet extends HttpServlet {
550 ])) 451 ]))
551 return 452 return
552 } 453 }
553 454
554 if (!"POST".equals(method)) { 455 if (!"POST".equals(method)) {
555 response.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED) 456 response.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
556 response.setContentType("application/json") 457 response.setContentType("application/json")
...@@ -562,9 +463,6 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -562,9 +463,6 @@ class EnhancedMcpServlet extends HttpServlet {
562 return 463 return
563 } 464 }
564 465
565 // Use pre-read request body
566 logger.debug("Using pre-read request body, length: ${requestBody?.length()}")
567
568 if (!requestBody) { 466 if (!requestBody) {
569 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 467 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
570 response.setContentType("application/json") 468 response.setContentType("application/json")
...@@ -576,16 +474,10 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -576,16 +474,10 @@ class EnhancedMcpServlet extends HttpServlet {
576 return 474 return
577 } 475 }
578 476
579 // Log request body for debugging (be careful with this in production)
580 if (requestBody.length() > 0) {
581 logger.trace("MCP JSON-RPC request body: ${requestBody}")
582 }
583
584 def rpcRequest 477 def rpcRequest
585 try { 478 try {
586 rpcRequest = jsonSlurper.parseText(requestBody) 479 rpcRequest = jsonSlurper.parseText(requestBody)
587 } catch (Exception e) { 480 } catch (Exception e) {
588 logger.error("Failed to parse JSON-RPC request: ${e.message}")
589 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 481 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
590 response.setContentType("application/json") 482 response.setContentType("application/json")
591 response.writer.write(JsonOutput.toJson([ 483 response.writer.write(JsonOutput.toJson([
...@@ -595,7 +487,7 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -595,7 +487,7 @@ class EnhancedMcpServlet extends HttpServlet {
595 ])) 487 ]))
596 return 488 return
597 } 489 }
598 490
599 // Validate JSON-RPC 2.0 structure 491 // Validate JSON-RPC 2.0 structure
600 if (!rpcRequest?.jsonrpc || rpcRequest.jsonrpc != "2.0" || !rpcRequest?.method) { 492 if (!rpcRequest?.jsonrpc || rpcRequest.jsonrpc != "2.0" || !rpcRequest?.method) {
601 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 493 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
...@@ -607,10 +499,9 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -607,10 +499,9 @@ class EnhancedMcpServlet extends HttpServlet {
607 ])) 499 ]))
608 return 500 return
609 } 501 }
610 502
611 // Validate MCP protocol version per specification 503 // Validate MCP protocol version
612 String protocolVersion = request.getHeader("MCP-Protocol-Version") 504 String protocolVersion = request.getHeader("MCP-Protocol-Version")
613 // Support multiple protocol versions with version negotiation
614 def supportedVersions = ["2025-06-18", "2025-11-25", "2024-11-05", "2024-10-07", "2023-06-05"] 505 def supportedVersions = ["2025-06-18", "2025-11-25", "2024-11-05", "2024-10-07", "2023-06-05"]
615 if (protocolVersion && !supportedVersions.contains(protocolVersion)) { 506 if (protocolVersion && !supportedVersions.contains(protocolVersion)) {
616 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 507 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
...@@ -623,18 +514,15 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -623,18 +514,15 @@ class EnhancedMcpServlet extends HttpServlet {
623 return 514 return
624 } 515 }
625 516
626 // Get session ID from Mcp-Session-Id header per MCP specification 517 // Get session ID from header
627 String sessionId = request.getHeader("Mcp-Session-Id") 518 String sessionId = request.getHeader("Mcp-Session-Id")
628 logger.debug("Session ID from header: '${sessionId}', method: '${rpcRequest.method}'")
629 519
630 // For initialize and notifications/initialized methods, use visit ID as session ID if no header 520 // For initialize, use visit ID as session ID
631 if (!sessionId && ("initialize".equals(rpcRequest.method) || "notifications/initialized".equals(rpcRequest.method)) && visit) { 521 if (!sessionId && ("initialize".equals(rpcRequest.method) || "notifications/initialized".equals(rpcRequest.method)) && visit) {
632 sessionId = visit.visitId 522 sessionId = visit.visitId?.toString()
633 logger.debug("${rpcRequest.method} method: using visit ID as session ID: ${sessionId}")
634 } 523 }
635 524
636 // Validate session ID for non-initialize requests per MCP spec 525 // Validate session ID for non-initialize requests
637 // Allow notifications/initialized without session ID as it completes the initialization process
638 if (!sessionId && rpcRequest.method != "initialize" && rpcRequest.method != "notifications/initialized") { 526 if (!sessionId && rpcRequest.method != "initialize" && rpcRequest.method != "notifications/initialized") {
639 response.setStatus(HttpServletResponse.SC_BAD_REQUEST) 527 response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
640 response.setContentType("application/json") 528 response.setContentType("application/json")
...@@ -645,16 +533,13 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -645,16 +533,13 @@ class EnhancedMcpServlet extends HttpServlet {
645 ])) 533 ]))
646 return 534 return
647 } 535 }
648 536
649 // For existing sessions, set visit ID in HTTP session before web facade initialization 537 // For existing sessions, validate ownership
650 // This ensures Moqui picks up the existing Visit when initWebFacade() is called
651 if (sessionId && rpcRequest.method != "initialize") { 538 if (sessionId && rpcRequest.method != "initialize") {
652 try { 539 def session = sessionAdapter.getSession(sessionId)
653 ec.artifactExecution.disableAuthz() 540 if (!session) {
654 def existingVisit = ec.entity.find("moqui.server.Visit") 541 // Try loading from database
655 .condition("visitId", sessionId) 542 def existingVisit = getCachedVisit(ec, sessionId)
656 .one()
657
658 if (!existingVisit) { 543 if (!existingVisit) {
659 response.setStatus(HttpServletResponse.SC_NOT_FOUND) 544 response.setStatus(HttpServletResponse.SC_NOT_FOUND)
660 response.setContentType("application/json") 545 response.setContentType("application/json")
...@@ -665,9 +550,9 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -665,9 +550,9 @@ class EnhancedMcpServlet extends HttpServlet {
665 ])) 550 ]))
666 return 551 return
667 } 552 }
668 553
669 // Rely on Moqui security - only allow access if visit and current user match 554 // Verify ownership
670 if (!existingVisit.userId || !ec.user.userId || existingVisit.userId.toString() != ec.user.userId.toString()) { 555 if (existingVisit.userId?.toString() != ec.user.userId?.toString()) {
671 response.setStatus(HttpServletResponse.SC_FORBIDDEN) 556 response.setStatus(HttpServletResponse.SC_FORBIDDEN)
672 response.setContentType("application/json") 557 response.setContentType("application/json")
673 response.writer.write(JsonOutput.toJson([ 558 response.writer.write(JsonOutput.toJson([
...@@ -678,317 +563,217 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -678,317 +563,217 @@ class EnhancedMcpServlet extends HttpServlet {
678 return 563 return
679 } 564 }
680 565
681 // Set visit ID in HTTP session so Moqui web facade initialization picks it up 566 // Create session in adapter if not exists
682 request.session.setAttribute("moqui.visitId", sessionId) 567 if (!sessionAdapter.hasSession(sessionId)) {
683 logger.debug("Set existing Visit ${sessionId} in HTTP session for user ${ec.user.username}") 568 sessionAdapter.createSession(sessionId, ec.user.userId?.toString())
684 569 }
685 } catch (Exception e) { 570 } else if (session.userId != ec.user.userId?.toString()) {
686 logger.error("Error finding session ${sessionId}: ${e.message}") 571 response.setStatus(HttpServletResponse.SC_FORBIDDEN)
687 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
688 response.setContentType("application/json") 572 response.setContentType("application/json")
689 response.writer.write(JsonOutput.toJson([ 573 response.writer.write(JsonOutput.toJson([
690 jsonrpc: "2.0", 574 jsonrpc: "2.0",
691 error: [code: -32603, message: "Session lookup error: ${e.message}"], 575 error: [code: -32600, message: "Access denied for session: ${sessionId}"],
692 id: rpcRequest.id 576 id: rpcRequest.id
693 ])) 577 ]))
694 return 578 return
695 } finally {
696 ec.artifactExecution.enableAuthz()
697 } 579 }
698 } 580 }
699 581
700 // Check if this is a notification (no id) - notifications get empty response 582 // Check if this is a notification (no id)
701 boolean isNotification = !rpcRequest.containsKey('id') 583 boolean isNotification = !rpcRequest.containsKey('id')
702 584
703 if (isNotification) { 585 if (isNotification) {
704 // Special handling for notifications/initialized to transition session state
705 if ("notifications/initialized".equals(rpcRequest.method)) { 586 if ("notifications/initialized".equals(rpcRequest.method)) {
706 logger.debug("Processing notifications/initialized for sessionId: ${sessionId}")
707 if (sessionId) { 587 if (sessionId) {
708 sessionStates.put(sessionId, STATE_INITIALIZED) 588 sessionAdapter.setSessionState(sessionId, McpSession.STATE_INITIALIZED)
709 // Store user mapping in memory for fast validation 589 logger.debug("Session ${sessionId} transitioned to INITIALIZED state")
710 sessionUsers.put(sessionId, ec.user.userId.toString())
711 logger.debug("Session ${sessionId} transitioned to INITIALIZED state for user ${ec.user.userId}")
712 } 590 }
713 591
714 // For notifications/initialized, return 202 Accepted per MCP HTTP Streaming spec
715 if (sessionId) { 592 if (sessionId) {
716 response.setHeader("Mcp-Session-Id", sessionId.toString()) 593 response.setHeader("Mcp-Session-Id", sessionId)
717 } 594 }
718 response.setContentType("text/event-stream") 595 response.setContentType("text/event-stream")
719 response.setStatus(HttpServletResponse.SC_ACCEPTED) // 202 Accepted 596 response.setStatus(HttpServletResponse.SC_ACCEPTED)
720 logger.debug("Sent 202 Accepted response for notifications/initialized") 597 response.flushBuffer()
721 response.flushBuffer() // Commit the response immediately 598 return
722 return
723 } 599 }
724 600
725 // For other notifications, set session header if needed but NO response per MCP spec 601 // Other notifications receive 204 No Content
726 if (sessionId) { 602 if (sessionId) {
727 response.setHeader("Mcp-Session-Id", sessionId.toString()) 603 response.setHeader("Mcp-Session-Id", sessionId)
728 } 604 }
729 605 response.setStatus(HttpServletResponse.SC_NO_CONTENT)
730 // Other notifications receive NO response per MCP specification 606 response.flushBuffer()
731 response.setStatus(HttpServletResponse.SC_NO_CONTENT) // 204 No Content
732 response.flushBuffer() // Commit the response immediately
733 return 607 return
734 } 608 }
735 609
736 // Process MCP method using Moqui services with session ID if available 610 // Process MCP method
737 def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit ?: [:]) 611 def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit)
738 612
739 // Update session activity throttled for actual user actions (not pings or tools/list) 613 // Update session activity
740 // tools/list is read-only discovery and shouldn't update session activity to prevent lock contention
741 if (sessionId && !"ping".equals(rpcRequest.method) && !"tools/list".equals(rpcRequest.method)) { 614 if (sessionId && !"ping".equals(rpcRequest.method) && !"tools/list".equals(rpcRequest.method)) {
742 updateSessionActivityThrottled(sessionId) 615 updateSessionActivityThrottled(sessionId)
743 } 616 }
744 617
745 // Set Mcp-Session-Id header BEFORE any response data (per MCP 2025-06-18 spec) 618 // Set session header
746 // For initialize method, always use sessionId we have (from visit or header)
747 String responseSessionId = null 619 String responseSessionId = null
748 if (rpcRequest.method == "initialize" && sessionId) { 620 if (rpcRequest.method == "initialize" && sessionId) {
749 responseSessionId = sessionId.toString() 621 responseSessionId = sessionId
750 } else if (result?.sessionId) { 622 } else if (result?.sessionId) {
751 responseSessionId = result.sessionId.toString() 623 responseSessionId = result.sessionId?.toString()
752 } else if (sessionId) { 624 } else if (sessionId) {
753 // For other methods, ensure we always return session ID from header 625 responseSessionId = sessionId
754 responseSessionId = sessionId.toString()
755 } 626 }
756 627
757 if (responseSessionId) { 628 if (responseSessionId) {
758 response.setHeader("Mcp-Session-Id", responseSessionId) 629 response.setHeader("Mcp-Session-Id", responseSessionId)
759 logger.debug("Set Mcp-Session-Id header to ${responseSessionId} for method ${rpcRequest.method}")
760 } 630 }
761 631
762 // Build JSON-RPC response for regular requests 632 // Build response
763 // Extract the actual result from Moqui service response
764 def actualResult = result?.result ?: result 633 def actualResult = result?.result ?: result
765 def rpcResponse = [ 634 def rpcResponse = [
766 jsonrpc: "2.0", 635 jsonrpc: "2.0",
767 id: rpcRequest.id, 636 id: rpcRequest.id,
768 result: actualResult 637 result: actualResult
769 ] 638 ]
770 639
771 // Standard MCP flow: include notifications in response content array
772 if (sessionId && notificationQueues.containsKey(sessionId)) {
773 def pendingNotifications = notificationQueues.get(sessionId)
774 if (pendingNotifications && !pendingNotifications.isEmpty()) {
775 logger.debug("Adding ${pendingNotifications.size()} pending notifications to response content for session ${sessionId}")
776
777 // Convert notifications to content items and add to result
778 def notificationContent = []
779 for (notification in pendingNotifications) {
780 notificationContent << [
781 type: "text",
782 text: "Notification [${notification.method}]: " + JsonOutput.toJson(notification.params ?: notification)
783 ]
784 }
785
786 // Merge notification content with existing result content
787 def existingContent = actualResult?.content ?: []
788 actualResult.content = existingContent + notificationContent
789
790 // Clear delivered notifications
791 notificationQueues.put(sessionId, [])
792 logger.debug("Merged ${pendingNotifications.size()} notifications into response for session ${sessionId}")
793 }
794 }
795
796 response.setContentType("application/json") 640 response.setContentType("application/json")
797 response.setCharacterEncoding("UTF-8") 641 response.setCharacterEncoding("UTF-8")
798
799 // Send the main response
800 response.writer.write(JsonOutput.toJson(rpcResponse)) 642 response.writer.write(JsonOutput.toJson(rpcResponse))
801 } 643 }
802 644
803 private Map<String, Object> processMcpMethod(String method, Map params, ExecutionContextImpl ec, String sessionId, def visit) { 645 private Map<String, Object> processMcpMethod(String method, Map params, ExecutionContextImpl ec, String sessionId, def visit) {
804 logger.debug("Enhanced METHOD: ${method} with sessionId: ${sessionId}") 646 logger.debug("Processing MCP method: ${method} with sessionId: ${sessionId}")
805 647
806 try { 648 try {
807 // Ensure params is not null 649 if (params == null) params = [:]
808 if (params == null) { 650 params.sessionId = visit?.visitId ?: sessionId
809 params = [:] 651
810 }
811
812 // Add session context to parameters for services
813 params.sessionId = visit?.visitId
814
815 // Check session state for methods that require initialization 652 // Check session state for methods that require initialization
816 // Use the sessionId from header for consistency (this is what the client tracks) 653 def session = sessionId ? sessionAdapter.getSession(sessionId) : null
817 Integer sessionState = sessionId ? sessionStates.get(sessionId) : null
818
819 // Methods that don't require initialized session
820 if (!["initialize", "ping"].contains(method)) { 654 if (!["initialize", "ping"].contains(method)) {
821 if (sessionState != STATE_INITIALIZED) { 655 if (!session || session.state != McpSession.STATE_INITIALIZED) {
822 logger.warn("Method ${method} called but session ${sessionId} not initialized (state: ${sessionState})") 656 logger.warn("Method ${method} called but session ${sessionId} not initialized")
823 return [error: "Session not initialized. Call initialize first, then send notifications/initialized."] 657 return [error: "Session not initialized. Call initialize first, then send notifications/initialized."]
824 } 658 }
825 } 659 }
826 660
827 switch (method) { 661 switch (method) {
828 case "initialize": 662 case "initialize":
829 // For initialize, use the visitId we just created instead of null sessionId from request
830 if (visit && visit.visitId) { 663 if (visit && visit.visitId) {
831 params.sessionId = visit.visitId 664 params.sessionId = visit.visitId
832 // Set session to initializing state using actual sessionId as key (for consistency) 665 // Create session in adapter with actual authenticated userId
833 sessionStates.put(params.sessionId, STATE_INITIALIZING) 666 if (!sessionAdapter.hasSession(params.sessionId?.toString())) {
834 logger.debug("Initialize - using visitId: ${visit.visitId}, set state ${params.sessionId} to INITIALIZING") 667 sessionAdapter.createSession(params.sessionId?.toString(), ec.user.userId?.toString())
835 } else { 668 }
836 logger.warn("Initialize - no visit available, using null sessionId") 669 sessionAdapter.setSessionState(params.sessionId?.toString(), McpSession.STATE_INITIALIZING)
837 } 670 }
838 params.actualUserId = ec.user.userId 671 params.actualUserId = ec.user.userId
839 logger.debug("Initialize - actualUserId: ${params.actualUserId}, sessionId: ${params.sessionId}")
840 def serviceResult = callMcpService("mcp#Initialize", params, ec) 672 def serviceResult = callMcpService("mcp#Initialize", params, ec)
841 // Add sessionId to the response for mcp.sh compatibility 673 if (serviceResult && !serviceResult.error) {
842 if (serviceResult && serviceResult.result) { 674 serviceResult.sessionId = params.sessionId
843 serviceResult.result.sessionId = params.sessionId 675 sessionAdapter.setSessionState(params.sessionId?.toString(), McpSession.STATE_INITIALIZED)
844 // Initialize successful - transition session to INITIALIZED state
845 sessionStates.put(params.sessionId, STATE_INITIALIZED)
846 logger.debug("Initialize - successful, set state ${params.sessionId} to INITIALIZED")
847 } 676 }
848 return serviceResult 677 return serviceResult
678
849 case "ping": 679 case "ping":
850 // Simple ping for testing - bypass service for now
851 return [pong: System.currentTimeMillis(), sessionId: visit?.visitId, user: ec.user.username] 680 return [pong: System.currentTimeMillis(), sessionId: visit?.visitId, user: ec.user.username]
681
852 case "tools/list": 682 case "tools/list":
853 // Ensure sessionId is available to service for notification consistency
854 if (sessionId) params.sessionId = sessionId 683 if (sessionId) params.sessionId = sessionId
855 return callMcpService("list#Tools", params, ec) 684 return callMcpService("list#Tools", params, ec)
685
856 case "tools/call": 686 case "tools/call":
857 // Ensure sessionId is available to service for notification consistency
858 if (sessionId) params.sessionId = sessionId 687 if (sessionId) params.sessionId = sessionId
859 return callMcpService("mcp#ToolsCall", params, ec) 688 return callMcpService("mcp#ToolsCall", params, ec)
689
860 case "resources/list": 690 case "resources/list":
861 return callMcpService("mcp#ResourcesList", params, ec) 691 return callMcpService("mcp#ResourcesList", params, ec)
692
862 case "resources/read": 693 case "resources/read":
863 return callMcpService("mcp#ResourcesRead", params, ec) 694 return callMcpService("mcp#ResourcesRead", params, ec)
695
864 case "resources/templates/list": 696 case "resources/templates/list":
865 return callMcpService("mcp#ResourcesTemplatesList", params, ec) 697 return callMcpService("mcp#ResourcesTemplatesList", params, ec)
698
866 case "resources/subscribe": 699 case "resources/subscribe":
867 return callMcpService("mcp#ResourcesSubscribe", params, ec) 700 return callMcpService("mcp#ResourcesSubscribe", params, ec)
701
868 case "resources/unsubscribe": 702 case "resources/unsubscribe":
869 return callMcpService("mcp#ResourcesUnsubscribe", params, ec) 703 return callMcpService("mcp#ResourcesUnsubscribe", params, ec)
704
870 case "prompts/list": 705 case "prompts/list":
871 return callMcpService("mcp#PromptsList", params, ec) 706 return callMcpService("mcp#PromptsList", params, ec)
707
872 case "prompts/get": 708 case "prompts/get":
873 return callMcpService("mcp#PromptsGet", params, ec) 709 return callMcpService("mcp#PromptsGet", params, ec)
710
874 case "roots/list": 711 case "roots/list":
875 return callMcpService("mcp#RootsList", params, ec) 712 return callMcpService("mcp#RootsList", params, ec)
713
876 case "sampling/createMessage": 714 case "sampling/createMessage":
877 return callMcpService("mcp#SamplingCreateMessage", params, ec) 715 return callMcpService("mcp#SamplingCreateMessage", params, ec)
716
878 case "elicitation/create": 717 case "elicitation/create":
879 return callMcpService("mcp#ElicitationCreate", params, ec) 718 return callMcpService("mcp#ElicitationCreate", params, ec)
880 // NOTE: notifications/initialized is handled as a notification, not a request method 719
881 // It will be processed by the notification handling logic above (lines 824-837)
882 case "notifications/tools/list_changed": 720 case "notifications/tools/list_changed":
883 // Handle tools list changed notification
884 logger.debug("Tools list changed for sessionId: ${sessionId}")
885 // Could trigger cache invalidation here if needed
886 return null
887 case "notifications/resources/list_changed": 721 case "notifications/resources/list_changed":
888 // Handle resources list changed notification 722 case "notifications/prompts/list_changed":
889 logger.debug("Resources list changed for sessionId: ${sessionId}") 723 case "notifications/roots/list_changed":
890 // Could trigger cache invalidation here if needed 724 case "logging/setLevel":
725 logger.debug("Notification ${method} for sessionId: ${sessionId}")
891 return null 726 return null
727
892 case "notifications/send": 728 case "notifications/send":
893 // Handle notification sending
894 def notificationMethod = params?.method 729 def notificationMethod = params?.method
895 def notificationParams = params?.params 730 def notificationParams = params?.params
896 if (!notificationMethod) { 731 if (!notificationMethod) {
897 throw new IllegalArgumentException("method is required for sending notification") 732 throw new IllegalArgumentException("method is required for sending notification")
898 } 733 }
899
900 logger.debug("Sending notification ${notificationMethod} for sessionId: ${sessionId}")
901
902 // Queue notification for delivery through SSE or polling
903 if (sessionId) { 734 if (sessionId) {
904 def notification = [ 735 def notification = [
736 jsonrpc: "2.0",
905 method: notificationMethod, 737 method: notificationMethod,
906 params: notificationParams, 738 params: notificationParams
907 timestamp: System.currentTimeMillis()
908 ] 739 ]
909 740 transport.sendNotification(sessionId, notification)
910 // Add to notification queue
911 def queue = notificationQueues.get(sessionId) ?: []
912 queue << notification
913 notificationQueues.put(sessionId, queue)
914
915 logger.debug("Notification queued for session ${sessionId}: ${notificationMethod}")
916 } 741 }
917
918 return [sent: true, sessionId: sessionId, method: notificationMethod] 742 return [sent: true, sessionId: sessionId, method: notificationMethod]
743
919 case "notifications/subscribe": 744 case "notifications/subscribe":
920 // Handle notification subscription
921 def subscriptionMethod = params?.method 745 def subscriptionMethod = params?.method
922 if (!sessionId || !subscriptionMethod) { 746 if (!sessionId || !subscriptionMethod) {
923 throw new IllegalArgumentException("sessionId and method are required for subscription") 747 throw new IllegalArgumentException("sessionId and method are required for subscription")
924 } 748 }
925 def subscriptions = sessionSubscriptions.get(sessionId) ?: new HashSet<>() 749 session?.subscriptions?.add(subscriptionMethod)
926 subscriptions.add(subscriptionMethod)
927 sessionSubscriptions.put(sessionId, subscriptions)
928 logger.debug("Session ${sessionId} subscribed to: ${subscriptionMethod}")
929 return [subscribed: true, sessionId: sessionId, method: subscriptionMethod] 750 return [subscribed: true, sessionId: sessionId, method: subscriptionMethod]
751
930 case "notifications/unsubscribe": 752 case "notifications/unsubscribe":
931 // Handle notification unsubscription
932 def subscriptionMethod = params?.method 753 def subscriptionMethod = params?.method
933 if (!sessionId || !subscriptionMethod) { 754 if (!sessionId || !subscriptionMethod) {
934 throw new IllegalArgumentException("sessionId and method are required for unsubscription") 755 throw new IllegalArgumentException("sessionId and method are required for unsubscription")
935 } 756 }
936 def subscriptions = sessionSubscriptions.get(sessionId) 757 session?.subscriptions?.remove(subscriptionMethod)
937 if (subscriptions) {
938 subscriptions.remove(subscriptionMethod)
939 if (subscriptions.isEmpty()) {
940 sessionSubscriptions.remove(sessionId)
941 } else {
942 sessionSubscriptions.put(sessionId, subscriptions)
943 }
944 logger.debug("Session ${sessionId} unsubscribed from: ${subscriptionMethod}")
945 }
946 return [unsubscribed: true, sessionId: sessionId, method: subscriptionMethod] 758 return [unsubscribed: true, sessionId: sessionId, method: subscriptionMethod]
759
947 case "notifications/progress": 760 case "notifications/progress":
948 // Handle progress notification
949 def progressToken = params?.progressToken 761 def progressToken = params?.progressToken
950 def progressValue = params?.progress 762 def progressValue = params?.progress
951 def total = params?.total 763 def total = params?.total
952 logger.debug("Progress notification for sessionId: ${sessionId}, token: ${progressToken}, progress: ${progressValue}/${total}") 764 logger.debug("Progress notification: ${progressToken}, ${progressValue}/${total}")
953 // Store progress for potential polling
954 if (sessionId && progressToken) {
955 def progressKey = "${sessionId}_${progressToken}"
956 sessionProgress.put(progressKey, [progress: progressValue, total: total, timestamp: System.currentTimeMillis()])
957 }
958 return null 765 return null
766
959 case "notifications/resources/updated": 767 case "notifications/resources/updated":
960 // Handle resource updated notification 768 logger.debug("Resource updated: ${params?.uri}")
961 def uri = params?.uri
962 logger.debug("Resource updated notification for sessionId: ${sessionId}, uri: ${uri}")
963 // Could trigger resource cache invalidation here
964 return null
965 case "notifications/prompts/list_changed":
966 // Handle prompts list changed notification
967 logger.debug("Prompts list changed for sessionId: ${sessionId}")
968 // Could trigger prompt cache invalidation here
969 return null 769 return null
770
970 case "notifications/message": 771 case "notifications/message":
971 // Handle general message notification
972 def level = params?.level ?: "info" 772 def level = params?.level ?: "info"
973 def message = params?.message 773 def message = params?.message
974 def data = params?.data 774 logger.debug("Message notification: level=${level}, message=${message}")
975 logger.debug("Message notification for sessionId: ${sessionId}, level: ${level}, message: ${message}")
976 // Store message for potential retrieval
977 if (sessionId) {
978 def messages = sessionMessages.get(sessionId) ?: []
979 messages << [level: level, message: message, data: data, timestamp: System.currentTimeMillis()]
980 sessionMessages.put(sessionId, messages)
981 }
982 return null
983 case "notifications/roots/list_changed":
984 // Handle roots list changed notification
985 logger.debug("Roots list changed for sessionId: ${sessionId}")
986 // Could trigger roots cache invalidation here
987 return null
988 case "logging/setLevel":
989 // Handle logging level change notification
990 logger.debug("Logging level change requested for sessionId: ${sessionId}")
991 return null 775 return null
776
992 default: 777 default:
993 throw new IllegalArgumentException("Method not found: ${method}") 778 throw new IllegalArgumentException("Method not found: ${method}")
994 } 779 }
...@@ -997,116 +782,41 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -997,116 +782,41 @@ class EnhancedMcpServlet extends HttpServlet {
997 throw e 782 throw e
998 } 783 }
999 } 784 }
1000 785
1001 private Map<String, Object> callMcpService(String serviceName, Map params, ExecutionContextImpl ec) { 786 private Map<String, Object> callMcpService(String serviceName, Map params, ExecutionContextImpl ec) {
1002 logger.debug("Enhanced Calling MCP service: ${serviceName} with params: ${params}") 787 logger.debug("Calling MCP service: ${serviceName}")
1003 788
1004 try { 789 try {
1005 ec.artifactExecution.disableAuthz() 790 ec.artifactExecution.disableAuthz()
1006 def result = ec.service.sync().name("McpServices.${serviceName}") 791 def result = ec.service.sync().name("McpServices.${serviceName}")
1007 .parameters(params ?: [:]) 792 .parameters(params ?: [:])
1008 .call() 793 .call()
1009 794
1010 logger.debug("Enhanced MCP service ${serviceName} result: ${result?.result?.size() ? 'result with ' + (result.result?.tools?.size() ?: 0) + ' tools' : 'empty result'}")
1011 if (result == null) { 795 if (result == null) {
1012 logger.error("Enhanced MCP service ${serviceName} returned null result")
1013 return [error: "Service returned null result"] 796 return [error: "Service returned null result"]
1014 } 797 }
1015 // Service framework returns result in 'result' field when out-parameters are used 798
1016 // Extract the inner result to avoid double nesting in JSON-RPC response
1017 // The MCP services already set the correct 'result' structure
1018 // Some services return result directly, others nest it in result.result
1019 if (result?.containsKey('result')) { 799 if (result?.containsKey('result')) {
1020 return result.result 800 return result.result
1021 } else {
1022 return result ?: [error: "Service returned null result"]
1023 } 801 }
802 return result
803
1024 } catch (Exception e) { 804 } catch (Exception e) {
1025 logger.error("Error calling Enhanced MCP service ${serviceName}", e) 805 logger.error("Error calling MCP service ${serviceName}", e)
1026 return [error: e.message] 806 return [error: e.message]
1027 } finally { 807 } finally {
1028 ec.artifactExecution.enableAuthz() 808 ec.artifactExecution.enableAuthz()
1029 } 809 }
1030 } 810 }
1031
1032 private void sendSseEvent(PrintWriter writer, String eventType, String data, long eventId = -1) throws IOException {
1033 try {
1034 if (eventId >= 0) {
1035 writer.write("id: " + eventId + "\n")
1036 }
1037 writer.write("event: " + eventType + "\n")
1038 writer.write("data: " + data + "\n\n")
1039 writer.flush()
1040
1041 if (writer.checkError()) {
1042 throw new IOException("Client disconnected")
1043 }
1044 } catch (Exception e) {
1045 throw new IOException("Failed to send SSE event: " + e.message, e)
1046 }
1047 }
1048
1049 // CORS handling based on MoquiServlet pattern
1050 private static boolean handleCors(HttpServletRequest request, HttpServletResponse response, String webappName, ExecutionContextFactoryImpl ecfi) {
1051 String originHeader = request.getHeader("Origin")
1052 if (originHeader) {
1053 response.setHeader("Access-Control-Allow-Origin", originHeader)
1054 response.setHeader("Access-Control-Allow-Credentials", "true")
1055 }
1056
1057 String methodHeader = request.getHeader("Access-Control-Request-Method")
1058 if (methodHeader) {
1059 response.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
1060 response.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization, Mcp-Session-Id, MCP-Protocol-Version, Accept")
1061 response.setHeader("Access-Control-Max-Age", "3600")
1062 return true
1063 }
1064 return false
1065 }
1066
1067 /**
1068 * Queue a server notification for delivery to client
1069 */
1070 void queueNotification(String sessionId, Map notification) {
1071 if (!sessionId || !notification) return
1072 811
1073 def queue = notificationQueues.computeIfAbsent(sessionId) { [] } 812 private EntityValue getCachedVisit(ExecutionContextImpl ec, String sessionId) {
1074 queue << notification
1075 logger.info("Queued notification for session ${sessionId}: ${notification}")
1076
1077 // Session activity updates handled at JSON-RPC level, not notification level
1078 // This prevents excessive database updates during notification processing
1079
1080 // Also try to send via SSE if active connection exists
1081 def writer = activeConnections.get(sessionId)
1082 if (writer && !writer.checkError()) {
1083 try {
1084 // Send as proper JSON-RPC notification via SSE
1085 def notificationMessage = [
1086 jsonrpc: "2.0",
1087 method: notification.method ?: "notifications/message",
1088 params: notification.params ?: notification
1089 ]
1090 sendSseEvent(writer, "message", JsonOutput.toJson(notificationMessage), System.currentTimeMillis())
1091 logger.debug("Sent notification via SSE to session ${sessionId}")
1092 } catch (Exception e) {
1093 logger.warn("Failed to send notification via SSE to session ${sessionId}: ${e.message}")
1094 }
1095 }
1096 }
1097
1098 /**
1099 * Get Visit from cache to reduce database access and prevent lock contention
1100 */
1101 private EntityValue getCachedVisit(ExecutionContext ec, String sessionId) {
1102 if (!sessionId) return null 813 if (!sessionId) return null
1103 814
1104 EntityValue cachedVisit = visitCache.get(sessionId) 815 EntityValue cachedVisit = visitCache.get(sessionId)
1105 if (cachedVisit != null) { 816 if (cachedVisit != null) {
1106 return cachedVisit 817 return cachedVisit
1107 } 818 }
1108 819
1109 // Not in cache, load from database with authz disabled
1110 try { 820 try {
1111 ec.artifactExecution.disableAuthz() 821 ec.artifactExecution.disableAuthz()
1112 EntityValue visit = ec.entity.find("moqui.server.Visit") 822 EntityValue visit = ec.entity.find("moqui.server.Visit")
...@@ -1120,165 +830,101 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -1120,165 +830,101 @@ class EnhancedMcpServlet extends HttpServlet {
1120 ec.artifactExecution.enableAuthz() 830 ec.artifactExecution.enableAuthz()
1121 } 831 }
1122 } 832 }
1123 833
1124 /**
1125 * Throttled session activity update to prevent database lock contention
1126 * Uses synchronized per-session to prevent concurrent updates
1127 */
1128 private void updateSessionActivityThrottled(String sessionId) { 834 private void updateSessionActivityThrottled(String sessionId) {
1129 if (!sessionId) return 835 if (!sessionId) return
1130 836
1131 long now = System.currentTimeMillis() 837 long now = System.currentTimeMillis()
1132 Long lastUpdate = lastActivityUpdate.get(sessionId) 838 Long lastUpdate = lastActivityUpdate.get(sessionId)
1133 839
1134 // Only update if 30 seconds have passed since last update
1135 if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { 840 if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) {
1136 // Use session-specific lock to avoid sessionId.intern() deadlocks 841 Object sessionLock = sessionAdapter.getSessionLock(sessionId)
1137 Object sessionLock = sessionLocks.computeIfAbsent(sessionId, { new Object() })
1138 synchronized (sessionLock) { 842 synchronized (sessionLock) {
1139 // Double-check after acquiring lock
1140 lastUpdate = lastActivityUpdate.get(sessionId) 843 lastUpdate = lastActivityUpdate.get(sessionId)
1141 if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { 844 if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) {
1142 try { 845 sessionAdapter.touchSession(sessionId)
1143 // Look up Visit and update activity 846 lastActivityUpdate.put(sessionId, now)
1144 ExecutionContextFactoryImpl ecfi = (ExecutionContextFactoryImpl) getServletContext().getAttribute("executionContextFactory") 847 logger.debug("Updated activity for session ${sessionId}")
1145 if (ecfi) {
1146 def ec = ecfi.getEci()
1147 try {
1148 def visit = getCachedVisit(ec, sessionId)
1149
1150 if (visit) {
1151 visit.thruDate = ec.user.getNowTimestamp()
1152 //visit.update()
1153 // Update cache with new thruDate
1154 visitCache.put(sessionId, visit)
1155 lastActivityUpdate.put(sessionId, now)
1156 logger.debug("Updated activity for session ${sessionId} (throttled, synchronized)")
1157 }
1158 } finally {
1159 ec.destroy()
1160 }
1161 }
1162 } catch (Exception e) {
1163 logger.warn("Failed to update session activity for ${sessionId}: ${e.message}")
1164 }
1165 } 848 }
1166 } 849 }
1167 } 850 }
1168 } 851 }
1169 852
1170 @Override 853 private static boolean handleCors(HttpServletRequest request, HttpServletResponse response) {
1171 void destroy() { 854 String originHeader = request.getHeader("Origin")
1172 logger.info("Destroying EnhancedMcpServlet") 855 if (originHeader) {
1173 856 response.setHeader("Access-Control-Allow-Origin", originHeader)
1174 // Close all active connections 857 response.setHeader("Access-Control-Allow-Credentials", "true")
1175 activeConnections.values().each { writer ->
1176 try {
1177 writer.write("event: shutdown\ndata: {\"type\":\"shutdown\",\"timestamp\":\"${System.currentTimeMillis()}\"}\n\n")
1178 writer.flush()
1179 } catch (Exception e) {
1180 logger.debug("Error sending shutdown to connection: ${e.message}")
1181 }
1182 } 858 }
1183 activeConnections.clear() 859
1184 860 String methodHeader = request.getHeader("Access-Control-Request-Method")
1185 super.destroy() 861 if (methodHeader) {
862 response.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
863 response.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization, Mcp-Session-Id, MCP-Protocol-Version, Accept")
864 response.setHeader("Access-Control-Max-Age", "3600")
865 return true
866 }
867 return false
1186 } 868 }
1187 869
1188 /** 870 /**
1189 * Broadcast message to all active MCP sessions 871 * Queue a notification for delivery to a session
1190 */ 872 */
1191 void broadcastToAllSessions(JsonRpcMessage message) { 873 void queueNotification(String sessionId, Map notification) {
1192 try { 874 if (!sessionId || !notification) return
1193 ec.artifactExecution.disableAuthz() 875 transport.sendNotification(sessionId, notification)
1194 // Look up all MCP Visits (persistent)
1195 def mcpVisits = ec.entity.find("moqui.server.Visit")
1196 .condition("initialRequest", "like", "%mcpSession%")
1197 .list()
1198
1199 logger.info("Broadcasting to ${mcpVisits.size()} MCP visits, ${activeConnections.size()} active connections")
1200
1201 int successCount = 0
1202 int failureCount = 0
1203
1204 // Send to active connections (transient)
1205 mcpVisits.each { visit ->
1206 PrintWriter writer = activeConnections.get(visit.visitId)
1207 if (writer && !writer.checkError()) {
1208 try {
1209 sendSseEvent(writer, "message", message.toJson())
1210 successCount++
1211 } catch (Exception e) {
1212 logger.warn("Failed to send broadcast to ${visit.visitId}: ${e.message}")
1213 // Remove broken connection
1214 activeConnections.remove(visit.visitId)
1215 failureCount++
1216 }
1217 } else {
1218 // No active connection for this visit
1219 failureCount++
1220 }
1221 }
1222
1223 logger.info("Broadcast completed: ${successCount} successful, ${failureCount} failed")
1224
1225 } catch (Exception e) {
1226 logger.error("Error broadcasting to all sessions: ${e.message}", e)
1227 } finally {
1228 ec.artifactExecution.enableAuthz()
1229 }
1230 } 876 }
1231 877
1232 /** 878 /**
1233 * Send SSE event to specific session (helper method) 879 * Send to a specific session
1234 */ 880 */
1235 void sendToSession(String sessionId, JsonRpcMessage message) { 881 void sendToSession(String sessionId, Map message) {
1236 try { 882 transport.sendMessage(sessionId, message)
1237 PrintWriter writer = activeConnections.get(sessionId)
1238 if (writer && !writer.checkError()) {
1239 sendSseEvent(writer, "message", message.toJson())
1240 logger.debug("Sent message to session ${sessionId}")
1241 } else {
1242 logger.warn("No active connection for session ${sessionId}")
1243 }
1244 } catch (Exception e) {
1245 logger.error("Error sending message to session ${sessionId}: ${e.message}", e)
1246 activeConnections.remove(sessionId)
1247 visitCache.remove(sessionId)
1248 sessionUsers.remove(sessionId)
1249 }
1250 } 883 }
1251 884
1252 /** 885 /**
1253 * Get session statistics for monitoring 886 * Get session statistics
1254 */ 887 */
1255 Map getSessionStatistics() { 888 Map getSessionStatistics() {
1256 try { 889 def stats = transport.getStatistics()
1257 // Look up all MCP Visits (persistent) 890 return stats + [
1258 def mcpVisits = ec.entity.find("moqui.server.Visit") 891 maxConnections: maxConnections,
1259 .condition("initialRequest", "like", "%mcpSession%") 892 endpoints: [
1260 .disableAuthz() 893 sse: sseEndpoint,
1261 .list() 894 message: messageEndpoint
1262 895 ],
1263 return [ 896 keepAliveInterval: keepAliveIntervalSeconds
1264 totalMcpVisits: mcpVisits.size(), 897 ]
1265 activeConnections: activeConnections.size(), 898 }
1266 maxConnections: maxConnections, 899
1267 architecture: "Visit-based sessions with connection registry", 900 /**
1268 message: "Enhanced MCP with session tracking", 901 * Get the notification bridge for external access
1269 endpoints: [ 902 */
1270 sse: sseEndpoint, 903 MoquiNotificationMcpBridge getNotificationBridge() {
1271 message: messageEndpoint 904 return notificationBridge
1272 ], 905 }
1273 keepAliveInterval: keepAliveIntervalSeconds 906
1274 ] 907 /**
1275 } catch (Exception e) { 908 * Get the transport for external access
1276 logger.error("Error getting session statistics: ${e.message}", e) 909 */
1277 return [ 910 SseTransport getTransport() {
1278 activeConnections: activeConnections.size(), 911 return transport
1279 maxConnections: maxConnections, 912 }
1280 error: e.message 913
1281 ] 914 @Override
915 void destroy() {
916 logger.info("Destroying EnhancedMcpServlet")
917
918 // Close all sessions
919 for (String sessionId in sessionAdapter.getAllSessionIds()) {
920 transport.closeSession(sessionId)
1282 } 921 }
922
923 // Clean up notification bridge
924 if (notificationBridge) {
925 notificationBridge.destroy()
926 }
927
928 super.destroy()
1283 } 929 }
1284 } 930 }
......
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp
15
16 import groovy.json.JsonOutput
17
18 /**
19 * Simple JSON-RPC Message classes for MCP compatibility
20 */
21 class JsonRpcMessage {
22 String jsonrpc = "2.0"
23
24 String toJson() {
25 return JsonOutput.toJson(this)
26 }
27 }
28
29 class JsonRpcResponse extends JsonRpcMessage {
30 Object id
31 Object result
32 Map error
33
34 JsonRpcResponse(Object result, Object id) {
35 this.result = result
36 this.id = id
37 }
38
39 JsonRpcResponse(Map error, Object id) {
40 this.error = error
41 this.id = id
42 }
43 }
44
45 class JsonRpcNotification extends JsonRpcMessage {
46 String method
47 Object params
48
49 JsonRpcNotification(String method, Object params = null) {
50 this.method = method
51 this.params = params
52 }
53 }
...\ No newline at end of file ...\ No newline at end of file
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp
15
16 /**
17 * Simple transport interface for MCP messages
18 */
19 interface MoquiMcpTransport {
20 void sendMessage(JsonRpcMessage message)
21 boolean isActive()
22 String getSessionId()
23 }
...\ No newline at end of file ...\ No newline at end of file
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp
15
16 import org.moqui.context.ExecutionContext
17 import org.moqui.impl.context.ExecutionContextImpl
18 import org.moqui.entity.EntityValue
19 import org.slf4j.Logger
20 import org.slf4j.LoggerFactory
21
22 import java.util.concurrent.atomic.AtomicBoolean
23 import java.util.concurrent.atomic.AtomicLong
24
25 /**
26 * MCP Session implementation that uses Moqui's Visit entity directly
27 * Eliminates custom session management by leveraging Moqui's built-in Visit system
28 */
29 class VisitBasedMcpSession implements MoquiMcpTransport {
30 protected final static Logger logger = LoggerFactory.getLogger(VisitBasedMcpSession.class)
31
32 private final EntityValue visit // The Visit entity record
33 private final PrintWriter writer
34 private final ExecutionContextImpl ec
35 private final AtomicBoolean active = new AtomicBoolean(true)
36 private final AtomicBoolean closing = new AtomicBoolean(false)
37 private final AtomicLong messageCount = new AtomicLong(0)
38
39 VisitBasedMcpSession(EntityValue visit, PrintWriter writer, ExecutionContextImpl ec) {
40 this.visit = visit
41 this.writer = writer
42 this.ec = ec
43
44 // Initialize MCP session in Visit if not already done
45 initializeMcpSession()
46 }
47
48 private void initializeMcpSession() {
49 try {
50 def metadata = getSessionMetadata()
51 if (!metadata.mcpSession) {
52 // Mark this Visit as an MCP session
53 metadata.mcpSession = true
54 metadata.mcpProtocolVersion = "2025-11-25"
55 metadata.mcpCreatedAt = System.currentTimeMillis()
56 metadata.mcpTransportType = "SSE"
57 metadata.mcpMessageCount = 0
58 saveSessionMetadata(metadata)
59
60 logger.debug("MCP Session initialized for Visit ${visit.visitId}")
61 }
62 } catch (Exception e) {
63 logger.warn("Failed to initialize MCP session for Visit ${visit.visitId}: ${e.message}")
64 }
65 }
66
67 @Override
68 void sendMessage(JsonRpcMessage message) {
69 if (!active.get() || closing.get()) {
70 logger.warn("Attempted to send message on inactive or closing session ${visit.visitId}")
71 return
72 }
73
74 try {
75 String jsonMessage = message.toJson()
76 sendSseEvent("message", jsonMessage)
77 messageCount.incrementAndGet()
78
79 // Session activity now managed at servlet level to avoid lock contention
80
81 } catch (Exception e) {
82 logger.error("Failed to send message on session ${visit.visitId}: ${e.message}")
83 if (e.message?.contains("disconnected") || e.message?.contains("Client disconnected")) {
84 close()
85 }
86 }
87 }
88
89 void closeGracefully() {
90 if (!active.compareAndSet(true, false)) {
91 return // Already closed
92 }
93
94 closing.set(true)
95 logger.debug("Gracefully closing MCP session ${visit.visitId}")
96
97 try {
98 // Send graceful shutdown notification
99 def shutdownMessage = new JsonRpcNotification("shutdown", [
100 sessionId: visit.visitId,
101 timestamp: System.currentTimeMillis()
102 ])
103 sendMessage(shutdownMessage)
104
105 // Give some time for message to be sent
106 Thread.sleep(100)
107
108 } catch (Exception e) {
109 logger.warn("Error during graceful shutdown of session ${visit.visitId}: ${e.message}")
110 } finally {
111 close()
112 }
113 }
114
115 void close() {
116 if (!active.compareAndSet(true, false)) {
117 return // Already closed
118 }
119
120 logger.debug("Closing MCP session ${visit.visitId} (messages sent: ${messageCount.get()})")
121
122 try {
123 // Send final close event if writer is still available
124 if (writer && !writer.checkError()) {
125 sendSseEvent("close", groovy.json.JsonOutput.toJson([
126 type: "disconnected",
127 sessionId: visit.visitId,
128 messageCount: messageCount.get(),
129 timestamp: System.currentTimeMillis()
130 ]))
131 }
132
133 } catch (Exception e) {
134 logger.warn("Error during session close ${visit.visitId}: ${e.message}")
135 }
136 }
137
138 @Override
139 boolean isActive() {
140 return active.get() && !closing.get() && writer && !writer.checkError()
141 }
142
143 @Override
144 String getSessionId() {
145 return visit.visitId
146 }
147
148 String getVisitId() {
149 return visit.visitId
150 }
151
152 EntityValue getVisit() {
153 return visit
154 }
155
156 /**
157 * Get session statistics
158 */
159 Map getSessionStats() {
160 return [
161 sessionId: visit.visitId,
162 visitId: visit.visitId,
163 createdAt: visit.fromDate,
164 messageCount: messageCount.get(),
165 active: active.get(),
166 closing: closing.get(),
167 duration: System.currentTimeMillis() - visit.fromDate.time
168 ]
169 }
170
171 /**
172 * Send SSE event with proper formatting
173 */
174 private void sendSseEvent(String eventType, String data) throws IOException {
175 if (!writer || writer.checkError()) {
176 throw new IOException("Writer is closed or client disconnected")
177 }
178
179 writer.write("event: " + eventType + "\n")
180 writer.write("data: " + data + "\n\n")
181 writer.flush()
182
183 if (writer.checkError()) {
184 throw new IOException("Client disconnected during write")
185 }
186 }
187
188 // Session activity management moved to servlet level to avoid database lock contention
189 // This method is no longer called - servlet manages session updates throttled
190
191 // Session end management moved to servlet level to avoid database lock contention
192 // Servlet will handle Visit updates when connections close
193
194 /**
195 * Get session metadata from Visit's initialRequest field
196 */
197 Map getSessionMetadata() {
198 try {
199 def metadataJson = visit.initialRequest
200 if (metadataJson) {
201 return groovy.json.JsonSlurper().parseText(metadataJson) as Map
202 }
203 } catch (Exception e) {
204 logger.debug("Failed to parse session metadata: ${e.message}")
205 }
206 return [:]
207 }
208
209 /**
210 * Add custom metadata to session
211 */
212 void addSessionMetadata(String key, Object value) {
213 def metadata = getSessionMetadata()
214 metadata[key] = value
215 saveSessionMetadata(metadata)
216 }
217
218 /**
219 * Save session metadata to Visit's initialRequest field
220 */
221 private void saveSessionMetadata(Map metadata) {
222 // Session metadata stored in memory only - no Visit updates to prevent lock contention
223 try {
224 sessionMetadata.putAll(metadata)
225 } catch (Exception e) {
226 logger.debug("Failed to save session metadata: ${e.message}")
227 }
228 }
229 }
...\ No newline at end of file ...\ No newline at end of file
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp.adapter
15
16 import org.moqui.entity.EntityValue
17 import org.slf4j.Logger
18 import org.slf4j.LoggerFactory
19
20 import java.util.concurrent.ConcurrentHashMap
21
22 /**
23 * Adapter that maps Moqui Visit sessions to MCP sessions.
24 * Provides in-memory session tracking to avoid database lock contention.
25 */
26 class McpSessionAdapter {
27 protected final static Logger logger = LoggerFactory.getLogger(McpSessionAdapter.class)
28
29 // Visit ID → MCP Session state
30 private final Map<String, McpSession> sessions = new ConcurrentHashMap<>()
31
32 // User ID → Set of Visit IDs (for user-targeted notifications)
33 private final Map<String, Set<String>> userSessions = new ConcurrentHashMap<>()
34
35 // Session-specific locks to avoid sessionId.intern() deadlocks
36 private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>()
37
38 /**
39 * Create a new MCP session from a Moqui Visit
40 * @param visit The Moqui Visit entity
41 * @return The created McpSession
42 */
43 McpSession createSession(EntityValue visit) {
44 String visitId = visit.visitId?.toString()
45 String userId = visit.userId?.toString()
46
47 if (!visitId) {
48 throw new IllegalArgumentException("Visit must have a visitId")
49 }
50
51 def session = new McpSession(
52 visitId: visitId,
53 userId: userId,
54 state: McpSession.STATE_INITIALIZED
55 )
56 sessions.put(visitId, session)
57
58 // Track user → sessions mapping
59 if (userId) {
60 def userSet = userSessions.computeIfAbsent(userId) { new ConcurrentHashMap<>().newKeySet() }
61 userSet.add(visitId)
62 }
63
64 logger.debug("Created MCP session ${visitId} for user ${userId}")
65 return session
66 }
67
68 /**
69 * Create a new MCP session with explicit parameters
70 * @param visitId The Visit/session ID
71 * @param userId The user ID
72 * @return The created McpSession
73 */
74 McpSession createSession(String visitId, String userId) {
75 if (!visitId) {
76 throw new IllegalArgumentException("visitId is required")
77 }
78
79 def session = new McpSession(
80 visitId: visitId,
81 userId: userId,
82 state: McpSession.STATE_INITIALIZED
83 )
84 sessions.put(visitId, session)
85
86 // Track user → sessions mapping
87 if (userId) {
88 def userSet = userSessions.computeIfAbsent(userId) { new ConcurrentHashMap<>().newKeySet() }
89 userSet.add(visitId)
90 }
91
92 logger.debug("Created MCP session ${visitId} for user ${userId}")
93 return session
94 }
95
96 /**
97 * Close and remove a session
98 * @param visitId The session/visit ID to close
99 */
100 void closeSession(String visitId) {
101 def session = sessions.remove(visitId)
102 if (session) {
103 // Remove from user tracking
104 if (session.userId) {
105 def userSet = userSessions.get(session.userId)
106 if (userSet) {
107 userSet.remove(visitId)
108 if (userSet.isEmpty()) {
109 userSessions.remove(session.userId)
110 }
111 }
112 }
113 // Clean up session lock
114 sessionLocks.remove(visitId)
115 logger.debug("Closed MCP session ${visitId}")
116 }
117 }
118
119 /**
120 * Get a session by visit ID
121 * @param visitId The session/visit ID
122 * @return The McpSession or null if not found
123 */
124 McpSession getSession(String visitId) {
125 return sessions.get(visitId)
126 }
127
128 /**
129 * Check if a session exists and is active
130 * @param visitId The session/visit ID
131 * @return true if the session exists
132 */
133 boolean hasSession(String visitId) {
134 return sessions.containsKey(visitId)
135 }
136
137 /**
138 * Get all session IDs for a specific user
139 * @param userId The user ID
140 * @return Set of session/visit IDs (empty set if none)
141 */
142 Set<String> getSessionsForUser(String userId) {
143 return userSessions.get(userId) ?: Collections.emptySet()
144 }
145
146 /**
147 * Get all active session IDs
148 * @return Set of all session IDs
149 */
150 Set<String> getAllSessionIds() {
151 return sessions.keySet()
152 }
153
154 /**
155 * Get the count of active sessions
156 * @return Number of active sessions
157 */
158 int getSessionCount() {
159 return sessions.size()
160 }
161
162 /**
163 * Get a session-specific lock for synchronized operations
164 * @param visitId The session/visit ID
165 * @return The lock object
166 */
167 Object getSessionLock(String visitId) {
168 return sessionLocks.computeIfAbsent(visitId) { new Object() }
169 }
170
171 /**
172 * Update session state
173 * @param visitId The session/visit ID
174 * @param state The new state
175 */
176 void setSessionState(String visitId, int state) {
177 def session = sessions.get(visitId)
178 if (session) {
179 session.state = state
180 logger.debug("Session ${visitId} state changed to ${state}")
181 }
182 }
183
184 /**
185 * Update session activity timestamp
186 * @param visitId The session/visit ID
187 */
188 void touchSession(String visitId) {
189 def session = sessions.get(visitId)
190 if (session) {
191 session.touch()
192 }
193 }
194
195 /**
196 * Get session statistics for monitoring
197 * @return Map of session statistics
198 */
199 Map getStatistics() {
200 return [
201 totalSessions: sessions.size(),
202 usersWithSessions: userSessions.size(),
203 sessionsPerUser: userSessions.collectEntries { userId, sessionSet ->
204 [(userId): sessionSet.size()]
205 }
206 ]
207 }
208 }
209
210 /**
211 * Represents an MCP session state
212 */
213 class McpSession {
214 static final int STATE_UNINITIALIZED = 0
215 static final int STATE_INITIALIZING = 1
216 static final int STATE_INITIALIZED = 2
217
218 String visitId
219 String userId
220 int state = STATE_UNINITIALIZED
221 long lastActivity = System.currentTimeMillis()
222 long createdAt = System.currentTimeMillis()
223
224 // SSE writer reference (for active connections)
225 PrintWriter sseWriter
226
227 // Notification queue for this session
228 List<Map> notificationQueue = Collections.synchronizedList(new ArrayList<>())
229
230 // Subscriptions (method names this session is subscribed to)
231 Set<String> subscriptions = Collections.newSetFromMap(new ConcurrentHashMap<>())
232
233 void touch() {
234 lastActivity = System.currentTimeMillis()
235 }
236
237 boolean isActive() {
238 return state == STATE_INITIALIZED && sseWriter != null && !sseWriter.checkError()
239 }
240
241 boolean hasActiveWriter() {
242 return sseWriter != null && !sseWriter.checkError()
243 }
244
245 long getDurationMs() {
246 return System.currentTimeMillis() - createdAt
247 }
248
249 Map toMap() {
250 return [
251 visitId: visitId,
252 userId: userId,
253 state: state,
254 lastActivity: lastActivity,
255 createdAt: createdAt,
256 durationMs: getDurationMs(),
257 active: isActive(),
258 hasWriter: sseWriter != null,
259 queuedNotifications: notificationQueue.size(),
260 subscriptions: subscriptions.toList()
261 ]
262 }
263 }
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp.adapter
15
16 import org.moqui.context.ExecutionContext
17 import org.slf4j.Logger
18 import org.slf4j.LoggerFactory
19
20 /**
21 * Adapter that maps MCP tool calls to Moqui services.
22 * Provides a clean translation layer between MCP protocol and Moqui service framework.
23 */
24 class McpToolAdapter {
25 protected final static Logger logger = LoggerFactory.getLogger(McpToolAdapter.class)
26
27 // MCP tool name → Moqui service name mapping
28 private static final Map<String, String> TOOL_SERVICE_MAP = [
29 'moqui_browse_screens': 'McpServices.mcp#BrowseScreens',
30 'moqui_search_screens': 'McpServices.mcp#SearchScreens',
31 'moqui_get_screen_details': 'McpServices.mcp#GetScreenDetails',
32 'moqui_get_help': 'McpServices.mcp#GetHelp'
33 ]
34
35 // MCP method → Moqui service name mapping for JSON-RPC methods
36 private static final Map<String, String> METHOD_SERVICE_MAP = [
37 'initialize': 'McpServices.mcp#Initialize',
38 'ping': 'McpServices.mcp#Ping',
39 'tools/list': 'McpServices.list#Tools',
40 'tools/call': 'McpServices.mcp#ToolsCall',
41 'resources/list': 'McpServices.mcp#ResourcesList',
42 'resources/read': 'McpServices.mcp#ResourcesRead',
43 'resources/templates/list': 'McpServices.mcp#ResourcesTemplatesList',
44 'resources/subscribe': 'McpServices.mcp#ResourcesSubscribe',
45 'resources/unsubscribe': 'McpServices.mcp#ResourcesUnsubscribe',
46 'prompts/list': 'McpServices.mcp#PromptsList',
47 'prompts/get': 'McpServices.mcp#PromptsGet',
48 'roots/list': 'McpServices.mcp#RootsList',
49 'sampling/createMessage': 'McpServices.mcp#SamplingCreateMessage',
50 'elicitation/create': 'McpServices.mcp#ElicitationCreate'
51 ]
52
53 // Tool descriptions for MCP tool definitions
54 private static final Map<String, String> TOOL_DESCRIPTIONS = [
55 'moqui_browse_screens': 'Browse Moqui screen hierarchy and render screen content',
56 'moqui_search_screens': 'Search for screens by name to find their paths',
57 'moqui_get_screen_details': 'Get screen field details including dropdown options',
58 'moqui_get_help': 'Fetch extended documentation for a screen or service'
59 ]
60
61 /**
62 * Call an MCP tool, translating to the appropriate Moqui service
63 * @param ec The execution context
64 * @param toolName The MCP tool name
65 * @param arguments The tool arguments
66 * @return The result map or error map
67 */
68 Map callTool(ExecutionContext ec, String toolName, Map arguments) {
69 String serviceName = TOOL_SERVICE_MAP.get(toolName)
70 if (!serviceName) {
71 logger.warn("Unknown tool: ${toolName}")
72 return [error: [code: -32601, message: "Unknown tool: ${toolName}"]]
73 }
74
75 logger.debug("Calling tool ${toolName} -> service ${serviceName} with args: ${arguments}")
76
77 try {
78 ec.artifactExecution.disableAuthz()
79 def result = ec.service.sync()
80 .name(serviceName)
81 .parameters(arguments ?: [:])
82 .call()
83
84 logger.debug("Tool ${toolName} completed successfully")
85
86 // Extract result from service response if wrapped
87 if (result?.containsKey('result')) {
88 return result.result
89 }
90 return result ?: [:]
91
92 } catch (Exception e) {
93 logger.error("Error calling tool ${toolName}: ${e.message}", e)
94 return [error: [code: -32000, message: e.message]]
95 } finally {
96 ec.artifactExecution.enableAuthz()
97 }
98 }
99
100 /**
101 * Call an MCP method, translating to the appropriate Moqui service
102 * @param ec The execution context
103 * @param method The MCP method name
104 * @param params The method parameters
105 * @return The result map or error map
106 */
107 Map callMethod(ExecutionContext ec, String method, Map params) {
108 String serviceName = METHOD_SERVICE_MAP.get(method)
109 if (!serviceName) {
110 logger.warn("Unknown method: ${method}")
111 return [error: [code: -32601, message: "Method not found: ${method}"]]
112 }
113
114 logger.debug("Calling method ${method} -> service ${serviceName}")
115
116 try {
117 ec.artifactExecution.disableAuthz()
118 def result = ec.service.sync()
119 .name(serviceName)
120 .parameters(params ?: [:])
121 .call()
122
123 logger.debug("Method ${method} completed successfully")
124
125 // Extract result from service response if wrapped
126 if (result?.containsKey('result')) {
127 return result.result
128 }
129 return result ?: [:]
130
131 } catch (Exception e) {
132 logger.error("Error calling method ${method}: ${e.message}", e)
133 return [error: [code: -32603, message: "Internal error: ${e.message}"]]
134 } finally {
135 ec.artifactExecution.enableAuthz()
136 }
137 }
138
139 /**
140 * Check if a tool name is valid
141 * @param toolName The tool name to check
142 * @return true if the tool is known
143 */
144 boolean isValidTool(String toolName) {
145 return TOOL_SERVICE_MAP.containsKey(toolName)
146 }
147
148 /**
149 * Check if a method name is valid (has a service mapping)
150 * @param method The method name to check
151 * @return true if the method has a service mapping
152 */
153 boolean isValidMethod(String method) {
154 return METHOD_SERVICE_MAP.containsKey(method)
155 }
156
157 /**
158 * Get the service name for a given tool
159 * @param toolName The tool name
160 * @return The service name or null if not found
161 */
162 String getServiceForTool(String toolName) {
163 return TOOL_SERVICE_MAP.get(toolName)
164 }
165
166 /**
167 * Get the service name for a given method
168 * @param method The method name
169 * @return The service name or null if not found
170 */
171 String getServiceForMethod(String method) {
172 return METHOD_SERVICE_MAP.get(method)
173 }
174
175 /**
176 * Get the list of available tools with their definitions
177 * @return List of tool definition maps
178 */
179 List<Map> listTools() {
180 return TOOL_SERVICE_MAP.keySet().collect { toolName ->
181 [
182 name: toolName,
183 description: TOOL_DESCRIPTIONS.get(toolName) ?: "MCP tool: ${toolName}",
184 serviceName: TOOL_SERVICE_MAP.get(toolName)
185 ]
186 }
187 }
188
189 /**
190 * Get tool description
191 * @param toolName The tool name
192 * @return The tool description or null if not found
193 */
194 String getToolDescription(String toolName) {
195 return TOOL_DESCRIPTIONS.get(toolName)
196 }
197
198 /**
199 * Get all supported tool names
200 * @return Set of tool names
201 */
202 Set<String> getToolNames() {
203 return TOOL_SERVICE_MAP.keySet()
204 }
205
206 /**
207 * Get all supported method names
208 * @return Set of method names
209 */
210 Set<String> getMethodNames() {
211 return METHOD_SERVICE_MAP.keySet()
212 }
213 }
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp.adapter
15
16 import org.moqui.context.ExecutionContextFactory
17 import org.moqui.context.NotificationMessage
18 import org.moqui.context.NotificationMessageListener
19 import org.moqui.mcp.transport.MoquiMcpTransport
20 import org.slf4j.Logger
21 import org.slf4j.LoggerFactory
22
23 /**
24 * Bridge that connects Moqui's NotificationMessage system to MCP notifications.
25 * Implements NotificationMessageListener to receive all Moqui notifications
26 * and forwards them to MCP clients via the transport layer.
27 */
28 class MoquiNotificationMcpBridge implements NotificationMessageListener {
29 protected final static Logger logger = LoggerFactory.getLogger(MoquiNotificationMcpBridge.class)
30
31 private ExecutionContextFactory ecf
32 private MoquiMcpTransport transport
33
34 // Topic prefix for MCP-specific notifications (optional filtering)
35 private static final String MCP_TOPIC_PREFIX = "mcp."
36
37 // Whether to forward all notifications or only MCP-prefixed ones
38 private boolean forwardAllNotifications = true
39
40 /**
41 * Initialize the bridge with the ECF and transport
42 * Note: This method signature matches what the ECF registration expects
43 */
44 @Override
45 void init(ExecutionContextFactory ecf) {
46 this.ecf = ecf
47 logger.info("MoquiNotificationMcpBridge initialized (transport not yet set)")
48 }
49
50 /**
51 * Set the transport after initialization
52 * @param transport The MCP transport to use for sending notifications
53 */
54 void setTransport(MoquiMcpTransport transport) {
55 this.transport = transport
56 logger.info("MoquiNotificationMcpBridge transport configured: ${transport?.class?.simpleName}")
57 }
58
59 /**
60 * Configure whether to forward all notifications or only MCP-prefixed ones
61 * @param forwardAll If true, forward all notifications; if false, only forward those with topic starting with 'mcp.'
62 */
63 void setForwardAllNotifications(boolean forwardAll) {
64 this.forwardAllNotifications = forwardAll
65 logger.info("MoquiNotificationMcpBridge forwardAllNotifications set to: ${forwardAll}")
66 }
67
68 @Override
69 void onMessage(NotificationMessage nm) {
70 if (transport == null) {
71 logger.trace("Transport not configured, skipping notification: ${nm.topic}")
72 return
73 }
74
75 // Optionally filter by topic prefix
76 if (!forwardAllNotifications && !nm.topic?.startsWith(MCP_TOPIC_PREFIX)) {
77 logger.trace("Skipping non-MCP notification: ${nm.topic}")
78 return
79 }
80
81 try {
82 // Convert Moqui notification → MCP notification format
83 Map mcpNotification = convertToMcpNotification(nm)
84
85 // Get target users
86 Set<String> notifyUserIds = nm.getNotifyUserIds()
87
88 if (notifyUserIds && !notifyUserIds.isEmpty()) {
89 // Send to each target user's active MCP sessions
90 int sentCount = 0
91 for (String userId in notifyUserIds) {
92 try {
93 transport.sendNotificationToUser(userId, mcpNotification)
94 sentCount++
95 logger.debug("Sent MCP notification to user ${userId}: ${nm.topic}")
96 } catch (Exception e) {
97 logger.warn("Failed to send MCP notification to user ${userId}: ${e.message}")
98 }
99 }
100 logger.info("Forwarded Moqui notification '${nm.topic}' to ${sentCount} users via MCP")
101 } else {
102 // No specific users, could broadcast or log
103 logger.debug("Notification '${nm.topic}' has no target users, skipping MCP forward")
104 }
105
106 } catch (Exception e) {
107 logger.error("Error converting/sending Moqui notification to MCP: ${e.message}", e)
108 }
109 }
110
111 /**
112 * Convert a Moqui NotificationMessage to MCP notification format
113 * @param nm The Moqui notification
114 * @return The MCP notification map
115 */
116 private Map convertToMcpNotification(NotificationMessage nm) {
117 return [
118 jsonrpc: "2.0",
119 method: "notifications/message",
120 params: [
121 topic: nm.topic,
122 subTopic: nm.subTopic,
123 title: nm.title,
124 type: nm.type,
125 message: nm.getMessageMap() ?: [:],
126 link: nm.link,
127 showAlert: nm.isShowAlert(),
128 notificationMessageId: nm.notificationMessageId,
129 timestamp: System.currentTimeMillis()
130 ]
131 ]
132 }
133
134 /**
135 * Create a custom MCP notification and send to specific users
136 * @param topic The notification topic
137 * @param title The notification title
138 * @param message The message content
139 * @param userIds The target user IDs
140 */
141 void sendMcpNotification(String topic, String title, Map message, Set<String> userIds) {
142 if (transport == null) {
143 logger.warn("Cannot send MCP notification: transport not configured")
144 return
145 }
146
147 Map mcpNotification = [
148 jsonrpc: "2.0",
149 method: "notifications/message",
150 params: [
151 topic: topic,
152 title: title,
153 message: message,
154 timestamp: System.currentTimeMillis()
155 ]
156 ]
157
158 for (String userId in userIds) {
159 try {
160 transport.sendNotificationToUser(userId, mcpNotification)
161 logger.debug("Sent custom MCP notification to user ${userId}: ${topic}")
162 } catch (Exception e) {
163 logger.warn("Failed to send custom MCP notification to user ${userId}: ${e.message}")
164 }
165 }
166 }
167
168 /**
169 * Broadcast an MCP notification to all active sessions
170 * @param topic The notification topic
171 * @param title The notification title
172 * @param message The message content
173 */
174 void broadcastMcpNotification(String topic, String title, Map message) {
175 if (transport == null) {
176 logger.warn("Cannot broadcast MCP notification: transport not configured")
177 return
178 }
179
180 Map mcpNotification = [
181 jsonrpc: "2.0",
182 method: "notifications/message",
183 params: [
184 topic: topic,
185 title: title,
186 message: message,
187 timestamp: System.currentTimeMillis()
188 ]
189 ]
190
191 try {
192 transport.broadcastNotification(mcpNotification)
193 logger.info("Broadcast MCP notification: ${topic}")
194 } catch (Exception e) {
195 logger.error("Failed to broadcast MCP notification: ${e.message}", e)
196 }
197 }
198
199 /**
200 * Send a tools/list_changed notification to inform clients that available tools have changed
201 */
202 void notifyToolsChanged() {
203 if (transport == null) {
204 logger.warn("Cannot send tools changed notification: transport not configured")
205 return
206 }
207
208 Map notification = [
209 jsonrpc: "2.0",
210 method: "notifications/tools/list_changed",
211 params: [:]
212 ]
213
214 try {
215 transport.broadcastNotification(notification)
216 logger.info("Broadcast tools/list_changed notification")
217 } catch (Exception e) {
218 logger.error("Failed to broadcast tools changed notification: ${e.message}", e)
219 }
220 }
221
222 /**
223 * Send a resources/list_changed notification
224 */
225 void notifyResourcesChanged() {
226 if (transport == null) return
227
228 Map notification = [
229 jsonrpc: "2.0",
230 method: "notifications/resources/list_changed",
231 params: [:]
232 ]
233
234 try {
235 transport.broadcastNotification(notification)
236 logger.info("Broadcast resources/list_changed notification")
237 } catch (Exception e) {
238 logger.error("Failed to broadcast resources changed notification: ${e.message}", e)
239 }
240 }
241
242 /**
243 * Send a progress notification for a long-running operation
244 * @param sessionId The target session
245 * @param progressToken The progress token
246 * @param progress Current progress value
247 * @param total Total progress value (optional)
248 */
249 void sendProgressNotification(String sessionId, String progressToken, Number progress, Number total = null) {
250 if (transport == null) return
251
252 Map notification = [
253 jsonrpc: "2.0",
254 method: "notifications/progress",
255 params: [
256 progressToken: progressToken,
257 progress: progress,
258 total: total
259 ]
260 ]
261
262 try {
263 transport.sendNotification(sessionId, notification)
264 logger.debug("Sent progress notification to session ${sessionId}: ${progress}/${total ?: '?'}")
265 } catch (Exception e) {
266 logger.warn("Failed to send progress notification: ${e.message}")
267 }
268 }
269
270 @Override
271 void destroy() {
272 logger.info("MoquiNotificationMcpBridge destroyed")
273 this.ecf = null
274 this.transport = null
275 }
276 }
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp.transport
15
16 /**
17 * Transport interface for MCP messages.
18 * Abstracts transport concerns so implementations can be swapped (SSE, WebSocket, etc.)
19 */
20 interface MoquiMcpTransport {
21
22 // Session lifecycle
23
24 /**
25 * Open a new MCP session for the given user
26 * @param sessionId The session ID (typically Visit ID)
27 * @param userId The user ID associated with this session
28 */
29 void openSession(String sessionId, String userId)
30
31 /**
32 * Close an existing MCP session
33 * @param sessionId The session ID to close
34 */
35 void closeSession(String sessionId)
36
37 /**
38 * Check if a session is currently active
39 * @param sessionId The session ID to check
40 * @return true if the session is active
41 */
42 boolean isSessionActive(String sessionId)
43
44 // Message sending
45
46 /**
47 * Send a JSON-RPC message to a specific session
48 * @param sessionId The target session ID
49 * @param message The message to send (will be JSON-serialized)
50 */
51 void sendMessage(String sessionId, Map message)
52
53 /**
54 * Send an MCP notification to a specific session
55 * @param sessionId The target session ID
56 * @param notification The notification to send
57 */
58 void sendNotification(String sessionId, Map notification)
59
60 /**
61 * Send an MCP notification to all sessions for a specific user
62 * @param userId The target user ID
63 * @param notification The notification to send
64 */
65 void sendNotificationToUser(String userId, Map notification)
66
67 // Broadcast
68
69 /**
70 * Broadcast a notification to all active sessions
71 * @param notification The notification to broadcast
72 */
73 void broadcastNotification(Map notification)
74
75 /**
76 * Get the number of active sessions
77 * @return count of active sessions
78 */
79 int getActiveSessionCount()
80
81 /**
82 * Get session IDs for a specific user
83 * @param userId The user ID
84 * @return Set of session IDs for this user
85 */
86 Set<String> getSessionsForUser(String userId)
87 }
1 /*
2 * This software is in the public domain under CC0 1.0 Universal plus a
3 * Grant of Patent License.
4 *
5 * To the extent possible under law, author(s) have dedicated all
6 * copyright and related and neighboring rights to this software to the
7 * public domain worldwide. This software is distributed without any
8 * warranty.
9 *
10 * You should have received a copy of the CC0 Public Domain Dedication
11 * along with this software (see the LICENSE.md file). If not, see
12 * <http://creativecommons.org/publicdomain/zero/1.0/>.
13 */
14 package org.moqui.mcp.transport
15
16 import groovy.json.JsonOutput
17 import org.moqui.mcp.adapter.McpSession
18 import org.moqui.mcp.adapter.McpSessionAdapter
19 import org.slf4j.Logger
20 import org.slf4j.LoggerFactory
21
22 /**
23 * SSE (Server-Sent Events) implementation of MoquiMcpTransport.
24 * Uses McpSessionAdapter for session management and provides SSE-based message delivery.
25 */
26 class SseTransport implements MoquiMcpTransport {
27 protected final static Logger logger = LoggerFactory.getLogger(SseTransport.class)
28
29 private final McpSessionAdapter sessionAdapter
30
31 // Event ID counter for SSE
32 private long eventIdCounter = 0
33
34 SseTransport(McpSessionAdapter sessionAdapter) {
35 this.sessionAdapter = sessionAdapter
36 }
37
38 @Override
39 void openSession(String sessionId, String userId) {
40 if (!sessionAdapter.hasSession(sessionId)) {
41 sessionAdapter.createSession(sessionId, userId)
42 logger.info("Opened SSE session ${sessionId} for user ${userId}")
43 } else {
44 logger.debug("Session ${sessionId} already exists")
45 }
46 }
47
48 @Override
49 void closeSession(String sessionId) {
50 def session = sessionAdapter.getSession(sessionId)
51 if (session) {
52 // Try to send close event before removing
53 if (session.hasActiveWriter()) {
54 try {
55 def closeData = [
56 type: "disconnected",
57 sessionId: sessionId,
58 timestamp: System.currentTimeMillis()
59 ]
60 sendSseEvent(session.sseWriter, "close", JsonOutput.toJson(closeData))
61 } catch (Exception e) {
62 logger.debug("Could not send close event to session ${sessionId}: ${e.message}")
63 }
64 }
65 sessionAdapter.closeSession(sessionId)
66 logger.info("Closed SSE session ${sessionId}")
67 }
68 }
69
70 @Override
71 boolean isSessionActive(String sessionId) {
72 def session = sessionAdapter.getSession(sessionId)
73 return session?.isActive() ?: false
74 }
75
76 @Override
77 void sendMessage(String sessionId, Map message) {
78 def session = sessionAdapter.getSession(sessionId)
79 if (!session) {
80 logger.warn("Cannot send message: session ${sessionId} not found")
81 return
82 }
83
84 if (!session.hasActiveWriter()) {
85 // Queue message for later delivery
86 session.notificationQueue.add(message)
87 logger.debug("Queued message for session ${sessionId} (no active writer)")
88 return
89 }
90
91 try {
92 String jsonMessage = JsonOutput.toJson(message)
93 sendSseEvent(session.sseWriter, "message", jsonMessage)
94 session.touch()
95 logger.debug("Sent message to session ${sessionId}")
96 } catch (Exception e) {
97 logger.warn("Failed to send message to session ${sessionId}: ${e.message}")
98 // Queue for later if send fails
99 session.notificationQueue.add(message)
100 }
101 }
102
103 @Override
104 void sendNotification(String sessionId, Map notification) {
105 def session = sessionAdapter.getSession(sessionId)
106 if (!session) {
107 logger.warn("Cannot send notification: session ${sessionId} not found")
108 return
109 }
110
111 // Ensure notification has proper JSON-RPC format
112 if (!notification.jsonrpc) {
113 notification = [
114 jsonrpc: "2.0",
115 method: notification.method ?: "notifications/message",
116 params: notification.params ?: notification
117 ]
118 }
119
120 if (!session.hasActiveWriter()) {
121 // Queue notification for later delivery
122 session.notificationQueue.add(notification)
123 logger.debug("Queued notification for session ${sessionId} (no active writer)")
124 return
125 }
126
127 try {
128 String jsonNotification = JsonOutput.toJson(notification)
129 sendSseEvent(session.sseWriter, "message", jsonNotification)
130 session.touch()
131 logger.debug("Sent notification to session ${sessionId}: ${notification.method}")
132 } catch (Exception e) {
133 logger.warn("Failed to send notification to session ${sessionId}: ${e.message}")
134 // Queue for later if send fails
135 session.notificationQueue.add(notification)
136 }
137 }
138
139 @Override
140 void sendNotificationToUser(String userId, Map notification) {
141 Set<String> sessionIds = sessionAdapter.getSessionsForUser(userId)
142 if (sessionIds.isEmpty()) {
143 logger.debug("No active sessions for user ${userId}")
144 return
145 }
146
147 int sentCount = 0
148 int queuedCount = 0
149
150 for (String sessionId in sessionIds) {
151 def session = sessionAdapter.getSession(sessionId)
152 if (session) {
153 if (session.hasActiveWriter()) {
154 try {
155 String jsonNotification = JsonOutput.toJson(notification)
156 sendSseEvent(session.sseWriter, "message", jsonNotification)
157 session.touch()
158 sentCount++
159 } catch (Exception e) {
160 logger.warn("Failed to send notification to session ${sessionId}: ${e.message}")
161 session.notificationQueue.add(notification)
162 queuedCount++
163 }
164 } else {
165 session.notificationQueue.add(notification)
166 queuedCount++
167 }
168 }
169 }
170
171 logger.debug("Sent notification to user ${userId}: ${sentCount} delivered, ${queuedCount} queued")
172 }
173
174 @Override
175 void broadcastNotification(Map notification) {
176 Set<String> allSessionIds = sessionAdapter.getAllSessionIds()
177 if (allSessionIds.isEmpty()) {
178 logger.debug("No active sessions for broadcast")
179 return
180 }
181
182 // Ensure notification has proper JSON-RPC format
183 if (!notification.jsonrpc) {
184 notification = [
185 jsonrpc: "2.0",
186 method: notification.method ?: "notifications/message",
187 params: notification.params ?: notification
188 ]
189 }
190
191 int sentCount = 0
192 int failedCount = 0
193
194 for (String sessionId in allSessionIds) {
195 def session = sessionAdapter.getSession(sessionId)
196 if (session?.hasActiveWriter()) {
197 try {
198 String jsonNotification = JsonOutput.toJson(notification)
199 sendSseEvent(session.sseWriter, "message", jsonNotification)
200 session.touch()
201 sentCount++
202 } catch (Exception e) {
203 logger.debug("Failed to broadcast to session ${sessionId}: ${e.message}")
204 failedCount++
205 }
206 } else {
207 // Queue for sessions without active writers
208 session?.notificationQueue?.add(notification)
209 }
210 }
211
212 logger.info("Broadcast notification: ${sentCount} delivered, ${failedCount} failed")
213 }
214
215 @Override
216 int getActiveSessionCount() {
217 return sessionAdapter.getSessionCount()
218 }
219
220 @Override
221 Set<String> getSessionsForUser(String userId) {
222 return sessionAdapter.getSessionsForUser(userId)
223 }
224
225 /**
226 * Register an SSE writer for a session
227 * @param sessionId The session ID
228 * @param writer The PrintWriter for SSE output
229 */
230 void registerSseWriter(String sessionId, PrintWriter writer) {
231 def session = sessionAdapter.getSession(sessionId)
232 if (session) {
233 session.sseWriter = writer
234 logger.debug("Registered SSE writer for session ${sessionId}")
235
236 // Deliver any queued notifications
237 deliverQueuedNotifications(sessionId)
238 } else {
239 logger.warn("Cannot register SSE writer: session ${sessionId} not found")
240 }
241 }
242
243 /**
244 * Unregister the SSE writer for a session (e.g., on disconnect)
245 * @param sessionId The session ID
246 */
247 void unregisterSseWriter(String sessionId) {
248 def session = sessionAdapter.getSession(sessionId)
249 if (session) {
250 session.sseWriter = null
251 logger.debug("Unregistered SSE writer for session ${sessionId}")
252 }
253 }
254
255 /**
256 * Deliver any queued notifications to a session
257 * @param sessionId The session ID
258 */
259 void deliverQueuedNotifications(String sessionId) {
260 def session = sessionAdapter.getSession(sessionId)
261 if (!session || !session.hasActiveWriter()) {
262 return
263 }
264
265 List<Map> queue = session.notificationQueue
266 if (queue.isEmpty()) {
267 return
268 }
269
270 // Take snapshot and clear queue
271 List<Map> toDeliver
272 synchronized (queue) {
273 toDeliver = new ArrayList<>(queue)
274 queue.clear()
275 }
276
277 int deliveredCount = 0
278 for (Map notification in toDeliver) {
279 try {
280 String jsonNotification = JsonOutput.toJson(notification)
281 sendSseEvent(session.sseWriter, "message", jsonNotification)
282 deliveredCount++
283 } catch (Exception e) {
284 logger.warn("Failed to deliver queued notification to ${sessionId}: ${e.message}")
285 // Re-queue failed notifications
286 queue.add(notification)
287 }
288 }
289
290 if (deliveredCount > 0) {
291 logger.debug("Delivered ${deliveredCount} queued notifications to session ${sessionId}")
292 }
293 }
294
295 /**
296 * Send a keep-alive ping to a session
297 * @param sessionId The session ID
298 * @return true if ping was sent successfully
299 */
300 boolean sendPing(String sessionId) {
301 def session = sessionAdapter.getSession(sessionId)
302 if (!session?.hasActiveWriter()) {
303 return false
304 }
305
306 try {
307 def pingData = [
308 type: "ping",
309 timestamp: System.currentTimeMillis(),
310 sessionId: sessionId
311 ]
312 sendSseEvent(session.sseWriter, "ping", JsonOutput.toJson(pingData))
313 session.touch()
314 return true
315 } catch (Exception e) {
316 logger.debug("Failed to send ping to session ${sessionId}: ${e.message}")
317 return false
318 }
319 }
320
321 /**
322 * Send an SSE event with proper formatting
323 * @param writer The output writer
324 * @param eventType The SSE event type
325 * @param data The data payload
326 */
327 private void sendSseEvent(PrintWriter writer, String eventType, String data) throws IOException {
328 if (writer == null || writer.checkError()) {
329 throw new IOException("Writer is closed or in error state")
330 }
331
332 long eventId = ++eventIdCounter
333 writer.write("id: ${eventId}\n")
334 writer.write("event: ${eventType}\n")
335 writer.write("data: ${data}\n\n")
336 writer.flush()
337
338 if (writer.checkError()) {
339 throw new IOException("Client disconnected during write")
340 }
341 }
342
343 /**
344 * Send an SSE event with a specific event ID
345 */
346 void sendSseEventWithId(PrintWriter writer, String eventType, String data, long eventId) throws IOException {
347 if (writer == null || writer.checkError()) {
348 throw new IOException("Writer is closed or in error state")
349 }
350
351 if (eventId >= 0) {
352 writer.write("id: ${eventId}\n")
353 }
354 writer.write("event: ${eventType}\n")
355 writer.write("data: ${data}\n\n")
356 writer.flush()
357
358 if (writer.checkError()) {
359 throw new IOException("Client disconnected during write")
360 }
361 }
362
363 /**
364 * Get the session adapter (for direct access if needed)
365 */
366 McpSessionAdapter getSessionAdapter() {
367 return sessionAdapter
368 }
369
370 /**
371 * Get transport statistics
372 */
373 Map getStatistics() {
374 def adapterStats = sessionAdapter.getStatistics()
375 int activeWriters = 0
376 int totalQueued = 0
377
378 for (String sessionId in sessionAdapter.getAllSessionIds()) {
379 def session = sessionAdapter.getSession(sessionId)
380 if (session) {
381 if (session.hasActiveWriter()) activeWriters++
382 totalQueued += session.notificationQueue.size()
383 }
384 }
385
386 return adapterStats + [
387 transportType: "SSE",
388 activeWriters: activeWriters,
389 queuedNotifications: totalQueued,
390 eventIdCounter: eventIdCounter
391 ]
392 }
393 }