1c226c77 by Ean Schuessler

Continued effort to rough in a notifications bus

1 parent 6bbfd372
......@@ -312,8 +312,41 @@ try {
private void handleSseConnection(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec, String webappName)
throws IOException {
logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Check for existing session ID first
String sessionId = request.getHeader("Mcp-Session-Id")
def visit = null
// If we have a session ID, try to find existing Visit
if (sessionId) {
try {
visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.disableAuthz()
.one()
if (visit) {
// Verify user has access to this Visit
if (!visit.userId || !ec.user.userId || visit.userId.toString() != ec.user.userId.toString()) {
logger.warn("Visit userId ${visit.userId} doesn't match current user userId ${ec.user.userId} - access denied")
response.sendError(HttpServletResponse.SC_FORBIDDEN, "Access denied for session: " + sessionId)
return
}
// Set existing visit ID in HTTP session
request.session.setAttribute("moqui.visitId", sessionId)
logger.info("Reusing existing Visit ${sessionId} for user ${ec.user.username}")
} else {
logger.warn("Session ID ${sessionId} not found, will create new Visit")
}
} catch (Exception e) {
logger.warn("Error looking up existing session ${sessionId}: ${e.message}")
}
}
// Only create new Visit if we didn't find an existing one
if (!visit) {
// Initialize web facade for Visit creation, but avoid screen resolution
// Modify request path to avoid ScreenResourceNotFoundException
String originalRequestURI = request.getRequestURI()
......@@ -321,8 +354,6 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
request.setAttribute("javax.servlet.include.request_uri", "/mcp")
request.setAttribute("javax.servlet.include.path_info", "")
def visit = null
try {
ec.initWebFacade(webappName, request, response)
// Web facade was successful, get the Visit it created
......@@ -330,6 +361,7 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
if (!visit) {
throw new Exception("Web facade succeeded but no Visit created")
}
logger.info("Created new Visit ${visit.visitId} for user ${ec.user.username}")
} catch (Exception e) {
logger.warn("Web facade initialization failed: ${e.message}, trying manual Visit creation")
// Try to create Visit manually using the same pattern as UserFacadeImpl
......@@ -379,6 +411,7 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
return
}
}
}
// Final check that we have a Visit
if (!visit) {
......@@ -406,7 +439,14 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
VisitBasedMcpSession session = new VisitBasedMcpSession(visit, response.writer, ec)
try {
// Send initial connection event
// Check if this is old HTTP+SSE transport (no session ID, no prior initialization)
// Send endpoint event first for backwards compatibility
if (!request.getHeader("Mcp-Session-Id")) {
logger.info("No Mcp-Session-Id header detected, assuming old HTTP+SSE transport")
sendSseEvent(response.writer, "endpoint", "/mcp", 0)
}
// Send initial connection event for new transport
def connectData = [
version: "2.0.2",
protocolVersion: "2025-06-18",
......@@ -415,11 +455,9 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Set MCP session ID header per specification BEFORE sending any data
response.setHeader("Mcp-Session-Id", visit.visitId.toString())
logger.info("Set Mcp-Session-Id header to ${visit.visitId} for SSE connection")
sendSseEvent(response.writer, "connect", JsonOutput.toJson(connectData), 0)
// Send endpoint info for message posting (for compatibility)
sendSseEvent(response.writer, "endpoint", "/mcp", 1)
sendSseEvent(response.writer, "connect", JsonOutput.toJson(connectData), 1)
// Keep connection alive with periodic pings
int pingCount = 0
......@@ -849,10 +887,19 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Set Mcp-Session-Id header BEFORE any response data (per MCP 2025-06-18 spec)
// For initialize method, always use sessionId we have (from visit or header)
String responseSessionId = null
if (rpcRequest.method == "initialize" && sessionId) {
response.setHeader("Mcp-Session-Id", sessionId.toString())
responseSessionId = sessionId.toString()
} else if (result?.sessionId) {
response.setHeader("Mcp-Session-Id", result.sessionId.toString())
responseSessionId = result.sessionId.toString()
} else if (sessionId) {
// For other methods, ensure we always return the session ID from header
responseSessionId = sessionId.toString()
}
if (responseSessionId) {
response.setHeader("Mcp-Session-Id", responseSessionId)
logger.info("Set Mcp-Session-Id header to ${responseSessionId} for method ${rpcRequest.method}")
}
// Build JSON-RPC response for regular requests
......@@ -910,9 +957,9 @@ logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// For initialize, use the visitId we just created instead of null sessionId from request
if (visit && visit.visitId) {
params.sessionId = visit.visitId
// Set session to initializing state using the header sessionId as key (for consistency)
sessionStates.put(sessionId, STATE_INITIALIZING)
logger.info("Initialize - using visitId: ${visit.visitId}, set state ${sessionId} to INITIALIZING")
// Set session to initializing state using actual sessionId as key (for consistency)
sessionStates.put(params.sessionId, STATE_INITIALIZING)
logger.info("Initialize - using visitId: ${visit.visitId}, set state ${params.sessionId} to INITIALIZING")
} else {
logger.warn("Initialize - no visit available, using null sessionId")
}
......