1 | package org.dcache.cells; |
2 | |
3 | import java.util.Collection; |
4 | import java.util.Map; |
5 | import java.util.HashMap; |
6 | import java.util.ArrayList; |
7 | import java.util.concurrent.CopyOnWriteArrayList; |
8 | |
9 | import java.lang.reflect.Method; |
10 | import java.lang.reflect.Constructor; |
11 | import java.lang.reflect.Field; |
12 | import java.lang.reflect.InvocationTargetException; |
13 | |
14 | import dmg.cells.nucleus.CellMessage; |
15 | |
16 | import org.dcache.util.ReflectionUtils; |
17 | |
18 | /** |
19 | * Helper class for message dispatching. Used internally in |
20 | * CellMessageDispatcher; |
21 | */ |
22 | abstract class Receiver |
23 | { |
24 | final protected Object _object; |
25 | final protected Method _method; |
26 | |
27 | public Receiver(Object object, Method method) |
28 | { |
29 | _object = object; |
30 | _method = method; |
31 | } |
32 | |
33 | abstract public Object deliver(CellMessage envelope, Object message) |
34 | throws IllegalAccessException, InvocationTargetException; |
35 | |
36 | public String toString() |
37 | { |
38 | return String.format("Object: %1$s; Method: %2$s", _object, _method); |
39 | } |
40 | } |
41 | |
42 | class ShortReceiver extends Receiver |
43 | { |
44 | public ShortReceiver(Object object, Method method) |
45 | { |
46 | super(object, method); |
47 | } |
48 | |
49 | public Object deliver(CellMessage envelope, Object message) |
50 | throws IllegalAccessException, InvocationTargetException |
51 | { |
52 | return _method.invoke(_object, message); |
53 | } |
54 | } |
55 | |
56 | |
57 | class LongReceiver extends Receiver |
58 | { |
59 | public LongReceiver(Object object, Method method) |
60 | { |
61 | super(object, method); |
62 | } |
63 | |
64 | public Object deliver(CellMessage envelope, Object message) |
65 | throws IllegalAccessException, InvocationTargetException |
66 | { |
67 | return _method.invoke(_object, envelope, message); |
68 | } |
69 | } |
70 | |
71 | |
72 | /** |
73 | * Automatic dispatch of dCache messages to message handlers. |
74 | */ |
75 | public class CellMessageDispatcher |
76 | { |
77 | /** Cached message handlers for fast dispatch. */ |
78 | private final Map<Class,Collection<Receiver>> _receivers = |
79 | new HashMap<Class,Collection<Receiver>>(); |
80 | |
81 | /** Name of receiver methods. */ |
82 | private final String _receiverName; |
83 | |
84 | /** |
85 | * Registered message listeners. |
86 | * |
87 | * @see addMessageListener |
88 | */ |
89 | private final Collection<Object> _messageListeners = |
90 | new CopyOnWriteArrayList<Object>(); |
91 | |
92 | public CellMessageDispatcher(String receiverName) |
93 | { |
94 | _receiverName = receiverName; |
95 | } |
96 | |
97 | /** |
98 | * Returns true if <code>c</code> has a method suitable for |
99 | * message delivery. |
100 | */ |
101 | private boolean hasListener(Class c) |
102 | { |
103 | for (Method m : c.getMethods()) { |
104 | if (m.getName().equals(_receiverName)) { |
105 | return true; |
106 | } |
107 | } |
108 | return false; |
109 | } |
110 | |
111 | /** |
112 | * Adds a listener for dCache messages. |
113 | * |
114 | * The object is scanned for public methods with the signature |
115 | * <code>name(Object message)</code> or <code>name(CellMessage |
116 | * envelope, Object message)</code>, where <code>name</code> is |
117 | * the receiver name, <code>envelope</code> is the envelope |
118 | * containing the message. |
119 | * |
120 | * After registration, all cell messages with a message object |
121 | * matching the type of the argument will be send to object. |
122 | * |
123 | * Message dispatching is performed in the <code>call</code> |
124 | * method. If that method is overridden in derivatives, the |
125 | * derivative must make sure that <code>call</code> is still |
126 | * called. |
127 | */ |
128 | public void addMessageListener(Object o) |
129 | { |
130 | Class c = o.getClass(); |
131 | if (hasListener(c)) { |
132 | synchronized (_receivers) { |
133 | if (_messageListeners.add(o)) { |
134 | _receivers.clear(); |
135 | } |
136 | } |
137 | } |
138 | } |
139 | |
140 | /** |
141 | * Removes a listener previously added with addMessageListener. |
142 | */ |
143 | public void removeMessageListener(Object o) |
144 | { |
145 | synchronized (_receivers) { |
146 | if (_messageListeners.remove(o)) { |
147 | _receivers.clear(); |
148 | } |
149 | } |
150 | } |
151 | |
152 | /** |
153 | * Returns the message types that can be reveived by an object of |
154 | * the given class. |
155 | */ |
156 | public Collection<Class> getMessageTypes(Object o) |
157 | { |
158 | Class c = o.getClass(); |
159 | Collection<Class> types = new ArrayList(); |
160 | |
161 | for (Method method : c.getMethods()) { |
162 | if (method.getName().equals(_receiverName)) { |
163 | Class[] parameterTypes = method.getParameterTypes(); |
164 | switch (parameterTypes.length) { |
165 | case 1: |
166 | types.add(parameterTypes[0]); |
167 | break; |
168 | case 2: |
169 | if (CellMessage.class.isAssignableFrom(parameterTypes[0])) { |
170 | types.add(parameterTypes[1]); |
171 | } |
172 | break; |
173 | } |
174 | } |
175 | } |
176 | return types; |
177 | } |
178 | |
179 | /** |
180 | * Finds the objects and methods, in other words the receivers, of |
181 | * messages of a given type. |
182 | * |
183 | * FIXME: This is still not quite the right thing: if you have |
184 | * messageArrived(CellMessage, X) and messageArrived(Y) and Y is |
185 | * more specific than X, then you would expect the latter to be |
186 | * called for message Y. This is not yet the case. |
187 | */ |
188 | private Collection<Receiver> findReceivers(Class c) |
189 | { |
190 | synchronized (_receivers) { |
191 | Collection<Receiver> receivers = new ArrayList<Receiver>(); |
192 | for (Object listener : _messageListeners) { |
193 | Method m = ReflectionUtils.resolve(listener.getClass(), |
194 | _receiverName, |
195 | CellMessage.class, c); |
196 | if (m != null) { |
197 | receivers.add(new LongReceiver(listener, m)); |
198 | continue; |
199 | } |
200 | |
201 | m = ReflectionUtils.resolve(listener.getClass(), |
202 | _receiverName, |
203 | c); |
204 | if (m != null) { |
205 | receivers.add(new ShortReceiver(listener, m)); |
206 | } |
207 | } |
208 | return receivers; |
209 | } |
210 | } |
211 | |
212 | private String multipleRepliesError(Collection<Receiver> receivers, Object message) |
213 | { |
214 | return String.format("Processing of message [%s] of type %s failed: Multiple replies were generated by %s.", message, message.getClass().getName(), receivers); |
215 | } |
216 | |
217 | /** |
218 | * Delivers messages to registered message listeners. The return |
219 | * value is determined by the following rules (in order): |
220 | * |
221 | * 1. If any message listener throws an unchecked exception other |
222 | * than IllegalArgumentException or IllegalStateException, that |
223 | * exception is rethrown. |
224 | * |
225 | * 2. If more than one message listener returns a non-null value |
226 | * or throws a checked exception, IllegalArgumentException or |
227 | * IllegalStateException, then a RuntimeException is thrown |
228 | * reporting that multiple replies have been generated. This is |
229 | * a coding error. |
230 | * |
231 | * 3. If one message listener returns a non null value then that |
232 | * value is returned. If one message listener throws a checked |
233 | * exception, IllegalArgumentException or |
234 | * IllegalStateException, then that exception is returned. |
235 | * |
236 | * 4. Otherwise null is returned. |
237 | */ |
238 | public Object call(CellMessage envelope) |
239 | { |
240 | Object message = envelope.getMessageObject(); |
241 | Class c = message.getClass(); |
242 | Collection<Receiver> receivers; |
243 | |
244 | synchronized (_receivers) { |
245 | receivers = _receivers.get(c); |
246 | if (receivers == null) { |
247 | receivers = findReceivers(c); |
248 | _receivers.put(c, receivers); |
249 | } |
250 | } |
251 | |
252 | Object result = null; |
253 | for (Receiver receiver : receivers) { |
254 | try { |
255 | Object obj = receiver.deliver(envelope, message); |
256 | if (obj != null && result != null) { |
257 | throw new RuntimeException(multipleRepliesError(receivers, message)); |
258 | } |
259 | result = obj; |
260 | } catch (IllegalAccessException e) { |
261 | throw new RuntimeException("Cannot process message due to access error", e); |
262 | } catch (InvocationTargetException e) { |
263 | Throwable cause = e.getCause(); |
264 | if (cause instanceof IllegalArgumentException || |
265 | cause instanceof IllegalStateException) { |
266 | /* We recognize these two unchecked exceptions as |
267 | * something special and report back to the |
268 | * client. |
269 | */ |
270 | } else if (cause instanceof RuntimeException) { |
271 | throw (RuntimeException)cause; |
272 | } else if (cause instanceof Error) { |
273 | throw (Error)cause; |
274 | } |
275 | |
276 | if (result != null) { |
277 | throw new RuntimeException(multipleRepliesError(receivers, message)); |
278 | } |
279 | result = cause; |
280 | } |
281 | } |
282 | |
283 | return result; |
284 | } |
285 | } |