fdb76042 by Ean Schuessler

Implement MCP notification system for server-to-client communication

- Add notification queue mechanism in EnhancedMcpServlet
- Register servlet instance for service access
- Implement queueNotification() method for async notifications
- Add notification delivery in JSON-RPC responses
- Include notifications in both tools/list and tools/call services
- Add execution timing and success metrics
- Handle notification errors gracefully
- Support both polling and SSE delivery methods

This completes the MCP server notification system allowing real-time
communication about tool execution status and metrics.
1 parent 0f3c1f68
...@@ -180,7 +180,10 @@ ...@@ -180,7 +180,10 @@
180 } 180 }
181 } 181 }
182 } 182 }
183 */ 183 */
184
185 // Start timing for execution metrics
186 def startTime = System.currentTimeMillis()
184 187
185 // Store original user context before switching to ADMIN 188 // Store original user context before switching to ADMIN
186 def originalUsername = ec.user.username 189 def originalUsername = ec.user.username
...@@ -358,13 +361,34 @@ ...@@ -358,13 +361,34 @@
358 result.nextCursor = String.valueOf(endIndex) 361 result.nextCursor = String.valueOf(endIndex)
359 } 362 }
360 363
361 ec.logger.info("MCP ToolsList: Returning ${availableTools.size()} tools for user ${originalUsername}") 364 ec.logger.info("MCP ToolsList: Returning ${availableTools.size()} tools for user ${originalUsername}")
362 365
363 } finally { 366 } finally {
364 // Always restore original user context 367 // Always restore original user context
365 if (adminUserInfo != null) { 368 if (adminUserInfo != null) {
366 ec.user.popUser() 369 ec.user.popUser()
367 } 370 }
371
372 // Send a simple notification about tool execution
373 try {
374 def servlet = ec.web.getServletContext().getAttribute("enhancedMcpServlet")
375 ec.logger.info("TOOLS CALL: Got servlet reference: ${servlet != null}, sessionId: ${sessionId}")
376 if (servlet && sessionId) {
377 def notification = [
378 method: "notifications/tool_execution",
379 params: [
380 toolName: "tools/list",
381 executionTime: (System.currentTimeMillis() - startTime) / 1000.0,
382 success: !result?.result?.isError,
383 timestamp: System.currentTimeMillis()
384 ]
385 ]
386 servlet.queueNotification(sessionId, notification)
387 ec.logger.info("Queued tool execution notification for session ${sessionId}")
388 }
389 } catch (Exception e) {
390 ec.logger.warn("Failed to send tool execution notification: ${e.message}")
391 }
368 } 392 }
369 ]]></script> 393 ]]></script>
370 </actions> 394 </actions>
...@@ -511,6 +535,27 @@ ...@@ -511,6 +535,27 @@
511 if (adminUserInfo != null) { 535 if (adminUserInfo != null) {
512 ec.user.popUser() 536 ec.user.popUser()
513 } 537 }
538
539 // Send a simple notification about tool execution
540 try {
541 def servlet = ec.web.getServletContext().getAttribute("enhancedMcpServlet")
542 ec.logger.info("TOOLS CALL: Got servlet reference: ${servlet != null}, sessionId: ${sessionId}")
543 if (servlet && sessionId) {
544 def notification = [
545 method: "notifications/tool_execution",
546 params: [
547 toolName: name,
548 executionTime: (System.currentTimeMillis() - startTime) / 1000.0,
549 success: !result?.result?.isError,
550 timestamp: System.currentTimeMillis()
551 ]
552 ]
553 servlet.queueNotification(sessionId, notification)
554 ec.logger.info("Queued tool execution notification for session ${sessionId}")
555 }
556 } catch (Exception e) {
557 ec.logger.warn("Failed to send tool execution notification: ${e.message}")
558 }
514 } 559 }
515 ]]></script> 560 ]]></script>
516 </actions> 561 </actions>
......
...@@ -56,6 +56,9 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -56,6 +56,9 @@ class EnhancedMcpServlet extends HttpServlet {
56 // No need for separate session manager - Visit entity handles persistence 56 // No need for separate session manager - Visit entity handles persistence
57 private final Map<String, Integer> sessionStates = new ConcurrentHashMap<>() 57 private final Map<String, Integer> sessionStates = new ConcurrentHashMap<>()
58 58
59 // Notification queue for server-initiated notifications (for non-SSE clients)
60 private final Map<String, List<Map>> notificationQueues = new ConcurrentHashMap<>()
61
59 // Configuration parameters 62 // Configuration parameters
60 private String sseEndpoint = "/sse" 63 private String sseEndpoint = "/sse"
61 private String messageEndpoint = "/message" 64 private String messageEndpoint = "/message"
...@@ -75,9 +78,13 @@ class EnhancedMcpServlet extends HttpServlet { ...@@ -75,9 +78,13 @@ class EnhancedMcpServlet extends HttpServlet {
75 String webappName = config.getInitParameter("moqui-name") ?: 78 String webappName = config.getInitParameter("moqui-name") ?:
76 config.getServletContext().getInitParameter("moqui-name") 79 config.getServletContext().getInitParameter("moqui-name")
77 80
81 // Register servlet instance in context for service access
82 config.getServletContext().setAttribute("enhancedMcpServlet", this)
83
78 logger.info("EnhancedMcpServlet initialized for webapp ${webappName}") 84 logger.info("EnhancedMcpServlet initialized for webapp ${webappName}")
79 logger.info("SSE endpoint: ${sseEndpoint}, Message endpoint: ${messageEndpoint}") 85 logger.info("SSE endpoint: ${sseEndpoint}, Message endpoint: ${messageEndpoint}")
80 logger.info("Keep-alive interval: ${keepAliveIntervalSeconds}s, Max connections: ${maxConnections}") 86 logger.info("Keep-alive interval: ${keepAliveIntervalSeconds}s, Max connections: ${maxConnections}")
87 logger.info("Servlet instance registered in context as 'enhancedMcpServlet'")
81 } 88 }
82 89
83 @Override 90 @Override
...@@ -828,6 +835,17 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}") ...@@ -828,6 +835,17 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
828 result: actualResult 835 result: actualResult
829 ] 836 ]
830 837
838 // Check for pending server notifications and include them in response
839 if (sessionId && notificationQueues.containsKey(sessionId)) {
840 def pendingNotifications = notificationQueues.get(sessionId)
841 if (pendingNotifications && !pendingNotifications.isEmpty()) {
842 rpcResponse.notifications = pendingNotifications
843 // Clear delivered notifications
844 notificationQueues.put(sessionId, [])
845 logger.info("Delivered ${pendingNotifications.size()} pending notifications to session ${sessionId}")
846 }
847 }
848
831 response.setContentType("application/json") 849 response.setContentType("application/json")
832 response.setCharacterEncoding("UTF-8") 850 response.setCharacterEncoding("UTF-8")
833 851
...@@ -872,6 +890,10 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}") ...@@ -872,6 +890,10 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
872 params.actualUserId = ec.user.userId 890 params.actualUserId = ec.user.userId
873 logger.info("Initialize - actualUserId: ${params.actualUserId}, sessionId: ${params.sessionId}") 891 logger.info("Initialize - actualUserId: ${params.actualUserId}, sessionId: ${params.sessionId}")
874 def serviceResult = callMcpService("mcp#Initialize", params, ec) 892 def serviceResult = callMcpService("mcp#Initialize", params, ec)
893 // Add sessionId to the response for mcp.sh compatibility
894 if (serviceResult && serviceResult.result) {
895 serviceResult.result.sessionId = params.sessionId
896 }
875 return serviceResult 897 return serviceResult
876 case "ping": 898 case "ping":
877 // Simple ping for testing - bypass service for now 899 // Simple ping for testing - bypass service for now
...@@ -983,6 +1005,28 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}") ...@@ -983,6 +1005,28 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
983 return false 1005 return false
984 } 1006 }
985 1007
1008 /**
1009 * Queue a server notification for delivery to client
1010 */
1011 void queueNotification(String sessionId, Map notification) {
1012 if (!sessionId) return
1013
1014 def queue = notificationQueues.computeIfAbsent(sessionId) { [] }
1015 queue << notification
1016 logger.info("Queued notification for session ${sessionId}: ${notification}")
1017
1018 // Also try to send via SSE if active connection exists
1019 def writer = activeConnections.get(sessionId)
1020 if (writer && !writer.checkError()) {
1021 try {
1022 sendSseEvent(writer, "notification", JsonOutput.toJson(notification), System.currentTimeMillis())
1023 logger.info("Sent notification via SSE to session ${sessionId}")
1024 } catch (Exception e) {
1025 logger.warn("Failed to send notification via SSE to session ${sessionId}: ${e.message}")
1026 }
1027 }
1028 }
1029
986 @Override 1030 @Override
987 void destroy() { 1031 void destroy() {
988 logger.info("Destroying EnhancedMcpServlet") 1032 logger.info("Destroying EnhancedMcpServlet")
......