f3c195cf by Ean Schuessler

Initial service version, static support files

1 parent 6413f550
...@@ -7,5 +7,14 @@ ...@@ -7,5 +7,14 @@
7 <classpath type="jar" location="lib/*"/> 7 <classpath type="jar" location="lib/*"/>
8 <classpath type="jar" location="build/lib/*"/> 8 <classpath type="jar" location="build/lib/*"/>
9 9
10 <service-resource type="model" loader="main" location="servicedef/services.xml"/>
11
12 <webapp name="comet-messenger"
13 title="Comet Messenger"
14 server="default-server"
15 location="webapp"
16 mount-point="/comet"
17 app-bar-display="false"/>
18
10 </ofbiz-component> 19 </ofbiz-component>
11 20
......
...@@ -5,4 +5,19 @@ ...@@ -5,4 +5,19 @@
5 <vendor>DriverUp</vendor> 5 <vendor>DriverUp</vendor>
6 <version>1.0</version> 6 <version>1.0</version>
7 7
8 <service name="sendCometMessage" engine="java"
9 location="com.brainfood.ofbiz.CometMessengerServlet"
10 invoke="sendMessage" auth="false">
11 <attribute name="realm" type="String" mode="IN" optional="false"/>
12 <attribute name="channel" type="String" mode="IN" optional="false"/>
13 <attribute name="message" type="String" mode="IN" optional="false"/>
14 </service>
15
16 <service name="receiveCometMessage" engine="java"
17 location="com.brainfood.ofbiz.CometMessengerServlet"
18 invoke="receiveMessage" auth="false">
19 <attribute name="realm" type="String" mode="IN" optional="false"/>
20 <attribute name="channel" type="String" mode="IN" optional="false"/>
21 <attribute name="message" type="String" mode="IN" optional="false"/>
22 </service>
8 </services> 23 </services>
......
...@@ -2,38 +2,257 @@ package com.brainfood.ofbiz; ...@@ -2,38 +2,257 @@ package com.brainfood.ofbiz;
2 2
3 import org.apache.catalina.comet.CometEvent; 3 import org.apache.catalina.comet.CometEvent;
4 import org.apache.catalina.comet.CometProcessor; 4 import org.apache.catalina.comet.CometProcessor;
5
6 import java.util.concurrent.ConcurrentHashMap;
7 import java.util.ArrayList;
8 import java.util.Map;
9
10 import java.io.IOException;
11 import java.io.PrintWriter;
12 import java.io.InputStream;
13
5 import javax.servlet.http.HttpServlet; 14 import javax.servlet.http.HttpServlet;
6 import javax.servlet.http.HttpServletRequest; 15 import javax.servlet.http.HttpServletRequest;
7 import javax.servlet.http.HttpServletResponse; 16 import javax.servlet.http.HttpServletResponse;
17
8 import javax.servlet.ServletException; 18 import javax.servlet.ServletException;
9 import java.io.IOException; 19 import javax.servlet.ServletContext;
20
21 import org.ofbiz.base.util.Debug;
22 import org.ofbiz.base.util.UtilMisc;
23
24 import org.ofbiz.service.DispatchContext;
25 import org.ofbiz.service.LocalDispatcher;
26 import org.ofbiz.service.ServiceContainer;
27 import org.ofbiz.service.GenericServiceException;
28 import org.ofbiz.service.ServiceUtil;;
29
30 import org.ofbiz.entity.Delegator;
31 import org.ofbiz.entity.DelegatorFactory;
10 32
11 public class CometMessengerServlet extends HttpServlet implements CometProcessor { 33 public class CometMessengerServlet extends HttpServlet implements CometProcessor {
12 34
35 public static final String module = CometMessengerServlet.class.getName();
13 private static final Integer TIMEOUT = 60 * 1000; 36 private static final Integer TIMEOUT = 60 * 1000;
14 37
38 protected String realm = "default";
39 public static ConcurrentHashMap <String, CometMessengerServlet>realms = new <String, CometMessengerServlet>ConcurrentHashMap();
40
41 protected ArrayList<HttpServletResponse> connections =
42 new ArrayList<HttpServletResponse>();
43 protected MessageSender messageSender = null;
44 private LocalDispatcher dispatcher = null;
45
15 @Override 46 @Override
16 public void destroy() { 47 public void destroy() {
48 connections.clear();
49 realms.remove(realm);
50
51 messageSender.stop();
52 messageSender = null;
17 } 53 }
18 54
19 @Override 55 @Override
20 public void init() throws ServletException { 56 public void init() throws ServletException {
57 Debug.logInfo("INIT Comet Servlet", module);
58 if (realms.get(realm) != null) Debug.logError("Realm [" + realm + "] already registered", module);
59
60 realms.put(realm, this);
61
62 messageSender = new MessageSender(realm);
63 Thread messageSenderThread =
64 new Thread(messageSender, "MessageSender[" + getServletContext().getContextPath() + "]");
65 messageSenderThread.setDaemon(true);
66 messageSenderThread.start();
67
68 dispatcher = getDispatcher(getServletContext());
69 }
70
71 public void error(CometEvent event, HttpServletRequest request, HttpServletResponse response) {
72 Debug.logInfo("ERROR " + event + " " + request + " " + response, module);
21 } 73 }
22 74
23 public void event(final CometEvent event) throws IOException, ServletException { 75 public void event(final CometEvent event) throws IOException, ServletException {
24 HttpServletRequest request = event.getHttpServletRequest(); 76 HttpServletRequest request = event.getHttpServletRequest();
25 HttpServletResponse response = event.getHttpServletResponse(); 77 HttpServletResponse response = event.getHttpServletResponse();
78 response.setContentType("application/json");
26 if (event.getEventType() == CometEvent.EventType.BEGIN) { 79 if (event.getEventType() == CometEvent.EventType.BEGIN) {
27 request.setAttribute("org.apache.tomcat.comet.timeout", TIMEOUT); 80 Debug.logInfo("Begin for session: " + request.getSession(true).getId(), module);
28 log("Begin for session: " + request.getSession(true).getId()); 81 PrintWriter writer = response.getWriter();
82 writer.println("{\"channel\":\"system\", \"message\":\"CONNECT\"}");
83 writer.flush();
84 synchronized(connections) {
85 connections.add(response);
86 }
29 } else if (event.getEventType() == CometEvent.EventType.ERROR) { 87 } else if (event.getEventType() == CometEvent.EventType.ERROR) {
30 log("Error for session: " + request.getSession(true).getId()); 88 Debug.logInfo("Error for session: " + request.getSession(true).getId(), module);
89 synchronized(connections) {
90 connections.remove(response);
91 }
31 event.close(); 92 event.close();
32 } else if (event.getEventType() == CometEvent.EventType.END) { 93 } else if (event.getEventType() == CometEvent.EventType.END) {
33 log("End for session: " + request.getSession(true).getId()); 94 Debug.logInfo("End for session: " + request.getSession(true).getId(), module);
95 PrintWriter writer = response.getWriter();
96 writer.println("{\"channel\":\"system\", \"message\":\"DISCONNECT\"}");
97 writer.flush();
98 synchronized(connections) {
99 connections.remove(response);
100 }
34 event.close(); 101 event.close();
35 } else if (event.getEventType() == CometEvent.EventType.READ) { 102 } else if (event.getEventType() == CometEvent.EventType.READ) {
36 throw new UnsupportedOperationException("This servlet does not accept data"); 103 InputStream is = request.getInputStream();
104 byte[] buf = new byte[512];
105 do {
106 int n = is.read(buf); //can throw an IOException
107 if (n > 0) {
108 Debug.logInfo("Read " + n + " bytes: " + new String(buf, 0, n)
109 + " for session: " + request.getSession(true).getId(), module);
110 } else if (n < 0) {
111 error(event, request, response);
112 return;
113 }
114 } while (is.available() > 0);
115 }
116 }
117
118 protected void sendMessage(String channel, String message) {
119 messageSender.send(channel, message);
120 }
121
122 public static Map sendMessage(DispatchContext context, Map attr) {
123 String realm = (String) attr.get("realm");
124 String channel = (String) attr.get("channel");
125 String message = (String) attr.get("message");
126
127 CometMessengerServlet servlet = realms.get(realm);
128 if (servlet == null) {
129 return ServiceUtil.returnError("No such realm");
130 }
131
132 servlet.sendMessage(channel, message);
133
134 return ServiceUtil.returnSuccess();
135 }
136
137 public static Map receiveMessage(DispatchContext context, Map attr) {
138 // This is a NO-OP that exists for other services to subscribe SECAs
139 return ServiceUtil.returnSuccess();
140 }
141
142 protected static LocalDispatcher getDispatcher(ServletContext servletContext) {
143 LocalDispatcher dispatcher = (LocalDispatcher) servletContext.getAttribute("dispatcher");
144 if (dispatcher == null) {
145 Delegator delegator = getDelegator(servletContext);
146 dispatcher = makeWebappDispatcher(servletContext, delegator);
147 servletContext.setAttribute("dispatcher", dispatcher);
148 }
149 return dispatcher;
150 }
151
152 /** This method only sets up a dispatcher for the current webapp and passed in delegator, it does not save it to the ServletContext or anywhere else, just returns it */
153 public static LocalDispatcher makeWebappDispatcher(ServletContext servletContext, Delegator delegator) {
154 if (delegator == null) {
155 Debug.logInfo("[ContextFilter.init] ERROR: delegator not defined.", module);
156 return null;
157 }
158 // get the unique name of this dispatcher
159 String dispatcherName = servletContext.getInitParameter("localDispatcherName");
160
161 if (dispatcherName == null) {
162 Debug.logInfo("No localDispatcherName specified in the web.xml file", module);
163 dispatcherName = delegator.getDelegatorName();
164 }
165
166 LocalDispatcher dispatcher = ServiceContainer.getLocalDispatcher(dispatcherName, delegator);
167 if (dispatcher == null) {
168 Debug.logInfo("[ContextFilter.init] ERROR: dispatcher could not be initialized.", module);
169 }
170
171 return dispatcher;
172 }
173
174 public static Delegator getDelegator(ServletContext servletContext) {
175 Delegator delegator = (Delegator) servletContext.getAttribute("delegator");
176 if (delegator == null) {
177 String delegatorName = servletContext.getInitParameter("entityDelegatorName");
178
179 if (delegatorName == null || delegatorName.length() <= 0) {
180 delegatorName = "default";
181 }
182 if (Debug.verboseOn()) Debug.logVerbose("Setup Entity Engine Delegator with name " + delegatorName, module);
183 delegator = DelegatorFactory.getDelegator(delegatorName);
184 servletContext.setAttribute("delegator", delegator);
185 if (delegator == null) {
186 Debug.logInfo("[ContextFilter.init] ERROR: delegator factory returned null for delegatorName \"" + delegatorName + "\"", module);
187 }
188 }
189 return delegator;
190 }
191
192 public class MessageSender implements Runnable {
193 protected boolean running = true;
194 protected ArrayList<String> messages = new ArrayList<String>();
195 private String realm = null;
196
197 public MessageSender(String realm) {
198 this.realm = realm;
199 }
200
201 public void stop() {
202 running = false;
203 }
204
205 /**
206 * Add message for sending.
207 */
208 public void send(String channel, String message) {
209 synchronized (messages) {
210 messages.add("{\"realm\":\"" + realm + "\"," +
211 "\"channel\":\"" + channel + "\"," +
212 "\"message\":\"" + message + "\"}");
213 messages.notify();
214
215 try {
216 dispatcher.runSync("receiveCometMessage", UtilMisc.toMap("realm", realm, "message", message, "channel", channel));
217 } catch (GenericServiceException ex) {
218 Debug.logError(ex, module);
219 }
220 }
221 }
222
223 public void run() {
224 while (running) {
225 if (messages.size() == 0) {
226 try {
227 synchronized (messages) {
228 messages.wait();
229 }
230 } catch (InterruptedException e) {
231 // Ignore
232 }
233 }
234
235 synchronized (connections) {
236 String[] pendingMessages = null;
237 synchronized (messages) {
238 pendingMessages = messages.toArray(new String[0]);
239 messages.clear();
240 }
241 // Send any pending message on all the open connections
242 for (int i = 0; i < connections.size(); i++) {
243 try {
244 PrintWriter writer = connections.get(i).getWriter();
245 for (int j = 0; j < pendingMessages.length; j++) {
246 Debug.logInfo("SENDING: " + pendingMessages[j], module);
247 writer.println(pendingMessages[j]);
248 }
249 writer.flush();
250 } catch (IOException e) {
251 Debug.logInfo("IOExeption sending message" + e, module);
252 }
253 }
254 }
255 }
37 } 256 }
38 } 257 }
39 } 258 }
......
1 <?xml version="1.0"?>
2 <!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
3
4 <!--
5 Licensed to the Apache Software Foundation (ASF) under one
6 or more contributor license agreements. See the NOTICE file
7 distributed with this work for additional information
8 regarding copyright ownership. The ASF licenses this file
9 to you under the Apache License, Version 2.0 (the
10 "License"); you may not use this file except in compliance
11 with the License. You may obtain a copy of the License at
12
13 http://www.apache.org/licenses/LICENSE-2.0
14
15 Unless required by applicable law or agreed to in writing,
16 software distributed under the License is distributed on an
17 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 KIND, either express or implied. See the License for the
19 specific language governing permissions and limitations
20 under the License.
21 -->
22
23 <web-app>
24 <display-name>DriverUp API</display-name>
25 <description>DriverUp API</description>
26
27 <context-param>
28 <param-name>entityDelegatorName</param-name>
29 <param-value>default</param-value>
30 <description>The Name of the Entity Delegator to use, defined in entityengine.xml</description>
31 </context-param>
32 <context-param>
33 <param-name>localDispatcherName</param-name>
34 <param-value>cometmessenger</param-value>
35 <description>A unique name used to identify/recognize the local dispatcher for the Service Engine</description>
36 </context-param>
37
38 <servlet>
39 <servlet-name>CometMessengerServlet</servlet-name>
40 <display-name>CometMessengerServlet</display-name>
41 <description>Comet Messenger Servlet</description>
42 <servlet-class>com.brainfood.ofbiz.CometMessengerServlet</servlet-class>
43 <load-on-startup>1</load-on-startup>
44 </servlet>
45 <servlet-mapping>
46 <servlet-name>CometMessengerServlet</servlet-name>
47 <url-pattern>/messenger</url-pattern>
48 </servlet-mapping>
49 </web-app>
1 define([], function() {
2 var listeners = [];
3 var messenger = {
4 process: function() {
5 var url = "/comet/messenger"
6 var request = new XMLHttpRequest();
7 var responseLength = 0;
8 request.open("GET", url, true);
9 request.setRequestHeader("Content-Type","application/x-javascript;");
10 request.onreadystatechange = function() {
11 if (request.readyState == 3 || request.readyState == 4) {
12 if (request.status == 200){
13 if (request.responseText) {
14 var newText = request.responseText.substr(responseLength);
15 if (newText.length > 0) {
16 for (var i=0; i < listeners.length; i++) {
17 listeners[i](newText);
18 }
19 }
20 responseLength = request.responseText.length;
21 }
22 }
23 if (request.readyState == 4) {
24 if (request.status == 200) {
25 messenger.process();
26 } else {
27 console.log('Error, retrying in 5 seconds');
28 setTimeout(function() { messenger.process(); }, 5000);
29 }
30 }
31 }
32 };
33 request.send(null);
34 },
35 addListener: function(listener) {
36 listeners.push(listener);
37 },
38 removeListener: function(listener) {
39 var index = listeners.indexOf(listener);
40 if (index > -1) {
41 array.splice(index, 1);
42 }
43 }
44 }
45 messenger.process();
46
47 return messenger;
48 });