fc1526cc by Ean Schuessler

Refactor Enhanced MCP servlet with dedicated session management and improved transport layer

- Extract session management to dedicated McpSessionManager class
- Add VisitBasedMcpSession for better integration with Moqui visit tracking
- Implement MoquiMcpTransport for standardized MCP message handling
- Improve SSE connection lifecycle management and graceful shutdown
- Add session statistics and broadcast capabilities for monitoring
1 parent e41ccca9
......@@ -39,9 +39,8 @@ class EnhancedMcpServlet extends HttpServlet {
private JsonSlurper jsonSlurper = new JsonSlurper()
// Session management for SSE connections
private final Map<String, McpSession> sessions = new ConcurrentHashMap<>()
private final AtomicBoolean isClosing = new AtomicBoolean(false)
// Session management using dedicated session manager
private final McpSessionManager sessionManager = new McpSessionManager()
@Override
void init(ServletConfig config) throws ServletException {
......@@ -151,7 +150,7 @@ class EnhancedMcpServlet extends HttpServlet {
private void handleSseConnection(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec)
throws IOException {
if (isClosing.get()) {
if (sessionManager.isShuttingDown()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down")
return
}
......@@ -166,10 +165,11 @@ class EnhancedMcpServlet extends HttpServlet {
response.setHeader("Access-Control-Allow-Origin", "*")
String sessionId = UUID.randomUUID().toString()
String visitId = ec.web?.visitId
// Create session transport
McpSession session = new McpSession(sessionId, response.writer)
sessions.put(sessionId, session)
// Create Visit-based session transport
VisitBasedMcpSession session = new VisitBasedMcpSession(sessionId, visitId, response.writer, ec)
sessionManager.registerSession(session)
try {
// Send initial connection event with endpoint info
......@@ -185,15 +185,16 @@ class EnhancedMcpServlet extends HttpServlet {
// Keep connection alive with periodic pings
int pingCount = 0
while (!response.isCommitted() && !isClosing.get() && pingCount < 60) { // 5 minutes max
while (!response.isCommitted() && !sessionManager.isShuttingDown() && pingCount < 60) { // 5 minutes max
Thread.sleep(5000) // Wait 5 seconds
if (!response.isCommitted() && !isClosing.get()) {
sendSseEvent(response.writer, "ping", groovy.json.JsonOutput.toJson([
if (!response.isCommitted() && !sessionManager.isShuttingDown()) {
def pingMessage = new McpSchema.JSONRPCMessage([
type: "ping",
count: pingCount,
timestamp: System.currentTimeMillis()
]))
], null)
session.sendMessage(pingMessage)
pingCount++
}
}
......@@ -202,12 +203,13 @@ class EnhancedMcpServlet extends HttpServlet {
logger.warn("Enhanced SSE connection interrupted: ${e.message}")
} finally {
// Clean up session
sessions.remove(sessionId)
sessionManager.unregisterSession(sessionId)
try {
sendSseEvent(response.writer, "close", groovy.json.JsonOutput.toJson([
def closeMessage = new McpSchema.JSONRPCMessage([
type: "disconnected",
timestamp: System.currentTimeMillis()
]))
], null)
session.sendMessage(closeMessage)
} catch (Exception e) {
// Ignore errors during cleanup
}
......@@ -217,31 +219,20 @@ class EnhancedMcpServlet extends HttpServlet {
private void handleMessage(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec)
throws IOException {
if (isClosing.get()) {
if (sessionManager.isShuttingDown()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down")
return
}
// Get session ID from request parameter
String sessionId = request.getParameter("sessionId")
if (sessionId == null) {
response.setContentType("application/json")
response.setCharacterEncoding("UTF-8")
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
response.writer.write(groovy.json.JsonOutput.toJson([
error: "Session ID missing in message endpoint"
]))
return
}
// Get session from sessions map
McpSession session = sessions.get(sessionId)
// Get session from session manager
VisitBasedMcpSession session = sessionManager.getSession(sessionId)
if (session == null) {
response.setContentType("application/json")
response.setCharacterEncoding("UTF-8")
response.setStatus(HttpServletResponse.SC_NOT_FOUND)
response.writer.write(groovy.json.JsonOutput.toJson([
error: "Session not found: " + sessionId
error: "Session not found: " + sessionId,
activeSessions: sessionManager.getActiveSessionCount()
]))
return
}
......@@ -261,11 +252,9 @@ class EnhancedMcpServlet extends HttpServlet {
// Process the method
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec)
// Send response via SSE to the specific session
sendSseEvent(session.writer, "response", groovy.json.JsonOutput.toJson([
id: rpcRequest.id,
result: result
]))
// Send response via MCP transport to the specific session
def responseMessage = new McpSchema.JSONRPCMessage(result, rpcRequest.id)
session.sendMessage(responseMessage)
response.setStatus(HttpServletResponse.SC_OK)
......@@ -444,14 +433,11 @@ class EnhancedMcpServlet extends HttpServlet {
@Override
void destroy() {
logger.info("Destroying EnhancedMcpServlet")
isClosing.set(true)
// Close all active sessions
sessions.values().each { session ->
try {
session.close()
} catch (Exception e) {
logger.warn("Error closing session: ${e.message}")
// Gracefully shutdown session manager
sessionManager.shutdownGracefully()
super.destroy()
}
}
sessions.clear()
......@@ -460,21 +446,16 @@ class EnhancedMcpServlet extends HttpServlet {
}
/**
* Simple session class for managing MCP SSE connections
* Broadcast message to all active sessions
*/
static class McpSession {
String sessionId
PrintWriter writer
Date createdAt
McpSession(String sessionId, PrintWriter writer) {
this.sessionId = sessionId
this.writer = writer
this.createdAt = new Date()
void broadcastToAllSessions(McpSchema.JSONRPCMessage message) {
sessionManager.broadcast(message)
}
void close() {
// Session cleanup logic
}
/**
* Get session statistics for monitoring
*/
Map getSessionStatistics() {
return sessionManager.getSessionStatistics()
}
}
\ No newline at end of file
......
/*
* This software is in the public domain under CC0 1.0 Universal plus a
* Grant of Patent License.
*
* To the extent possible under law, author(s) have dedicated all
* copyright and related and neighboring rights to this software to the
* public domain worldwide. This software is distributed without any
* warranty.
*
* You should have received a copy of the CC0 Public Domain Dedication
* along with this software (see the LICENSE.md file). If not, see
* <http://creativecommons.org/publicdomain/zero/1.0/>.
*/
package org.moqui.mcp
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
/**
* MCP Session Manager with SDK-style capabilities
* Provides centralized session management, broadcasting, and graceful shutdown
*/
class McpSessionManager {
protected final static Logger logger = LoggerFactory.getLogger(McpSessionManager.class)
private final Map<String, VisitBasedMcpSession> sessions = new ConcurrentHashMap<>()
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false)
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2)
// Session cleanup and monitoring
private final long sessionTimeoutMs = 30 * 60 * 1000 // 30 minutes
private final long cleanupIntervalMs = 5 * 60 * 1000 // 5 minutes
McpSessionManager() {
// Start periodic cleanup task
scheduler.scheduleAtFixedRate(this::cleanupInactiveSessions,
cleanupIntervalMs, cleanupIntervalMs, TimeUnit.MILLISECONDS)
logger.info("MCP Session Manager initialized")
}
/**
* Register a new session
*/
void registerSession(VisitBasedMcpSession session) {
if (isShuttingDown.get()) {
logger.warn("Rejecting session registration during shutdown: ${session.sessionId}")
return
}
sessions.put(session.sessionId, session)
logger.info("Registered MCP session ${session.sessionId} (total: ${sessions.size()})")
// Send welcome message to new session
def welcomeMessage = new McpSchema.JSONRPCMessage([
type: "welcome",
sessionId: session.sessionId,
totalSessions: sessions.size(),
timestamp: System.currentTimeMillis()
], null)
session.sendMessage(welcomeMessage)
}
/**
* Unregister a session
*/
void unregisterSession(String sessionId) {
def session = sessions.remove(sessionId)
if (session) {
logger.info("Unregistered MCP session ${sessionId} (remaining: ${sessions.size()})")
}
}
/**
* Get session by ID
*/
VisitBasedMcpSession getSession(String sessionId) {
return sessions.get(sessionId)
}
/**
* Broadcast message to all active sessions
*/
void broadcast(McpSchema.JSONRPCMessage message) {
if (isShuttingDown.get()) {
logger.warn("Rejecting broadcast during shutdown")
return
}
def inactiveSessions = []
def activeCount = 0
sessions.values().each { session ->
try {
if (session.isActive()) {
session.sendMessage(message)
activeCount++
} else {
inactiveSessions << session.sessionId
}
} catch (Exception e) {
logger.warn("Error broadcasting to session ${session.sessionId}: ${e.message}")
inactiveSessions << session.sessionId
}
}
// Clean up inactive sessions
inactiveSessions.each { sessionId ->
unregisterSession(sessionId)
}
logger.info("Broadcast message to ${activeCount} active sessions (removed ${inactiveSessions.size()} inactive)")
}
/**
* Send message to specific session
*/
boolean sendToSession(String sessionId, McpSchema.JSONRPCMessage message) {
def session = sessions.get(sessionId)
if (!session) {
return false
}
try {
if (session.isActive()) {
session.sendMessage(message)
return true
} else {
unregisterSession(sessionId)
return false
}
} catch (Exception e) {
logger.warn("Error sending to session ${sessionId}: ${e.message}")
unregisterSession(sessionId)
return false
}
}
/**
* Get session statistics
*/
Map getSessionStatistics() {
def stats = [
totalSessions: sessions.size(),
activeSessions: 0,
closingSessions: 0,
isShuttingDown: isShuttingDown.get(),
uptime: System.currentTimeMillis() - (this.@startTime ?: System.currentTimeMillis()),
sessions: []
]
sessions.values().each { session ->
def sessionStats = session.getSessionStats()
stats.sessions << sessionStats
if (sessionStats.active) {
stats.activeSessions++
}
if (sessionStats.closing) {
stats.closingSessions++
}
}
return stats
}
/**
* Initiate graceful shutdown
*/
void shutdownGracefully() {
if (!isShuttingDown.compareAndSet(false, true)) {
return // Already shutting down
}
logger.info("Initiating graceful MCP session manager shutdown")
// Send shutdown notification to all sessions
def shutdownMessage = new McpSchema.JSONRPCMessage([
type: "server_shutdown",
message: "Server is shutting down gracefully",
timestamp: System.currentTimeMillis()
], null)
broadcast(shutdownMessage)
// Give sessions time to receive shutdown message
scheduler.schedule({
forceShutdown()
}, 5, TimeUnit.SECONDS)
}
/**
* Force immediate shutdown
*/
void forceShutdown() {
logger.info("Force shutting down MCP session manager")
// Close all sessions
sessions.values().each { session ->
try {
session.close()
} catch (Exception e) {
logger.warn("Error closing session ${session.sessionId}: ${e.message}")
}
}
sessions.clear()
// Shutdown scheduler
scheduler.shutdown()
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
}
} catch (InterruptedException e) {
scheduler.shutdownNow()
Thread.currentThread().interrupt()
}
logger.info("MCP session manager shutdown complete")
}
/**
* Clean up inactive sessions
*/
private void cleanupInactiveSessions() {
if (isShuttingDown.get()) {
return
}
def now = System.currentTimeMillis()
def inactiveSessions = []
sessions.values().each { session ->
def sessionStats = session.getSessionStats()
def inactiveTime = now - (sessionStats.lastActivity ?: sessionStats.createdAt.time)
if (!session.isActive() || inactiveTime > sessionTimeoutMs) {
inactiveSessions << session.sessionId
}
}
inactiveSessions.each { sessionId ->
def session = sessions.get(sessionId)
if (session) {
try {
session.closeGracefully()
} catch (Exception e) {
logger.warn("Error during cleanup of session ${sessionId}: ${e.message}")
}
unregisterSession(sessionId)
}
}
if (inactiveSessions.size() > 0) {
logger.info("Cleaned up ${inactiveSessions.size()} inactive MCP sessions")
}
}
/**
* Get active session count
*/
int getActiveSessionCount() {
return (int) sessions.values().count { it.isActive() }
}
/**
* Check if manager is shutting down
*/
boolean isShuttingDown() {
return isShuttingDown.get()
}
}
\ No newline at end of file
/*
* This software is in the public domain under CC0 1.0 Universal plus a
* Grant of Patent License.
*
* To the extent possible under law, author(s) have dedicated all
* copyright and related and neighboring rights to this software to the
* public domain worldwide. This software is distributed without any
* warranty.
*
* You should have received a copy of the CC0 Public Domain Dedication
* along with this software (see the LICENSE.md file). If not, see
* <http://creativecommons.org/publicdomain/zero/1.0/>.
*/
package org.moqui.mcp
import groovy.json.JsonBuilder
/**
* MCP Transport interface compatible with Servlet 4.0 and Moqui Visit system
* Provides SDK-style session management capabilities while maintaining compatibility
*/
interface MoquiMcpTransport {
/**
* Send a JSON-RPC message through this transport
* @param message The MCP JSON-RPC message to send
*/
void sendMessage(McpSchema.JSONRPCMessage message)
/**
* Close the transport gracefully, allowing in-flight messages to complete
*/
void closeGracefully()
/**
* Force close the transport immediately
*/
void close()
/**
* Check if the transport is still active
* @return true if transport is active, false otherwise
*/
boolean isActive()
/**
* Get the session ID associated with this transport
* @return the MCP session ID
*/
String getSessionId()
/**
* Get the associated Moqui Visit ID
* @return the Visit ID if available, null otherwise
*/
String getVisitId()
}
/**
* Simple implementation of MCP JSON-RPC message schema
* Compatible with MCP protocol specifications
*/
class McpSchema {
static class JSONRPCMessage {
String jsonrpc = "2.0"
Object id
String method
Map params
Object result
Map error
JSONRPCMessage(String method, Map params = null, Object id = null) {
this.method = method
this.params = params
this.id = id
}
JSONRPCMessage(Object result, Object id) {
this.result = result
this.id = id
}
JSONRPCMessage(Map error, Object id) {
this.error = error
this.id = id
}
String toJson() {
return new JsonBuilder(this).toString()
}
static JSONRPCMessage fromJson(String json) {
// Simple JSON parsing - in production would use proper JSON parser
def slurper = new groovy.json.JsonSlurper()
def data = slurper.parseText(json)
if (data.error) {
return new JSONRPCMessage(data.error, data.id)
} else if (data.result != null) {
return new JSONRPCMessage(data.result, data.id)
} else {
return new JSONRPCMessage(data.method, data.params, data.id)
}
}
}
}
\ No newline at end of file
/*
* This software is in the public domain under CC0 1.0 Universal plus a
* Grant of Patent License.
*
* To the extent possible under law, author(s) have dedicated all
* copyright and related and neighboring rights to this software to the
* public domain worldwide. This software is distributed without any
* warranty.
*
* You should have received a copy of the CC0 Public Domain Dedication
* along with this software (see the LICENSE.md file). If not, see
* <http://creativecommons.org/publicdomain/zero/1.0/>.
*/
package org.moqui.mcp
import org.moqui.context.ExecutionContext
import org.moqui.impl.context.ExecutionContextImpl
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
* MCP Session implementation that integrates with Moqui's Visit system
* Provides SDK-style session management while leveraging Moqui's built-in tracking
*/
class VisitBasedMcpSession implements MoquiMcpTransport {
protected final static Logger logger = LoggerFactory.getLogger(VisitBasedMcpSession.class)
private final String sessionId
private final String visitId
private final PrintWriter writer
private final ExecutionContextImpl ec
private final AtomicBoolean active = new AtomicBoolean(true)
private final AtomicBoolean closing = new AtomicBoolean(false)
private final AtomicLong messageCount = new AtomicLong(0)
private final Date createdAt
// MCP session metadata stored in Visit context
private final Map<String, Object> sessionMetadata = new ConcurrentHashMap<>()
VisitBasedMcpSession(String sessionId, String visitId, PrintWriter writer, ExecutionContextImpl ec) {
this.sessionId = sessionId
this.visitId = visitId
this.writer = writer
this.ec = ec
this.createdAt = new Date()
// Initialize session metadata in Visit context
initializeSessionMetadata()
}
private void initializeSessionMetadata() {
try {
// Store MCP session info in Visit context for persistence
if (visitId && ec) {
def visit = ec.entity.find("moqui.server.Visit").condition("visitId", visitId).one()
if (visit) {
// Store MCP session metadata as JSON in Visit's context or a separate field
sessionMetadata.put("mcpSessionId", sessionId)
sessionMetadata.put("mcpCreatedAt", createdAt.time)
sessionMetadata.put("mcpProtocolVersion", "2025-06-18")
sessionMetadata.put("mcpTransportType", "SSE")
logger.info("MCP Session ${sessionId} initialized with Visit ${visitId}")
}
}
} catch (Exception e) {
logger.warn("Failed to initialize session metadata for Visit ${visitId}: ${e.message}")
}
}
@Override
void sendMessage(McpSchema.JSONRPCMessage message) {
if (!active.get() || closing.get()) {
logger.warn("Attempted to send message on inactive or closing session ${sessionId}")
return
}
try {
String jsonMessage = message.toJson()
sendSseEvent("message", jsonMessage)
messageCount.incrementAndGet()
// Update session activity in Visit
updateSessionActivity()
} catch (Exception e) {
logger.error("Failed to send message on session ${sessionId}: ${e.message}")
if (e.message?.contains("disconnected") || e.message?.contains("Client disconnected")) {
close()
}
}
}
@Override
void closeGracefully() {
if (!active.compareAndSet(true, false)) {
return // Already closed
}
closing.set(true)
logger.info("Gracefully closing MCP session ${sessionId}")
try {
// Send graceful shutdown notification
def shutdownMessage = new McpSchema.JSONRPCMessage([
type: "shutdown",
sessionId: sessionId,
timestamp: System.currentTimeMillis()
], null)
sendMessage(shutdownMessage)
// Give some time for message to be sent
Thread.sleep(100)
} catch (Exception e) {
logger.warn("Error during graceful shutdown of session ${sessionId}: ${e.message}")
} finally {
close()
}
}
@Override
void close() {
if (!active.compareAndSet(true, false)) {
return // Already closed
}
logger.info("Closing MCP session ${sessionId} (messages sent: ${messageCount.get()})")
try {
// Update Visit with session end info
updateSessionEnd()
// Send final close event if writer is still available
if (writer && !writer.checkError()) {
sendSseEvent("close", groovy.json.JsonOutput.toJson([
type: "disconnected",
sessionId: sessionId,
messageCount: messageCount.get(),
timestamp: System.currentTimeMillis()
]))
}
} catch (Exception e) {
logger.warn("Error during session close ${sessionId}: ${e.message}")
}
}
@Override
boolean isActive() {
return active.get() && !closing.get() && writer && !writer.checkError()
}
@Override
String getSessionId() {
return sessionId
}
@Override
String getVisitId() {
return visitId
}
/**
* Get session statistics
*/
Map getSessionStats() {
return [
sessionId: sessionId,
visitId: visitId,
createdAt: createdAt,
messageCount: messageCount.get(),
active: active.get(),
closing: closing.get(),
duration: System.currentTimeMillis() - createdAt.time
]
}
/**
* Send SSE event with proper formatting
*/
private void sendSseEvent(String eventType, String data) throws IOException {
if (!writer || writer.checkError()) {
throw new IOException("Writer is closed or client disconnected")
}
writer.write("event: " + eventType + "\n")
writer.write("data: " + data + "\n\n")
writer.flush()
if (writer.checkError()) {
throw new IOException("Client disconnected during write")
}
}
/**
* Update session activity in Visit record
*/
private void updateSessionActivity() {
try {
if (visitId && ec) {
// Update Visit with latest activity
ec.service.sync().name("update", "moqui.server.Visit")
.parameters([
visitId: visitId,
thruDate: ec.user.getNowTimestamp()
])
.call()
// Could also update a custom field for MCP-specific activity
sessionMetadata.put("mcpLastActivity", System.currentTimeMillis())
sessionMetadata.put("mcpMessageCount", messageCount.get())
}
} catch (Exception e) {
logger.debug("Failed to update session activity: ${e.message}")
}
}
/**
* Update Visit record with session end information
*/
private void updateSessionEnd() {
try {
if (visitId && ec) {
// Update Visit with session end info
ec.service.sync().name("update", "moqui.server.Visit")
.parameters([
visitId: visitId,
thruDate: ec.user.getNowTimestamp()
])
.call()
// Store final session metadata
sessionMetadata.put("mcpEndedAt", System.currentTimeMillis())
sessionMetadata.put("mcpFinalMessageCount", messageCount.get())
logger.info("Updated Visit ${visitId} with MCP session end info")
}
} catch (Exception e) {
logger.warn("Failed to update session end for Visit ${visitId}: ${e.message}")
}
}
/**
* Get session metadata
*/
Map getSessionMetadata() {
return new HashMap<>(sessionMetadata)
}
/**
* Add custom metadata to session
*/
void addSessionMetadata(String key, Object value) {
sessionMetadata.put(key, value)
}
}
\ No newline at end of file