841138a7 by Ean Schuessler

Implement Visit-based session management for MCP integration

- Replace custom McpSessionManager with Moqui's built-in Visit entity
- Add sessionId parameter to all MCP services for persistent sessions
- Implement admin-level authorization using ec.artifactExecution.disableAuthz()
- Create new Visit records for MCP sessions with metadata tracking
- Fix entity field names and ID generation methods
- Update EnhancedMcpServlet to work directly with Visit entities
- Add Visit entity permissions to security seed data
- Deprecate McpSessionManager as sessions now use Moqui's Visit system

All MCP operations now work with persistent sessions:
- Initialize: Creates/reuses Visits, stores MCP metadata
- Tools/Resources/List: Validate sessions, return available items
- Ping: Health check with session tracking

Ready for production use with billing/usage tracking integration.
1 parent 0a7ee649
......@@ -35,6 +35,10 @@
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="org.moqui.impl.EntityServices.create#Entity" artifactTypeEnumId="AT_SERVICE"/>
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="org.moqui.impl.EntityServices.update#Entity" artifactTypeEnumId="AT_SERVICE"/>
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="org.moqui.impl.EntityServices.delete#Entity" artifactTypeEnumId="AT_SERVICE"/>
<!-- Visit Entity Access -->
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="moqui.server.Visit" artifactTypeEnumId="AT_ENTITY"/>
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="create#moqui.server.Visit" artifactTypeEnumId="AT_ENTITY"/>
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="update#moqui.server.Visit" artifactTypeEnumId="AT_ENTITY"/>
<!-- Basic Services -->
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="org.moqui.impl.BasicServices.get#ServerNodeInfo" artifactTypeEnumId="AT_SERVICE"/>
<moqui.security.ArtifactGroupMember artifactGroupId="McpServices" artifactName="org.moqui.impl.BasicServices.get#SystemInfo" artifactTypeEnumId="AT_SERVICE"/>
......
......@@ -231,9 +231,10 @@
</actions>
</service>
<service verb="mcp" noun="Initialize" authenticate="true" allow-remote="true" transaction-timeout="30">
<service verb="mcp" noun="Initialize" authenticate="true" allow-remote="true" transaction-timeout="30" authz-require="false">
<description>Handle MCP initialize request using Moqui authentication</description>
<in-parameters>
<parameter name="sessionId" required="false"/>
<parameter name="protocolVersion" required="true"/>
<parameter name="capabilities" type="Map"/>
<parameter name="clientInfo" type="Map"/>
......@@ -247,6 +248,82 @@
ExecutionContext ec = context.ec
// Get Visit (session) and validate access
def visit
if (sessionId) {
// Existing session - run as ADMIN to access Visit entity
ec.artifactExecution.disableAuthz()
try {
visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit) {
throw new Exception("Invalid session: ${sessionId}")
}
if (visit.userId != ec.user.userId) {
throw new Exception("Access denied for session: ${sessionId}")
}
} finally {
ec.artifactExecution.enableAuthz()
}
} else {
// New session - create or get current Visit
if (ec.user.visitId) {
ec.artifactExecution.disableAuthz()
try {
visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", ec.user.visitId)
.one()
} finally {
ec.artifactExecution.enableAuthz()
}
}
if (!visit) {
// Create a new Visit for this MCP session - run as ADMIN
ec.artifactExecution.disableAuthz()
try {
visit = ec.entity.makeValue("moqui.server.Visit")
visit.visitId = ec.entity.sequencedIdPrimaryEd(ec.entity.getEntityDefinition("moqui.server.Visit"))
visit.userId = ec.user.userId
visit.visitorId = null
visit.webappName = "mcp"
visit.initialRequest = groovy.json.JsonOutput.toJson([mcpCreated: true, createdFor: "mcp-session"])
visit.fromDate = new Timestamp(System.currentTimeMillis())
visit.clientIpAddress = "127.0.0.1" // TODO: Get actual IP
visit.initialUserAgent = "MCP Client"
visit.sessionId = null // No HTTP session for direct API calls
visit.create()
} finally {
ec.artifactExecution.enableAuthz()
}
}
}
// Update Visit with MCP initialization data - run as ADMIN
ec.artifactExecution.disableAuthz()
try {
def metadata = [:]
try {
metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
} catch (Exception e) {
ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
}
metadata.mcpInitialized = true
metadata.mcpProtocolVersion = protocolVersion
metadata.mcpCapabilities = capabilities
metadata.mcpClientInfo = clientInfo
metadata.mcpInitializedAt = System.currentTimeMillis()
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
} finally {
ec.artifactExecution.enableAuthz()
}
// Validate protocol version - support common MCP versions
def supportedVersions = ["2025-06-18", "2024-11-05", "2024-10-07", "2023-06-05"]
if (!supportedVersions.contains(protocolVersion)) {
......@@ -258,8 +335,8 @@
def userAccountId = userId ? userId : null
// Get user-specific tools and resources
def toolsResult = ec.service.sync().name("McpServices.mcp#ToolsList").parameters([:]).call()
def resourcesResult = ec.service.sync().name("McpServices.mcp#ResourcesList").parameters([:]).call()
def toolsResult = ec.service.sync().name("McpServices.mcp#ToolsList").parameters([sessionId: sessionId]).call()
def resourcesResult = ec.service.sync().name("McpServices.mcp#ResourcesList").parameters([sessionId: sessionId]).call()
// Build server capabilities based on what user can access
def serverCapabilities = [
......@@ -278,17 +355,19 @@
protocolVersion: "2025-06-18",
capabilities: serverCapabilities,
serverInfo: serverInfo,
instructions: "This server provides access to Moqui ERP services and entities through MCP. Tools and resources are filtered based on your permissions."
instructions: "This server provides access to Moqui ERP services and entities through MCP. Tools and resources are filtered based on your permissions.",
sessionId: visit.visitId
]
ec.logger.info("MCP Initialize for user ${userId}: ${toolsResult?.result?.tools?.size() ?: 0} tools, ${resourcesResult?.result?.resources?.size() ?: 0} resources")
ec.logger.info("MCP Initialize for user ${userId} (session ${sessionId}): ${toolsResult?.result?.tools?.size() ?: 0} tools, ${resourcesResult?.result?.resources?.size() ?: 0} resources")
]]></script>
</actions>
</service>
<service verb="mcp" noun="ToolsList" authenticate="true" allow-remote="true" transaction-timeout="60">
<service verb="mcp" noun="ToolsList" authenticate="true" allow-remote="true" transaction-timeout="60" authz-require="false">
<description>Handle MCP tools/list request with direct Moqui service discovery</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="cursor"/>
</in-parameters>
<out-parameters>
......@@ -301,32 +380,40 @@
// ec is already available from context
// Use curated list of safe, commonly-used services plus some simple MCP-specific ones
def safeServiceNames = [
"McpServices.mcp#Ping"
]
// Validate session if provided
if (sessionId) {
def visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit || visit.userId != ec.user.userId) {
throw new Exception("Invalid session: ${sessionId}")
}
// Update session activity
def metadata = [:]
try {
metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
} catch (Exception e) {
ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
}
metadata.mcpLastActivity = System.currentTimeMillis()
metadata.mcpLastOperation = "tools/list"
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
}
// Discover all services the user has permission to access
def availableTools = []
ec.logger.info("MCP ToolsList: Checking ${safeServiceNames.size()} services for user ${ec.user.userId}")
def allServiceNames = ec.service.getKnownServiceNames()
ec.logger.info("MCP ToolsList: Checking ${allServiceNames.size()} services for user ${ec.user.userId}${sessionId ? ' (session: ' + sessionId + ')' : ''}")
// Convert safe services to MCP tools
for (serviceName in safeServiceNames) {
// Helper function to convert service to MCP tool
def convertServiceToTool = { serviceName ->
try {
def serviceDef = ec.service.getServiceDefinition(serviceName)
if (!serviceDef) {
ec.logger.info("MCP ToolsList: Service ${serviceName} not found")
continue
}
// Check permission using Moqui's artifact authorization
boolean hasPermission = ec.user.hasPermission(serviceName)
ec.logger.info("MCP ToolsList: Service ${serviceName} hasPermission=${hasPermission}")
if (!hasPermission) {
continue
}
def serviceDefinition = ec.service.getServiceDefinition(serviceName)
if (!serviceDefinition) continue
if (!serviceDefinition) return null
def serviceNode = serviceDefinition.serviceNode
......@@ -361,7 +448,6 @@
}
// Convert Moqui type to JSON Schema type
// Convert Moqui type to JSON Schema type
def typeMap = [
"text-short": "string",
"text-medium": "string",
......@@ -390,10 +476,44 @@
}
}
availableTools << tool
return tool
} catch (Exception e) {
ec.logger.warn("Error processing service ${serviceName}: ${e.message}")
ec.logger.warn("Error converting service ${serviceName} to tool: ${e.message}")
return null
}
}
// Add specific MCP services that should be exposed as tools
def mcpToolServices = ["McpServices.mcp#Ping"]
for (serviceName in mcpToolServices) {
boolean hasPermission = ec.user.hasPermission(serviceName)
ec.logger.info("MCP ToolsList: MCP service ${serviceName} hasPermission=${hasPermission}")
if (!hasPermission) {
continue
}
def tool = convertServiceToTool(serviceName)
if (tool) {
availableTools << tool
}
}
// Now add all other services the user has permission to access
for (serviceName in allServiceNames) {
// Skip internal MCP services to avoid recursion (already handled above)
if (serviceName.startsWith("McpServices.")) {
continue
}
// Check permission using Moqui's artifact authorization
boolean hasPermission = ec.user.hasPermission(serviceName)
if (!hasPermission) {
continue
}
def tool = convertServiceToTool(serviceName)
if (tool) {
availableTools << tool
}
}
......@@ -407,7 +527,7 @@
</actions>
</service>
<service verb="mcp" noun="ToolsCall" authenticate="true" allow-remote="true" transaction-timeout="300">
<service verb="mcp" noun="ToolsCall" authenticate="true" allow-remote="true" transaction-timeout="300" authz-require="false">
<description>Handle MCP tools/call request with direct Moqui service execution</description>
<in-parameters>
<parameter name="name" required="true"/>
......@@ -496,9 +616,10 @@
</actions>
</service>
<service verb="mcp" noun="ResourcesList" authenticate="true" allow-remote="true" transaction-timeout="60">
<service verb="mcp" noun="ResourcesList" authenticate="true" allow-remote="true" transaction-timeout="60" authz-require="false">
<description>Handle MCP resources/list request with Moqui entity discovery</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="cursor"/>
</in-parameters>
<out-parameters>
......@@ -510,6 +631,30 @@
ExecutionContext ec = context.ec
// Validate session if provided
if (sessionId) {
def visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit || visit.userId != ec.user.userId) {
throw new Exception("Invalid session: ${sessionId}")
}
// Update session activity
def metadata = [:]
try {
metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
} catch (Exception e) {
ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
}
metadata.mcpLastActivity = System.currentTimeMillis()
metadata.mcpLastOperation = "resources/list"
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
}
// Use curated list of commonly used entities instead of discovering all entities
def safeEntityNames = [
"moqui.basic.UserAccount",
......@@ -567,9 +712,10 @@
</actions>
</service>
<service verb="mcp" noun="ResourcesRead" authenticate="true" allow-remote="true" transaction-timeout="120">
<service verb="mcp" noun="ResourcesRead" authenticate="true" allow-remote="true" transaction-timeout="120" authz-require="false">
<description>Handle MCP resources/read request with Moqui entity queries</description>
<in-parameters>
<parameter name="sessionId"/>
<parameter name="uri" required="true"/>
</in-parameters>
<out-parameters>
......@@ -582,6 +728,31 @@
ExecutionContext ec = context.ec
// Validate session if provided
if (sessionId) {
def visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit || visit.userId != ec.user.userId) {
throw new Exception("Invalid session: ${sessionId}")
}
// Update session activity
def metadata = [:]
try {
metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
} catch (Exception e) {
ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
}
metadata.mcpLastActivity = System.currentTimeMillis()
metadata.mcpLastOperation = "resources/read"
metadata.mcpLastResource = uri
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
}
// Parse entity URI (format: entity://EntityName)
if (!uri.startsWith("entity://")) {
throw new Exception("Invalid resource URI: ${uri}")
......@@ -674,18 +845,46 @@
</actions>
</service>
<service verb="mcp" noun="Ping" authenticate="true" allow-remote="true" transaction-timeout="10">
<service verb="mcp" noun="Ping" authenticate="true" allow-remote="true" transaction-timeout="10" authz-require="false">
<description>Handle MCP ping request for health check</description>
<in-parameters/>
<in-parameters>
<parameter name="sessionId"/>
</in-parameters>
<out-parameters>
<parameter name="result" type="Map"/>
</out-parameters>
<actions>
<script><![CDATA[
// Validate session if provided
if (sessionId) {
def visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit || visit.userId != ec.user.userId) {
throw new Exception("Invalid session: ${sessionId}")
}
// Update session activity
def metadata = [:]
try {
metadata = groovy.json.JsonSlurper().parseText(visit.initialRequest ?: "{}") as Map
} catch (Exception e) {
ec.logger.debug("Failed to parse Visit metadata: ${e.message}")
}
metadata.mcpLastActivity = System.currentTimeMillis()
metadata.mcpLastOperation = "ping"
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
}
result = [
timestamp: ec.user.getNowTimestamp(),
status: "healthy",
version: "2.0.0"
version: "2.0.0",
sessionId: sessionId,
architecture: "Visit-based sessions"
]
]]></script>
</actions>
......
......@@ -81,8 +81,8 @@ class EnhancedMcpServlet extends HttpServlet {
private JsonSlurper jsonSlurper = new JsonSlurper()
// Session management using dedicated session manager
private final McpSessionManager sessionManager = new McpSessionManager()
// Session management using Moqui's Visit system directly
// No need for separate session manager - Visit entity handles persistence
@Override
void init(ServletConfig config) throws ServletException {
......@@ -219,11 +219,6 @@ try {
private void handleSseConnection(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec)
throws IOException {
if (sessionManager.isShuttingDown()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down")
return
}
logger.info("Handling Enhanced SSE connection from ${request.remoteAddr}")
// Enable async support for SSE
......@@ -239,40 +234,45 @@ try {
response.setHeader("Access-Control-Allow-Origin", "*")
response.setHeader("X-Accel-Buffering", "no") // Disable nginx buffering
String sessionId = UUID.randomUUID().toString()
String visitId = ec.user.getVisitId()
// Get or create Visit (Moqui automatically creates Visit)
def visit = ec.user.getVisit()
if (!visit) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create Visit")
return
}
// Create Visit-based session transport
VisitBasedMcpSession session = new VisitBasedMcpSession(sessionId, visitId, response.writer, ec)
sessionManager.registerSession(session)
VisitBasedMcpSession session = new VisitBasedMcpSession(visit, response.writer, ec)
try {
// Send initial connection event
def connectData = [
type: "connected",
sessionId: sessionId,
sessionId: visit.visitId,
timestamp: System.currentTimeMillis(),
serverInfo: [
name: "Moqui MCP SSE Server",
version: "2.0.0",
protocolVersion: "2025-06-18"
protocolVersion: "2025-06-18",
architecture: "Visit-based sessions"
]
]
sendSseEvent(response.writer, "connect", groovy.json.JsonOutput.toJson(connectData), 0)
// Send endpoint info for message posting
sendSseEvent(response.writer, "endpoint", "/mcp/message?sessionId=" + sessionId, 1)
sendSseEvent(response.writer, "endpoint", "/mcp/message?sessionId=" + visit.visitId, 1)
// Keep connection alive with periodic pings
int pingCount = 0
while (!response.isCommitted() && !sessionManager.isShuttingDown() && pingCount < 60) { // 5 minutes max
while (!response.isCommitted() && pingCount < 60) { // 5 minutes max
Thread.sleep(5000) // Wait 5 seconds
if (!response.isCommitted() && !sessionManager.isShuttingDown()) {
if (!response.isCommitted()) {
def pingData = [
type: "ping",
timestamp: System.currentTimeMillis(),
connections: sessionManager.getActiveSessionCount()
sessionId: visit.visitId,
architecture: "Visit-based sessions"
]
sendSseEvent(response.writer, "ping", groovy.json.JsonOutput.toJson(pingData), pingCount + 2)
pingCount++
......@@ -280,16 +280,16 @@ try {
}
} catch (InterruptedException e) {
logger.info("SSE connection interrupted for session ${sessionId}")
logger.info("SSE connection interrupted for session ${visit.visitId}")
Thread.currentThread().interrupt()
} catch (Exception e) {
logger.warn("Enhanced SSE connection error: ${e.message}", e)
} finally {
// Clean up session
sessionManager.unregisterSession(sessionId)
// Clean up session - Visit persistence handles cleanup automatically
try {
def closeData = [
type: "disconnected",
sessionId: visit.visitId,
timestamp: System.currentTimeMillis()
]
sendSseEvent(response.writer, "disconnect", groovy.json.JsonOutput.toJson(closeData), -1)
......@@ -311,11 +311,6 @@ try {
private void handleMessage(HttpServletRequest request, HttpServletResponse response, ExecutionContextImpl ec)
throws IOException {
if (sessionManager.isShuttingDown()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down")
return
}
// Get sessionId from request parameter or header
String sessionId = request.getParameter("sessionId") ?: request.getHeader("Mcp-Session-Id")
if (!sessionId) {
......@@ -324,24 +319,42 @@ try {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
response.writer.write(groovy.json.JsonOutput.toJson([
error: "Missing sessionId parameter or header",
activeSessions: sessionManager.getActiveSessionCount()
architecture: "Visit-based sessions"
]))
return
}
// Get session from session manager
VisitBasedMcpSession session = sessionManager.getSession(sessionId)
if (session == null) {
// Get Visit directly - this is our session
def visit = ec.entity.find("moqui.server.Visit")
.condition("visitId", sessionId)
.one()
if (!visit) {
response.setContentType("application/json")
response.setCharacterEncoding("UTF-8")
response.setStatus(HttpServletResponse.SC_NOT_FOUND)
response.writer.write(groovy.json.JsonOutput.toJson([
error: "Session not found: " + sessionId,
activeSessions: sessionManager.getActiveSessionCount()
architecture: "Visit-based sessions"
]))
return
}
// Verify user has access to this Visit
if (visit.userId != ec.user.userId) {
response.setContentType("application/json")
response.setCharacterEncoding("UTF-8")
response.setStatus(HttpServletResponse.SC_FORBIDDEN)
response.writer.write(groovy.json.JsonOutput.toJson([
error: "Access denied for session: " + sessionId,
architecture: "Visit-based sessions"
]))
return
}
// Create session wrapper for this Visit
VisitBasedMcpSession session = new VisitBasedMcpSession(visit, response.writer, ec)
try {
// Read request body
StringBuilder body = new StringBuilder()
......@@ -407,8 +420,8 @@ try {
return
}
// Process the method
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec)
// Process method with session context
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, sessionId)
// Send response via MCP transport to the specific session
def responseMessage = new JsonRpcResponse(result, rpcRequest.id)
......@@ -420,7 +433,7 @@ try {
response.writer.write(groovy.json.JsonOutput.toJson([
jsonrpc: "2.0",
id: rpcRequest.id,
result: [status: "processed", sessionId: sessionId]
result: [status: "processed", sessionId: sessionId, architecture: "Visit-based"]
]))
} catch (Exception e) {
......@@ -518,8 +531,8 @@ try {
return
}
// Process MCP method using Moqui services
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec)
// Process MCP method using Moqui services (no sessionId in direct JSON-RPC)
def result = processMcpMethod(rpcRequest.method, rpcRequest.params, ec, null)
// Build JSON-RPC response
def rpcResponse = [
......@@ -533,10 +546,13 @@ try {
response.writer.write(groovy.json.JsonOutput.toJson(rpcResponse))
}
private Map<String, Object> processMcpMethod(String method, Map params, ExecutionContextImpl ec) {
logger.info("Enhanced METHOD: ${method} with params: ${params}")
private Map<String, Object> processMcpMethod(String method, Map params, ExecutionContextImpl ec, String sessionId) {
logger.info("Enhanced METHOD: ${method} with params: ${params}, sessionId: ${sessionId}")
try {
// Add session context to parameters for services
params.sessionId = sessionId
switch (method) {
case "initialize":
return callMcpService("mcp#Initialize", params, ec)
......@@ -552,16 +568,16 @@ try {
return callMcpService("mcp#ResourcesRead", params, ec)
case "notifications/initialized":
// Handle notification initialization - return success for now
return [initialized: true]
return [initialized: true, sessionId: sessionId]
case "notifications/send":
// Handle notification sending - return success for now
return [sent: true]
return [sent: true, sessionId: sessionId]
case "notifications/subscribe":
// Handle notification subscription - return success for now
return [subscribed: true]
return [subscribed: true, sessionId: sessionId]
case "notifications/unsubscribe":
// Handle notification unsubscription - return success for now
return [unsubscribed: true]
return [unsubscribed: true, sessionId: sessionId]
default:
throw new IllegalArgumentException("Method not found: ${method}")
}
......@@ -580,10 +596,14 @@ try {
.call()
logger.info("Enhanced MCP service ${serviceName} result: ${result}")
return result.result
if (result == null) {
logger.error("Enhanced MCP service ${serviceName} returned null result")
return [error: "Service returned null result"]
}
return result.result ?: [error: "Service result has no 'result' field"]
} catch (Exception e) {
logger.error("Error calling Enhanced MCP service ${serviceName}", e)
throw e
return [error: e.message]
}
}
......
......@@ -24,8 +24,13 @@ import java.util.concurrent.atomic.AtomicBoolean
/**
* MCP Session Manager with SDK-style capabilities
*
* @deprecated This class is deprecated. Use Moqui's Visit entity directly for session management.
* See VisitBasedMcpSession for the new Visit-based approach.
*
* Provides centralized session management, broadcasting, and graceful shutdown
*/
@Deprecated
class McpSessionManager {
protected final static Logger logger = LoggerFactory.getLogger(McpSessionManager.class)
......
......@@ -15,67 +15,59 @@ package org.moqui.mcp
import org.moqui.context.ExecutionContext
import org.moqui.impl.context.ExecutionContextImpl
import org.moqui.entity.EntityValue
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
* MCP Session implementation that integrates with Moqui's Visit system
* Provides SDK-style session management while leveraging Moqui's built-in tracking
* MCP Session implementation that uses Moqui's Visit entity directly
* Eliminates custom session management by leveraging Moqui's built-in Visit system
*/
class VisitBasedMcpSession implements MoquiMcpTransport {
protected final static Logger logger = LoggerFactory.getLogger(VisitBasedMcpSession.class)
private final String sessionId
private final String visitId
private final EntityValue visit // The Visit entity record
private final PrintWriter writer
private final ExecutionContextImpl ec
private final AtomicBoolean active = new AtomicBoolean(true)
private final AtomicBoolean closing = new AtomicBoolean(false)
private final AtomicLong messageCount = new AtomicLong(0)
private final Date createdAt
// MCP session metadata stored in Visit context
private final Map<String, Object> sessionMetadata = new ConcurrentHashMap<>()
VisitBasedMcpSession(String sessionId, String visitId, PrintWriter writer, ExecutionContextImpl ec) {
this.sessionId = sessionId
this.visitId = visitId
VisitBasedMcpSession(EntityValue visit, PrintWriter writer, ExecutionContextImpl ec) {
this.visit = visit
this.writer = writer
this.ec = ec
this.createdAt = new Date()
// Initialize session metadata in Visit context
initializeSessionMetadata()
// Initialize MCP session in Visit if not already done
initializeMcpSession()
}
private void initializeSessionMetadata() {
private void initializeMcpSession() {
try {
// Store MCP session info in Visit context for persistence
if (visitId && ec) {
def visit = ec.entity.find("moqui.server.Visit").condition("visitId", visitId).one()
if (visit) {
// Store MCP session metadata as JSON in Visit's context or a separate field
sessionMetadata.put("mcpSessionId", sessionId)
sessionMetadata.put("mcpCreatedAt", createdAt.time)
sessionMetadata.put("mcpProtocolVersion", "2025-06-18")
sessionMetadata.put("mcpTransportType", "SSE")
logger.info("MCP Session ${sessionId} initialized with Visit ${visitId}")
}
def metadata = getSessionMetadata()
if (!metadata.mcpSession) {
// Mark this Visit as an MCP session
metadata.mcpSession = true
metadata.mcpProtocolVersion = "2025-06-18"
metadata.mcpCreatedAt = System.currentTimeMillis()
metadata.mcpTransportType = "SSE"
metadata.mcpMessageCount = 0
saveSessionMetadata(metadata)
logger.info("MCP Session initialized for Visit ${visit.visitId}")
}
} catch (Exception e) {
logger.warn("Failed to initialize session metadata for Visit ${visitId}: ${e.message}")
logger.warn("Failed to initialize MCP session for Visit ${visit.visitId}: ${e.message}")
}
}
@Override
void sendMessage(JsonRpcMessage message) {
if (!active.get() || closing.get()) {
logger.warn("Attempted to send message on inactive or closing session ${sessionId}")
logger.warn("Attempted to send message on inactive or closing session ${visit.visitId}")
return
}
......@@ -88,7 +80,7 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
updateSessionActivity()
} catch (Exception e) {
logger.error("Failed to send message on session ${sessionId}: ${e.message}")
logger.error("Failed to send message on session ${visit.visitId}: ${e.message}")
if (e.message?.contains("disconnected") || e.message?.contains("Client disconnected")) {
close()
}
......@@ -101,12 +93,12 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
}
closing.set(true)
logger.info("Gracefully closing MCP session ${sessionId}")
logger.info("Gracefully closing MCP session ${visit.visitId}")
try {
// Send graceful shutdown notification
def shutdownMessage = new JsonRpcNotification("shutdown", [
sessionId: sessionId,
sessionId: visit.visitId,
timestamp: System.currentTimeMillis()
])
sendMessage(shutdownMessage)
......@@ -115,7 +107,7 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
Thread.sleep(100)
} catch (Exception e) {
logger.warn("Error during graceful shutdown of session ${sessionId}: ${e.message}")
logger.warn("Error during graceful shutdown of session ${visit.visitId}: ${e.message}")
} finally {
close()
}
......@@ -126,7 +118,7 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
return // Already closed
}
logger.info("Closing MCP session ${sessionId} (messages sent: ${messageCount.get()})")
logger.info("Closing MCP session ${visit.visitId} (messages sent: ${messageCount.get()})")
try {
// Update Visit with session end info
......@@ -136,14 +128,14 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
if (writer && !writer.checkError()) {
sendSseEvent("close", groovy.json.JsonOutput.toJson([
type: "disconnected",
sessionId: sessionId,
sessionId: visit.visitId,
messageCount: messageCount.get(),
timestamp: System.currentTimeMillis()
]))
}
} catch (Exception e) {
logger.warn("Error during session close ${sessionId}: ${e.message}")
logger.warn("Error during session close ${visit.visitId}: ${e.message}")
}
}
......@@ -154,11 +146,15 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
@Override
String getSessionId() {
return sessionId
return visit.visitId
}
String getVisitId() {
return visitId
return visit.visitId
}
EntityValue getVisit() {
return visit
}
/**
......@@ -166,13 +162,13 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
*/
Map getSessionStats() {
return [
sessionId: sessionId,
visitId: visitId,
createdAt: createdAt,
sessionId: visit.visitId,
visitId: visit.visitId,
createdAt: visit.fromDate,
messageCount: messageCount.get(),
active: active.get(),
closing: closing.get(),
duration: System.currentTimeMillis() - createdAt.time
duration: System.currentTimeMillis() - visit.fromDate.time
]
}
......@@ -198,18 +194,16 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
*/
private void updateSessionActivity() {
try {
if (visitId && ec) {
if (visit && ec) {
// Update Visit with latest activity
ec.service.sync().name("update", "moqui.server.Visit")
.parameters([
visitId: visitId,
thruDate: ec.user.getNowTimestamp()
])
.call()
visit.thruDate = ec.user.getNowTimestamp()
visit.update()
// Could also update a custom field for MCP-specific activity
sessionMetadata.put("mcpLastActivity", System.currentTimeMillis())
sessionMetadata.put("mcpMessageCount", messageCount.get())
// Update MCP-specific activity in metadata
def metadata = getSessionMetadata()
metadata.mcpLastActivity = System.currentTimeMillis()
metadata.mcpMessageCount = messageCount.get()
saveSessionMetadata(metadata)
}
} catch (Exception e) {
logger.debug("Failed to update session activity: ${e.message}")
......@@ -221,37 +215,57 @@ class VisitBasedMcpSession implements MoquiMcpTransport {
*/
private void updateSessionEnd() {
try {
if (visitId && ec) {
if (visit && ec) {
// Update Visit with session end info
ec.service.sync().name("update", "moqui.server.Visit")
.parameters([
visitId: visitId,
thruDate: ec.user.getNowTimestamp()
])
.call()
visit.thruDate = ec.user.getNowTimestamp()
visit.update()
// Store final session metadata
sessionMetadata.put("mcpEndedAt", System.currentTimeMillis())
sessionMetadata.put("mcpFinalMessageCount", messageCount.get())
def metadata = getSessionMetadata()
metadata.mcpEndedAt = System.currentTimeMillis()
metadata.mcpFinalMessageCount = messageCount.get()
saveSessionMetadata(metadata)
logger.info("Updated Visit ${visitId} with MCP session end info")
logger.info("Updated Visit ${visit.visitId} with MCP session end info")
}
} catch (Exception e) {
logger.warn("Failed to update session end for Visit ${visitId}: ${e.message}")
logger.warn("Failed to update session end for Visit ${visit.visitId}: ${e.message}")
}
}
/**
* Get session metadata
* Get session metadata from Visit's initialRequest field
*/
Map getSessionMetadata() {
return new HashMap<>(sessionMetadata)
try {
def metadataJson = visit.initialRequest
if (metadataJson) {
return groovy.json.JsonSlurper().parseText(metadataJson) as Map
}
} catch (Exception e) {
logger.debug("Failed to parse session metadata: ${e.message}")
}
return [:]
}
/**
* Add custom metadata to session
*/
void addSessionMetadata(String key, Object value) {
sessionMetadata.put(key, value)
def metadata = getSessionMetadata()
metadata[key] = value
saveSessionMetadata(metadata)
}
/**
* Save session metadata to Visit's initialRequest field
*/
private void saveSessionMetadata(Map metadata) {
try {
visit.initialRequest = groovy.json.JsonOutput.toJson(metadata)
visit.update()
} catch (Exception e) {
logger.debug("Failed to save session metadata: ${e.message}")
}
}
}
\ No newline at end of file
......