1 | package org.dcache.cells; |
2 | |
3 | import java.io.Serializable; |
4 | |
5 | import dmg.cells.nucleus.CellEndpoint; |
6 | import dmg.cells.nucleus.CellMessage; |
7 | import dmg.cells.nucleus.CellPath; |
8 | import dmg.cells.nucleus.CellMessageAnswerable; |
9 | import dmg.cells.nucleus.NoRouteToCellException; |
10 | |
11 | import diskCacheV111.vehicles.Message; |
12 | import diskCacheV111.util.CacheException; |
13 | import diskCacheV111.util.TimeoutCacheException; |
14 | import org.dcache.util.CacheExceptionFactory; |
15 | |
16 | /** |
17 | * Stub class for common cell communication patterns. An instance |
18 | * of the template class encapsulates properties such as the |
19 | * destination and timeout for communication. |
20 | * |
21 | * Operations are aware of the dCache Message class and the |
22 | * CacheException class, and are able to interpret dCache error |
23 | * messages. |
24 | */ |
25 | public class CellStub |
26 | implements CellMessageSender |
27 | { |
28 | private CellEndpoint _endpoint; |
29 | private CellPath _destination; |
30 | private long _timeout = 30000; |
31 | private boolean _retryOnNoRouteToCell; |
32 | |
33 | public CellStub() |
34 | { |
35 | } |
36 | |
37 | public CellStub(CellEndpoint endpoint) |
38 | { |
39 | setCellEndpoint(endpoint); |
40 | } |
41 | |
42 | public CellStub(CellEndpoint endpoint, CellPath destination) |
43 | { |
44 | this(endpoint); |
45 | setDestinationPath(destination); |
46 | } |
47 | |
48 | public CellStub(CellEndpoint endpoint, CellPath destination, long timeout) |
49 | { |
50 | this(endpoint, destination); |
51 | setTimeout(timeout); |
52 | } |
53 | |
54 | @Override |
55 | public void setCellEndpoint(CellEndpoint endpoint) |
56 | { |
57 | _endpoint = endpoint; |
58 | } |
59 | |
60 | public void setDestination(String destination) |
61 | { |
62 | setDestinationPath(new CellPath(destination)); |
63 | } |
64 | |
65 | public void setDestinationPath(CellPath destination) |
66 | { |
67 | _destination = destination; |
68 | } |
69 | |
70 | public CellPath getDestinationPath() |
71 | { |
72 | return _destination; |
73 | } |
74 | |
75 | /** |
76 | * Sets the communication timeout of the stub. |
77 | * @param timeout the timeout in milliseconds |
78 | */ |
79 | public void setTimeout(long timeout) |
80 | { |
81 | _timeout = timeout; |
82 | } |
83 | |
84 | /** |
85 | * Returns the communication timeout of the stub. |
86 | */ |
87 | public long getTimeout() |
88 | { |
89 | return _timeout; |
90 | } |
91 | |
92 | /** |
93 | * Set the value of the retryOnNoRouteCell property, which |
94 | * determines whether to retry on failure to route the message to |
95 | * the destination. |
96 | * |
97 | * If set to false, failure to send the message will cause a |
98 | * TimeoutCacheException to be reported right away. If set to |
99 | * true, failure to send the message to the destination cell will |
100 | * be retried until the timeout has been reached. Once the timeout |
101 | * is reached, a TimeoutCacheException is thrown. This is useful |
102 | * for destinations for which communication failure is known to be |
103 | * temporary. |
104 | * |
105 | * Limitations: This property currently only has an effect on the |
106 | * sendAndWait method. Asynchronous message delivery always |
107 | * reports a no route error in case of communication failure. |
108 | */ |
109 | public void setRetryOnNoRouteToCell(boolean retry) |
110 | { |
111 | _retryOnNoRouteToCell = retry; |
112 | } |
113 | |
114 | /** |
115 | * Returns the value of the retryOnNoRouteCell property, which |
116 | * determines whether to retry on failure to route the message to |
117 | * the destination. |
118 | */ |
119 | public boolean getRetryOnNoRouteToCell() |
120 | { |
121 | return _retryOnNoRouteToCell; |
122 | } |
123 | |
124 | /** |
125 | * Sends a message and waits for the reply. The reply is expected |
126 | * to contain a message object of the same type as the message |
127 | * object that was sent, and the return code of that message is |
128 | * expected to be zero. If either is not the case, an exception is |
129 | * thrown. |
130 | * |
131 | * @param msg the message object to send |
132 | * @return the message object from the reply |
133 | * @throws InterruptedException If the thread is interrupted |
134 | * @throws CacheException If the message could not be sent, a |
135 | * timeout occurred, the object in the reply was of the wrong |
136 | * type, or the return code was non-zero. |
137 | */ |
138 | public <T extends Message> T sendAndWait(T msg) |
139 | throws CacheException, InterruptedException |
140 | { |
141 | return sendAndWait(msg, _timeout); |
142 | } |
143 | |
144 | /** |
145 | * Sends a message and waits for the reply. The reply is expected |
146 | * to contain a message object of the same type as the message |
147 | * object that was sent, and the return code of that message is |
148 | * expected to be zero. If either is not the case, an exception is |
149 | * thrown. |
150 | * |
151 | * @param msg the message object to send |
152 | * @param timeout in milliseconds to wait for a reply |
153 | * @return the message object from the reply |
154 | * @throws InterruptedException If the thread is interrupted |
155 | * @throws CacheException If the message could not be sent, a |
156 | * timeout occurred, the object in the reply was of the wrong |
157 | * type, or the return code was non-zero. |
158 | */ |
159 | public <T extends Message> T sendAndWait(T msg, long timeout) |
160 | throws CacheException, InterruptedException |
161 | { |
162 | return sendAndWait(_destination, msg, timeout); |
163 | } |
164 | |
165 | /** |
166 | * Sends a message and waits for the reply. The reply is expected |
167 | * to contain a message object of the same type as the message |
168 | * object that was sent, and the return code of that message is |
169 | * expected to be zero. If either is not the case, an exception is |
170 | * thrown. |
171 | * |
172 | * @param path the destination cell |
173 | * @param msg the message object to send |
174 | * @return the message object from the reply |
175 | * @throws InterruptedException If the thread is interrupted |
176 | * @throws CacheException If the message could not be sent, a |
177 | * timeout occurred, the object in the reply was of the wrong |
178 | * type, or the return code was non-zero. |
179 | */ |
180 | public <T extends Message> T sendAndWait(CellPath path, T msg) |
181 | throws CacheException, InterruptedException |
182 | { |
183 | msg.setReplyRequired(true); |
184 | |
185 | T reply = (T) sendAndWait(path, msg, msg.getClass()); |
186 | |
187 | if (reply.getReturnCode() != 0) { |
188 | throw CacheExceptionFactory.exceptionOf(reply); |
189 | } |
190 | |
191 | return reply; |
192 | } |
193 | |
194 | /** |
195 | * Sends a message and waits for the reply. The reply is expected |
196 | * to contain a message object of the specified type. If this is |
197 | * not the case, an exception is thrown. |
198 | * |
199 | * @param msg the message object to send |
200 | * @param type the expected type of the reply |
201 | * @return the message object from the reply |
202 | * @throws InterruptedException If the thread is interrupted |
203 | * @throws CacheException If the message could not be sent, a |
204 | * timeout occurred, or the object in the reply was of the |
205 | * wrong type. |
206 | */ |
207 | public <T extends Serializable> T |
208 | sendAndWait(Serializable msg, Class<T> type) |
209 | throws CacheException, InterruptedException |
210 | { |
211 | return sendAndWait(_destination, msg, type); |
212 | } |
213 | |
214 | |
215 | public <T extends Serializable> T sendAndWait(CellPath path, |
216 | Serializable msg, |
217 | Class<T> type) |
218 | throws CacheException, InterruptedException |
219 | { |
220 | return sendAndWait(path, msg, type, _timeout); |
221 | } |
222 | |
223 | |
224 | /** |
225 | * Sends a message and waits for the reply. The reply is expected |
226 | * to contain a message object of the specified type. If this is |
227 | * not the case, an exception is thrown. |
228 | * |
229 | * @param msg the message object to send |
230 | * @param type the expected type of the reply |
231 | * @param timeout the time to wait for the reply |
232 | * @return the message object from the reply |
233 | * @throws InterruptedException If the thread is interrupted |
234 | * @throws CacheException If the message could not be sent, a |
235 | * timeout occurred, or the object in the reply was of the |
236 | * wrong type. |
237 | */ |
238 | public <T extends Serializable> T |
239 | sendAndWait(Serializable msg, Class<T> type, long timeout) |
240 | throws CacheException, InterruptedException |
241 | { |
242 | return sendAndWait(_destination, msg, type, timeout); |
243 | } |
244 | |
245 | |
246 | |
247 | /** |
248 | * Sends a message and waits for the reply. The reply is expected |
249 | * to contain a message object of the same type as the message |
250 | * object that was sent, and the return code of that message is |
251 | * expected to be zero. If either is not the case, an exception is |
252 | * thrown. |
253 | * |
254 | * @param path the destination cell |
255 | * @param msg the message object to send |
256 | * @param timeout the time to wait for the reply |
257 | * @return the message object from the reply |
258 | * @throws InterruptedException If the thread is interrupted |
259 | * @throws CacheException If the message could not be sent, a |
260 | * timeout occurred, the object in the reply was of the wrong |
261 | * type, or the return code was non-zero. |
262 | */ |
263 | public <T extends Message> T sendAndWait(CellPath path, T msg, long timeout) |
264 | throws CacheException, InterruptedException |
265 | { |
266 | msg.setReplyRequired(true); |
267 | |
268 | T reply = (T) sendAndWait(path, msg, msg.getClass(), timeout); |
269 | |
270 | if (reply.getReturnCode() != 0) { |
271 | throw CacheExceptionFactory.exceptionOf(reply); |
272 | } |
273 | |
274 | return reply; |
275 | } |
276 | |
277 | |
278 | /** |
279 | * Sends a message and waits for the reply. The reply is expected |
280 | * to contain a message object of the specified type. If this is |
281 | * not the case, an exception is thrown. |
282 | * |
283 | * @param path the destination cell |
284 | * @param msg the message object to send |
285 | * @param type the expected type of the reply |
286 | * @param timeout the time to wait for the reply |
287 | * @return the message object from the reply |
288 | * @throws InterruptedException If the thread is interrupted |
289 | * @throws CacheException If the message could not be sent, a |
290 | * timeout occurred, or the object in the reply was of the |
291 | * wrong type. |
292 | */ |
293 | public <T extends Serializable> T sendAndWait(CellPath path, |
294 | Serializable msg, |
295 | Class<T> type, |
296 | long timeout) |
297 | throws CacheException, InterruptedException |
298 | { |
299 | CellMessage replyMessage; |
300 | try { |
301 | CellMessage envelope = new CellMessage(path, msg); |
302 | if (_retryOnNoRouteToCell) { |
303 | replyMessage = |
304 | _endpoint.sendAndWaitToPermanent(envelope, timeout); |
305 | } else { |
306 | replyMessage = |
307 | _endpoint.sendAndWait(envelope, timeout); |
308 | } |
309 | } catch (NoRouteToCellException e) { |
310 | /* From callers point of view a timeout due to a lost |
311 | * message or a missing route to the destination is pretty |
312 | * much the same, so we report this as a timeout. The |
313 | * error message gives the details. |
314 | */ |
315 | throw new TimeoutCacheException(e.getMessage()); |
316 | } |
317 | |
318 | if (replyMessage == null) { |
319 | String errmsg = String.format("Request to %s timed out.", path); |
320 | throw new TimeoutCacheException(errmsg); |
321 | } |
322 | |
323 | Object replyObject = replyMessage.getMessageObject(); |
324 | if (!(type.isInstance(replyObject))) { |
325 | String errmsg = "Got unexpected message of class " + |
326 | replyObject.getClass() + " from " + |
327 | replyMessage.getSourceAddress(); |
328 | throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
329 | errmsg); |
330 | } |
331 | |
332 | return (T)replyObject; |
333 | } |
334 | |
335 | /** |
336 | * Sends <code>message</code> asynchronously, expecting a result |
337 | * of type <code>type</code>. The result is delivered to |
338 | * <code>callback</code>. |
339 | */ |
340 | public <T extends Serializable> void send(Serializable message, |
341 | Class<T> type, |
342 | MessageCallback<T> callback) |
343 | { |
344 | if (_destination == null) |
345 | throw new IllegalStateException("Destination must be specified"); |
346 | send(_destination, message, type, callback); |
347 | } |
348 | |
349 | /** |
350 | * Sends <code>message</code> asynchronously to |
351 | * <code>destination</code>, expecting a result of type |
352 | * <code>type</code>. The result is delivered to |
353 | * <code>callback</code>. |
354 | */ |
355 | public <T extends Serializable> void send(CellPath destination, |
356 | Serializable message, |
357 | Class<T> type, |
358 | MessageCallback<T> callback) |
359 | { |
360 | if (message instanceof Message) { |
361 | ((Message) message).setReplyRequired(true); |
362 | } |
363 | _endpoint.sendMessage(new CellMessage(destination, message), |
364 | new CellCallback<T>(type, callback), |
365 | _timeout); |
366 | } |
367 | |
368 | /** |
369 | * Sends <code>message</code> to <code>destination</code>. |
370 | */ |
371 | public void send(Serializable message) |
372 | throws NoRouteToCellException |
373 | { |
374 | if (_destination == null) { |
375 | throw new IllegalStateException("Destination must be specified"); |
376 | } |
377 | send(_destination, message); |
378 | } |
379 | |
380 | |
381 | /** |
382 | * Sends <code>message</code> to <code>destination</code>. |
383 | */ |
384 | public void send(CellPath destination, Serializable message) |
385 | throws NoRouteToCellException |
386 | { |
387 | _endpoint.sendMessage(new CellMessage(destination, message)); |
388 | } |
389 | |
390 | /** |
391 | * Adapter class to wrap MessageCallback in CellMessageAnswerable. |
392 | */ |
393 | static class CellCallback<T> implements CellMessageAnswerable |
394 | { |
395 | private final MessageCallback<T> _callback; |
396 | private final Class<T> _type; |
397 | |
398 | CellCallback(Class<T> type, MessageCallback<T> callback) |
399 | { |
400 | _callback = callback; |
401 | _type = type; |
402 | } |
403 | |
404 | @Override |
405 | public void answerArrived(CellMessage request, CellMessage answer) |
406 | { |
407 | Object o = answer.getMessageObject(); |
408 | if (_type.isInstance(o)) { |
409 | if (o instanceof Message) { |
410 | Message msg = (Message) o; |
411 | int rc = msg.getReturnCode(); |
412 | if (rc == 0) { |
413 | _callback.success(_type.cast(o)); |
414 | } else { |
415 | _callback.failure(rc, msg.getErrorObject()); |
416 | } |
417 | } else { |
418 | _callback.success(_type.cast(o)); |
419 | } |
420 | } else if (o instanceof Exception) { |
421 | exceptionArrived(request, (Exception) o); |
422 | } else { |
423 | _callback.failure(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
424 | "Unexpected reply: " + o); |
425 | } |
426 | } |
427 | |
428 | @Override |
429 | public void answerTimedOut(CellMessage request) |
430 | { |
431 | _callback.timeout(); |
432 | } |
433 | |
434 | @Override |
435 | public void exceptionArrived(CellMessage request, Exception exception) |
436 | { |
437 | if (exception instanceof NoRouteToCellException) { |
438 | _callback.noroute(); |
439 | } else if (exception instanceof CacheException) { |
440 | CacheException e = (CacheException) exception; |
441 | _callback.failure(e.getRc(), |
442 | CacheExceptionFactory.exceptionOf(e.getRc(), e.getMessage())); |
443 | } else { |
444 | _callback.failure(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
445 | exception.toString()); |
446 | } |
447 | } |
448 | } |
449 | } |