Fix session lock contention and notification handling - avoid sessionId.intern()…
… deadlocks and merge notifications into response content
Showing
1 changed file
with
35 additions
and
8 deletions
| ... | @@ -80,6 +80,9 @@ class EnhancedMcpServlet extends HttpServlet { | ... | @@ -80,6 +80,9 @@ class EnhancedMcpServlet extends HttpServlet { |
| 80 | private final Map<String, Long> lastActivityUpdate = new ConcurrentHashMap<>() | 80 | private final Map<String, Long> lastActivityUpdate = new ConcurrentHashMap<>() |
| 81 | private static final long ACTIVITY_UPDATE_INTERVAL_MS = 30000 // 30 seconds | 81 | private static final long ACTIVITY_UPDATE_INTERVAL_MS = 30000 // 30 seconds |
| 82 | 82 | ||
| 83 | // Session-specific locks to avoid sessionId.intern() deadlocks | ||
| 84 | private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>() | ||
| 85 | |||
| 83 | // Configuration parameters | 86 | // Configuration parameters |
| 84 | private String sseEndpoint = "/sse" | 87 | private String sseEndpoint = "/sse" |
| 85 | private String messageEndpoint = "/message" | 88 | private String messageEndpoint = "/message" |
| ... | @@ -785,8 +788,9 @@ try { | ... | @@ -785,8 +788,9 @@ try { |
| 785 | // Process MCP method using Moqui services with session ID if available | 788 | // Process MCP method using Moqui services with session ID if available |
| 786 | def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit ?: [:]) | 789 | def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId, visit ?: [:]) |
| 787 | 790 | ||
| 788 | // Update session activity throttled for actual user actions (not pings) | 791 | // Update session activity throttled for actual user actions (not pings or tools/list) |
| 789 | if (sessionId && !"ping".equals(rpcRequest.method)) { | 792 | // tools/list is read-only discovery and shouldn't update session activity to prevent lock contention |
| 793 | if (sessionId && !"ping".equals(rpcRequest.method) && !"tools/list".equals(rpcRequest.method)) { | ||
| 790 | updateSessionActivityThrottled(sessionId) | 794 | updateSessionActivityThrottled(sessionId) |
| 791 | } | 795 | } |
| 792 | 796 | ||
| ... | @@ -821,20 +825,36 @@ try { | ... | @@ -821,20 +825,36 @@ try { |
| 821 | result: actualResult | 825 | result: actualResult |
| 822 | ] | 826 | ] |
| 823 | 827 | ||
| 824 | // Check for pending server notifications and include them in response | 828 | // Standard MCP flow: include notifications in response content array |
| 825 | if (sessionId && notificationQueues.containsKey(sessionId)) { | 829 | if (sessionId && notificationQueues.containsKey(sessionId)) { |
| 826 | def pendingNotifications = notificationQueues.get(sessionId) | 830 | def pendingNotifications = notificationQueues.get(sessionId) |
| 827 | if (pendingNotifications && !pendingNotifications.isEmpty()) { | 831 | if (pendingNotifications && !pendingNotifications.isEmpty()) { |
| 828 | rpcResponse.notifications = pendingNotifications | 832 | logger.info("Adding ${pendingNotifications.size()} pending notifications to response content for session ${sessionId}") |
| 833 | |||
| 834 | // Convert notifications to content items and add to result | ||
| 835 | def notificationContent = [] | ||
| 836 | for (notification in pendingNotifications) { | ||
| 837 | notificationContent << [ | ||
| 838 | type: "notification", | ||
| 839 | text: JsonOutput.toJson(notification.params ?: notification), | ||
| 840 | method: notification.method | ||
| 841 | ] | ||
| 842 | } | ||
| 843 | |||
| 844 | // Merge notification content with existing result content | ||
| 845 | def existingContent = actualResult?.content ?: [] | ||
| 846 | actualResult.content = existingContent + notificationContent | ||
| 847 | |||
| 829 | // Clear delivered notifications | 848 | // Clear delivered notifications |
| 830 | notificationQueues.put(sessionId, []) | 849 | notificationQueues.put(sessionId, []) |
| 831 | logger.info("Delivered ${pendingNotifications.size()} pending notifications to session ${sessionId}") | 850 | logger.info("Merged ${pendingNotifications.size()} notifications into response for session ${sessionId}") |
| 832 | } | 851 | } |
| 833 | } | 852 | } |
| 834 | 853 | ||
| 835 | response.setContentType("application/json") | 854 | response.setContentType("application/json") |
| 836 | response.setCharacterEncoding("UTF-8") | 855 | response.setCharacterEncoding("UTF-8") |
| 837 | 856 | ||
| 857 | // Send the main response | ||
| 838 | response.writer.write(JsonOutput.toJson(rpcResponse)) | 858 | response.writer.write(JsonOutput.toJson(rpcResponse)) |
| 839 | } | 859 | } |
| 840 | 860 | ||
| ... | @@ -1111,7 +1131,13 @@ try { | ... | @@ -1111,7 +1131,13 @@ try { |
| 1111 | def writer = activeConnections.get(sessionId) | 1131 | def writer = activeConnections.get(sessionId) |
| 1112 | if (writer && !writer.checkError()) { | 1132 | if (writer && !writer.checkError()) { |
| 1113 | try { | 1133 | try { |
| 1114 | sendSseEvent(writer, "notification", JsonOutput.toJson(notification), System.currentTimeMillis()) | 1134 | // Send as proper JSON-RPC notification via SSE |
| 1135 | def notificationMessage = [ | ||
| 1136 | jsonrpc: "2.0", | ||
| 1137 | method: notification.method ?: "notifications/message", | ||
| 1138 | params: notification.params ?: notification | ||
| 1139 | ] | ||
| 1140 | sendSseEvent(writer, "notification", JsonOutput.toJson(notificationMessage), System.currentTimeMillis()) | ||
| 1115 | logger.info("Sent notification via SSE to session ${sessionId}") | 1141 | logger.info("Sent notification via SSE to session ${sessionId}") |
| 1116 | } catch (Exception e) { | 1142 | } catch (Exception e) { |
| 1117 | logger.warn("Failed to send notification via SSE to session ${sessionId}: ${e.message}") | 1143 | logger.warn("Failed to send notification via SSE to session ${sessionId}: ${e.message}") |
| ... | @@ -1157,8 +1183,9 @@ try { | ... | @@ -1157,8 +1183,9 @@ try { |
| 1157 | 1183 | ||
| 1158 | // Only update if 30 seconds have passed since last update | 1184 | // Only update if 30 seconds have passed since last update |
| 1159 | if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { | 1185 | if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { |
| 1160 | // Synchronize per session to prevent concurrent updates | 1186 | // Use session-specific lock to avoid sessionId.intern() deadlocks |
| 1161 | synchronized (sessionId.intern()) { | 1187 | Object sessionLock = sessionLocks.computeIfAbsent(sessionId, { new Object() }) |
| 1188 | synchronized (sessionLock) { | ||
| 1162 | // Double-check after acquiring lock | 1189 | // Double-check after acquiring lock |
| 1163 | lastUpdate = lastActivityUpdate.get(sessionId) | 1190 | lastUpdate = lastActivityUpdate.get(sessionId) |
| 1164 | if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { | 1191 | if (lastUpdate == null || (now - lastUpdate) > ACTIVITY_UPDATE_INTERVAL_MS) { | ... | ... |
-
Please register or sign in to post a comment