39daa815 by Ean Schuessler

Implement Reliable Agent Architecture using CommunicationEvent thread logic

- Update AgentServices.xml: Single-turn state machine (process one turn, re-queue if tool called)
- Add Agent.secas.xml: Trigger Agent Turn on new CommunicationEvent to Agent Party
- Update AgentEntities.xml: Add rootCommEventId to SystemMessage for thread tracking
- Update AgentData.xml: Define Agent Party, Role, and default VLLM config
1 parent 85eedd26
......@@ -2,20 +2,18 @@
<entity-facade-xml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://moqui.org/xsd/entity-facade-3.xsd" type="seed">
<!-- ========================================================= -->
<!-- Agent User Account (for authentication) -->
<!-- ========================================================= -->
<moqui.security.UserGroup userGroupId="AgentUsers" description="AI Agent Users"/>
<moqui.security.UserAccount
userId="AGENT_CLAUDE"
username="agent-claude"
currentPassword="16ac58bbfa332c1c55bd98b53e60720bfa90d394"
passwordHashType="SHA"/>
<!-- Agent Party -->
<mantle.party.Party partyId="AGENT_CLAUDE_PARTY" partyTypeEnumId="PtyPerson"/>
<mantle.party.Person partyId="AGENT_CLAUDE_PARTY" firstName="Claude" lastName="Agent"/>
<mantle.party.PartyRole partyId="AGENT_CLAUDE_PARTY" roleTypeId="Agent"/>
<moqui.security.UserGroup userGroupId="AgentUsers" description="AI Agent Users"/>
<moqui.security.UserAccount userId="AGENT_CLAUDE" username="agent-claude" partyId="AGENT_CLAUDE_PARTY"
currentPassword="16ac58bbfa332c1c55bd98b53e60720bfa90d394" passwordHashType="SHA"/>
<moqui.security.UserGroupMember userGroupId="AgentUsers" userId="AGENT_CLAUDE" fromDate="2026-02-04 00:00:00.000"/>
<!-- Agent users have permission to execute the delegation service -->
<!-- Agent users have permission to execute delegation service -->
<moqui.security.ArtifactGroup artifactGroupId="AgentDelegationServices" description="Agent Tool Delegation Services"/>
<moqui.security.ArtifactGroupMember artifactGroupId="AgentDelegationServices" artifactName="AgentServices.call#McpToolWithDelegation" artifactTypeEnumId="AT_SERVICE"/>
<moqui.security.ArtifactAuthz userGroupId="AgentUsers" artifactGroupId="AgentDelegationServices" authzTypeEnumId="AUTHZT_ALLOW" authzActionEnumId="AUTHZA_ALL"/>
......@@ -27,7 +25,13 @@
<!-- Agent Task Message Type -->
<moqui.service.message.SystemMessageType systemMessageTypeId="SmtyAgentTask" description="Agent Task"
contentType="application/json"
consumeServiceName="AgentServices.poll#AgentQueue"
receiveServiceName=""/>
consumeServiceName="AgentServices.poll#AgentQueue"/>
<!-- Default AI Config (Brainfood VLLM) -->
<moqui.mcp.agent.ProductStoreAiConfig
productStoreId="POPC_DEFAULT" aiConfigId="DEFAULT"
serviceTypeEnumId="AistVllm" description="Brainfood VLLM"
endpointUrl="http://crunchy.private.brainfood.com:11434/v1" apiKey="brainfood"
modelName="bf-ai" temperature="0.7" maxTokens="4096"/>
</entity-facade-xml>
......
......@@ -19,6 +19,9 @@
<field name="aiConfigId" type="id">
<description>Specific AI configuration used for this task.</description>
</field>
<field name="rootCommEventId" type="id">
<description>The root CommunicationEvent ID for the conversation thread.</description>
</field>
<relationship type="one" title="RequestedBy" related="mantle.party.Party">
<key-map field-name="requestedByPartyId" related="partyId"/>
......
<?xml version="1.0" encoding="UTF-8"?>
<secas xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://moqui.org/xsd/service-eca-3.xsd">
<seca id="AgentTriggerOnCommunication" service="create#mantle.party.communication.CommunicationEvent" when="post-service">
<condition>
<expression>toPartyId == 'AGENT_CLAUDE_PARTY'</expression>
</condition>
<actions>
<!-- Ensure rootCommEventId is set (thread tracking) -->
<script><![CDATA[
def rootId = rootCommEventId ?: communicationEventId
if (!rootCommEventId) {
ec.service.sync().name("update#mantle.party.communication.CommunicationEvent")
.parameters([communicationEventId: communicationEventId, rootCommEventId: rootId])
.call()
}
// Trigger Agent Turn
ec.service.sync().name("create#moqui.service.message.SystemMessage").parameters([
systemMessageTypeId: 'SmtyAgentTask',
statusId: 'SmsgReceived',
requestedByPartyId: fromPartyId,
effectiveUserId: ec.user.userId, // Use the actual human user ID for RBAC
productStoreId: 'POPC_DEFAULT',
aiConfigId: 'DEFAULT',
rootCommEventId: rootId,
isOutgoing: 'N'
]).call()
]]></script>
</actions>
</seca>
</secas>
......@@ -32,16 +32,13 @@
try {
// 2. Switch identity to target user
// 'false' arg means don't trigger history/visit updates for this switch
boolean loggedIn = ec.user.internalLoginUser(runAsUserId, false)
if (!loggedIn) throw new Exception("Could not switch to user ${runAsUserId}")
ec.logger.info("Agent ${agentUsername} executing ${toolName} AS ${ec.user.username} (${runAsUserId})")
// 3. Execute Tool (Standard RBAC applies to this user)
// 3. Execute Tool
McpToolAdapter adapter = new McpToolAdapter()
// The adapter MUST NOT disableAuthz internally for this to be secure
result = adapter.callTool(ec, toolName, arguments)
} finally {
......@@ -79,7 +76,6 @@
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
// Construct payload
def payloadMap = [
model: model,
messages: messages,
......@@ -87,38 +83,27 @@
stream: false
]
if (maxTokens) payloadMap.maxTokens = maxTokens
if (maxTokens) payloadMap.max_tokens = maxTokens
if (tools) payloadMap.tools = tools
String jsonPayload = new JsonBuilder(payloadMap).toString()
// Setup connection
URL url = new URL(endpointUrl + "/chat/completions")
HttpURLConnection conn = (HttpURLConnection) url.openConnection()
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json")
if (apiKey) conn.setRequestProperty("Authorization", "Bearer " + apiKey)
conn.setDoOutput(true)
conn.setConnectTimeout(10000) // 10s connect
conn.setReadTimeout(60000) // 60s read (LLMs are slow)
conn.setConnectTimeout(10000)
conn.setReadTimeout(60000)
try {
conn.outputStream.write(jsonPayload.getBytes("UTF-8"))
httpStatus = conn.responseCode
InputStream is = (httpStatus >= 200 && httpStatus < 300) ? conn.inputStream : conn.errorStream
String responseText = is?.text
if (responseText) {
response = new JsonSlurper().parseText(responseText)
}
if (httpStatus >= 300) {
error = "HTTP ${httpStatus}: ${responseText}"
ec.logger.error("OpenAI Client Error: ${error}")
}
if (responseText) response = new JsonSlurper().parseText(responseText)
if (httpStatus >= 300) error = "HTTP ${httpStatus}: ${responseText}"
} catch (Exception e) {
error = e.message
httpStatus = 500
......@@ -129,13 +114,13 @@
</service>
<!-- ========================================================= -->
<!-- Agent Runner (The Loop) -->
<!-- Agent Runner (Single Turn State Machine) -->
<!-- ========================================================= -->
<service verb="run" noun="AgentTask" authenticate="false">
<service verb="run" noun="AgentTaskTurn" authenticate="false">
<description>
Processes a single Agent Task SystemMessage.
Handles the loop of: Prompt -> LLM -> Tool Call -> Tool Execution -> Prompt.
Processes ONE turn of an Agent Task.
Loads thread history, calls LLM, executes ONE set of tools, saves state, and re-queues if needed.
</description>
<in-parameters>
<parameter name="systemMessageId" required="true"/>
......@@ -144,147 +129,136 @@
<script><![CDATA[
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
// 1. Load SystemMessage and Config
import org.moqui.mcp.adapter.McpToolAdapter
// 1. Load SystemMessage Task
def taskMsg = ec.entity.find("moqui.service.message.SystemMessage")
.condition("systemMessageId", systemMessageId)
.one()
if (!taskMsg) return
.condition("systemMessageId", systemMessageId).one()
if (!taskMsg || taskMsg.statusId != "SmsgReceived") return
// Get AI Config
def aiConfig = ec.entity.find("moqui.mcp.agent.ProductStoreAiConfig")
.condition("productStoreId", taskMsg.productStoreId)
.condition("aiConfigId", taskMsg.aiConfigId)
.one()
.condition("aiConfigId", taskMsg.aiConfigId).one()
// Fallback to ProductStoreSetting if no specific AI Config found
def endpointUrl, apiKey, modelName, temperature
if (aiConfig) {
endpointUrl = aiConfig.endpointUrl
apiKey = aiConfig.apiKey
modelName = aiConfig.modelName
temperature = aiConfig.temperature
} else if (taskMsg.productStoreId) {
// Try ProductStoreSettings
def settings = ec.entity.find("mantle.product.store.ProductStoreSetting")
.condition("productStoreId", taskMsg.productStoreId)
.condition("settingTypeEnumId", ["AiEndpointUrl", "AiApiKey", "AiModelName", "AiTemperature"])
.list()
endpointUrl = settings.find { it.settingTypeEnumId == "AiEndpointUrl" }?.settingValue
apiKey = settings.find { it.settingTypeEnumId == "AiApiKey" }?.settingValue
modelName = settings.find { it.settingTypeEnumId == "AiModelName" }?.settingValue
temperature = settings.find { it.settingTypeEnumId == "AiTemperature" }?.settingValue?.toBigDecimal()
}
if (!endpointUrl || !modelName) {
ec.logger.error("No AI Configuration (Entity or Settings) found for task ${systemMessageId}")
taskMsg.statusId = "SmsError"
taskMsg.messageText = "Missing AI Configuration (Endpoint or Model)"
taskMsg.update()
if (!aiConfig?.endpointUrl || !aiConfig?.modelName) {
taskMsg.statusId = "SmsgError"; taskMsg.update()
return
}
// 2. Reconstruct Conversation History from CommunicationEvents
def messages = []
messages.add([role: "system", content: "You are a helpful Moqui ERP assistant. You act as user ${taskMsg.effectiveUserId}."])
// Default temperature if missing
if (temperature == null) temperature = 0.7
// Filter out dangerous tools if needed? For now, we rely on RBAC delegation.
def openAiTools = moquiTools.collect { tool ->
[
type: "function",
function: [
name: tool.name,
description: tool.description,
// Helper to build schema (simplified for now, ideally strictly typed)
parameters: [
type: "object",
properties: [
path: [type: "string", description: "Screen path or resource URI"],
action: [type: "string", description: "Action to perform (create, update, etc)"],
parameters: [type: "object", description: "Key-value pairs for the action"]
]
]
]
]
}
// 3. Build Conversation History
// TODO: Load history if this is a continuation. For now, simple start.
def messages = [
[role: "system", content: "You are a helpful Moqui ERP assistant. You act as user ${taskMsg.requestedByUserId}."],
[role: "user", content: taskMsg.messageText]
]
// 4. The Loop (Max 5 turns for safety)
int maxTurns = 5
int currentTurn = 0
boolean taskComplete = false
while (currentTurn < maxTurns && !taskComplete) {
currentTurn++
// Call LLM
def llmResult = ec.service.sync().name("AgentServices.call#OpenAiChatCompletion").parameters([
endpointUrl: aiConfig.endpointUrl,
apiKey: aiConfig.apiKey, // Decrypt if needed
model: aiConfig.modelName,
messages: messages,
tools: openAiTools,
temperature: aiConfig.temperature
]).call()
if (llmResult.error) {
taskMsg.statusId = "SmsError"
taskMsg.messageText += "\nError: ${llmResult.error}"
taskMsg.update()
return
}
def responseMsg = llmResult.response.choices[0].message
messages.add(responseMsg) // Add assistant response to history
if (taskMsg.rootCommEventId) {
def threadEvents = ec.entity.find("mantle.party.communication.CommunicationEvent")
.condition("rootCommEventId", taskMsg.rootCommEventId)
.orderBy("entryDate").list()
// Check for Tool Calls
if (responseMsg.tool_calls) {
ec.logger.info("Agent requesting ${responseMsg.tool_calls.size()} tools")
threadEvents.each { ev ->
// Distinguish roles based on fromPartyId
String role = (ev.fromPartyId == "AGENT_CLAUDE_PARTY") ? "assistant" : "user"
responseMsg.tool_calls.each { toolCall ->
def functionName = toolCall.function.name
def functionArgs = new JsonSlurper().parseText(toolCall.function.arguments)
def toolCallId = toolCall.id
// EXECUTE TOOL via Secure Bridge
def executionResult = [:]
try {
def runResult = ec.service.sync().name("AgentServices.call#McpToolWithDelegation").parameters([
toolName: functionName,
arguments: functionArgs,
runAsUserId: taskMsg.effectiveUserId // DELEGATION!
]).call()
executionResult = runResult.result
} catch (Exception e) {
executionResult = [error: e.message]
// Check if it's a tool result (stored in contentType application/json)
if (ev.contentType == "application/json") {
def json = new JsonSlurper().parseText(ev.body)
if (json.tool_call_id) {
messages.add([role: "tool", tool_call_id: json.tool_call_id, content: json.content])
} else if (json.tool_calls) {
messages.add([role: "assistant", tool_calls: json.tool_calls])
}
// Add result to history
messages.add([
role: "tool",
tool_call_id: toolCallId,
content: JsonOutput.toJson(executionResult)
])
} else {
messages.add([role: role, content: ev.body])
}
} else {
// No tool calls = Final Response
taskComplete = true
}
} else {
// Initial task message
messages.add([role: "user", content: taskMsg.messageText])
}
// 3. Prepare Tools
def mcpToolAdapter = new org.moqui.mcp.adapter.McpToolAdapter()
def moquiTools = mcpToolAdapter.listTools()
def openAiTools = moquiTools.collect { tool ->
[type: "function", function: [
name: tool.name, description: tool.description,
parameters: [type: "object", properties: [
path: [type: "string"], action: [type: "string"], parameters: [type: "object"]
]]
]]
}
// 4. Call LLM
def llmResult = ec.service.sync().name("AgentServices.call#OpenAiChatCompletion").parameters([
endpointUrl: aiConfig.endpointUrl, apiKey: aiConfig.apiKey,
model: aiConfig.modelName, messages: messages, tools: openAiTools,
temperature: aiConfig.temperature
]).call()
if (llmResult.error) {
taskMsg.statusId = "SmsgError"; taskMsg.update()
return
}
def responseMsg = llmResult.response.choices[0].message
// 5. Handle Response
if (responseMsg.tool_calls) {
// SAVE Assistant "Thought" (Tool Calls)
def assistantComm = ec.service.sync().name("create#mantle.party.communication.CommunicationEvent").parameters([
fromPartyId: "AGENT_CLAUDE_PARTY", toPartyId: taskMsg.requestedByPartyId,
rootCommEventId: taskMsg.rootCommEventId, parentCommEventId: taskMsg.rootCommEventId,
communicationEventTypeId: "Message", contentType: "application/json",
body: JsonOutput.toJson([tool_calls: responseMsg.tool_calls]),
entryDate: ec.user.nowTimestamp, statusId: "CeReceived"
]).call()
// EXECUTE Tools and Save Results
responseMsg.tool_calls.each { toolCall ->
def result = [:]
try {
def runResult = ec.service.sync().name("AgentServices.call#McpToolWithDelegation").parameters([
toolName: toolCall.function.name, arguments: new JsonSlurper().parseText(toolCall.function.arguments),
runAsUserId: taskMsg.effectiveUserId
]).call()
result = runResult.result
} catch (Exception e) { result = [error: e.message] }
// Save Tool Result as CommEvent
ec.service.sync().name("create#mantle.party.communication.CommunicationEvent").parameters([
fromPartyId: taskMsg.requestedByPartyId, toPartyId: "AGENT_CLAUDE_PARTY",
rootCommEventId: taskMsg.rootCommEventId, parentCommEventId: assistantComm.communicationEventId,
communicationEventTypeId: "Message", contentType: "application/json",
body: JsonOutput.toJson([tool_call_id: toolCall.id, content: JsonOutput.toJson(result)]),
entryDate: ec.user.nowTimestamp, statusId: "CeReceived"
]).call()
}
// 6. RE-QUEUE: Create next turn message
ec.service.sync().name("create#moqui.service.message.SystemMessage").parameters([
systemMessageTypeId: "SmtyAgentTask", statusId: "SmsgReceived",
productStoreId: taskMsg.productStoreId, aiConfigId: taskMsg.aiConfigId,
requestedByPartyId: taskMsg.requestedByPartyId, effectiveUserId: taskMsg.effectiveUserId,
rootCommEventId: taskMsg.rootCommEventId, isOutgoing: "N"
]).call()
taskMsg.statusId = "SmsgConsumed"; taskMsg.update()
} else {
// FINAL Response
ec.service.sync().name("create#mantle.party.communication.CommunicationEvent").parameters([
fromPartyId: "AGENT_CLAUDE_PARTY", toPartyId: taskMsg.requestedByPartyId,
rootCommEventId: taskMsg.rootCommEventId, parentCommEventId: taskMsg.rootCommEventId,
communicationEventTypeId: "Message", contentType: "text/plain",
body: responseMsg.content, entryDate: ec.user.nowTimestamp, statusId: "CeReceived"
]).call()
taskMsg.statusId = "SmsgConfirmed"; taskMsg.update()
}
]]></script>
</actions>
</service>
<!-- ========================================================= -->
<!-- Task Scheduler (Polls Queue) -->
<!-- ========================================================= -->
......@@ -293,27 +267,18 @@
<description>Scheduled service to pick up pending tasks and process them.</description>
<actions>
<script><![CDATA[
import org.moqui.entity.EntityCondition
// Find pending tasks
def pendingTasks = ec.entity.find("moqui.service.message.SystemMessage")
.condition("statusId", "SmsReceived") // Or generic 'Pending'
.condition("statusId", "SmsgReceived")
.condition("systemMessageTypeId", "SmtyAgentTask")
.limit(5) // Batch size
.disableAuthz() // System service needs to see all tasks
.limit(5)
.disableAuthz()
.list()
pendingTasks.each { task ->
// Mark as In Progress
task.statusId = "SmsConsumed"
task.update()
// Run Agent Task service (noStatusUpdate=false to prevent auto status change)
ec.service.sync().name("AgentServices.run#AgentTask")
.parameters([
systemMessageId: task.systemMessageId,
noStatusUpdate: false
])
// Run Agent Task Turn
ec.service.sync().name("AgentServices.run#AgentTaskTurn")
.parameters([systemMessageId: task.systemMessageId])
.call()
}
]]></script>
......