1 | /* |
2 | * BroadcastCell.java |
3 | * |
4 | * Created on January 31, 2005, 8:32 AM |
5 | */ |
6 | |
7 | package dmg.cells.services.multicaster; |
8 | import dmg.cells.nucleus.* ; |
9 | import dmg.util.* ; |
10 | import java.util.* ; |
11 | import java.io.* ; |
12 | |
13 | import org.slf4j.Logger; |
14 | import org.slf4j.LoggerFactory; |
15 | |
16 | /** |
17 | * |
18 | * @author patrick |
19 | */ |
20 | public class BroadcastCell extends CellAdapter { |
21 | |
22 | private final static Logger _log = |
23 | LoggerFactory.getLogger(BroadcastCell.class); |
24 | |
25 | private class Entry { |
26 | |
27 | private static final int STATIC = 1 ; |
28 | private static final int CANCEL_ON_FAILURE = 2 ; |
29 | private static final int EXPIRES = 4 ; |
30 | |
31 | private CellPath _destination = null ; |
32 | private String _trigger = null ; |
33 | private int _mode = STATIC ; |
34 | private long _created = System.currentTimeMillis() ; |
35 | private long _expires = 0L ; |
36 | private long _used = 0L ; |
37 | private long _failed = 0L ; |
38 | |
39 | private Entry( CellPath destination , String trigger ){ |
40 | _destination = destination ; |
41 | _trigger = trigger ; |
42 | } |
43 | private void setCancelOnFailure( boolean cancel ){ |
44 | if(cancel){ |
45 | _mode |= CANCEL_ON_FAILURE ; |
46 | _mode &= ~ STATIC ; |
47 | } |
48 | else{ |
49 | _mode &= ~ CANCEL_ON_FAILURE ; |
50 | if( ( _mode & EXPIRES ) == 0 )_mode |= STATIC ; |
51 | } |
52 | } |
53 | private void setExpires( long expires ){ |
54 | if( expires <= 0L ){ |
55 | _expires = 0L ; |
56 | _mode &= ~ EXPIRES ; |
57 | if( ( _mode & CANCEL_ON_FAILURE ) == 0 )_mode |= STATIC ; |
58 | }else{ |
59 | _expires = expires ; |
60 | _mode |= EXPIRES ; |
61 | _mode &= ~ STATIC ; |
62 | } |
63 | } |
64 | private String getTrigger(){ return _trigger ; } |
65 | private CellPath getPath(){ return _destination ; } |
66 | public String toString(){ |
67 | StringBuffer sb = new StringBuffer() ; |
68 | sb.append( "[").append(_trigger). |
69 | append(";").append(_destination.toString() ) ; |
70 | sb.append(";("+_used+","+_failed+")"); |
71 | sb.append(";mode="); |
72 | sb.append( isValid() ? "V" : "X" ) ; |
73 | if( ( _mode & STATIC ) != 0 )sb.append("S"); |
74 | if( ( _mode & CANCEL_ON_FAILURE ) != 0 )sb.append("C"); |
75 | if( ( _mode & EXPIRES ) != 0 ){ |
76 | long rest = _expires - System.currentTimeMillis() ; |
77 | rest = ( rest <= 0L ) ? 0L : ( rest / 1000L ) ; |
78 | sb.append("E;ex=").append(rest) ; |
79 | }else{ |
80 | sb.append(";"); |
81 | } |
82 | sb.append("]"); |
83 | return sb.toString() ; |
84 | } |
85 | public boolean isValid(){ |
86 | if( ( _mode & STATIC ) != 0 )return true ; |
87 | if( ( ( _mode & EXPIRES ) != 0 ) && |
88 | ( _expires < System.currentTimeMillis() ) )return false ; |
89 | return true ; |
90 | } |
91 | public boolean isCancelOnFailure(){ return ( _mode & CANCEL_ON_FAILURE) != 0 ; } |
92 | public boolean equals( Object obj ){ |
93 | |
94 | if( ! (obj instanceof Entry) ) return false; |
95 | Entry other = (Entry)obj ; |
96 | return other._destination.equals(this._destination) && |
97 | other._trigger.equals( this._trigger ) ; |
98 | } |
99 | public int hashCode(){ |
100 | return (_destination.toString()+_trigger).hashCode(); |
101 | } |
102 | } |
103 | private CellNucleus _nucleus = null ; |
104 | private Args _args = null ; |
105 | |
106 | private HashMap _eventClassMap = new HashMap() ; |
107 | private HashMap _destinationMap = new HashMap() ; |
108 | private boolean _debug = false ; |
109 | private String _debugMode = null ; |
110 | private long _received = 0L ; |
111 | private long _forwarded = 0L ; |
112 | private long _sent = 0L ; |
113 | private Debugging _debugging = new Debugging() ; |
114 | |
115 | |
116 | /** Creates a new instance of BroadcastCell */ |
117 | public BroadcastCell(String name , String args ) { |
118 | super( name , args , false ) ; |
119 | _args = getArgs() ; |
120 | _nucleus = getNucleus() ; |
121 | |
122 | _debugMode = _args.getOpt("debug") ; |
123 | if( _debugMode != null ){ |
124 | _debug = true ; |
125 | addCommandListener(_debugging); |
126 | } |
127 | export() ; |
128 | start() ; |
129 | } |
130 | public String hh_ls = "" ; |
131 | public String ac_ls( Args args ){ |
132 | synchronized( this ){ |
133 | StringBuffer sb = new StringBuffer() ; |
134 | Iterator i = _eventClassMap.entrySet().iterator() ; |
135 | while( i.hasNext() ){ |
136 | |
137 | Map.Entry entry = (Map.Entry) i.next() ; |
138 | String key = (String)entry.getKey() ; |
139 | sb.append( key ).append("\n"); |
140 | Map map = (Map)entry.getValue() ; |
141 | Iterator j = map.entrySet().iterator() ; |
142 | while( j.hasNext() ){ |
143 | Map.Entry me = (Map.Entry) j.next() ; |
144 | CellAddressCore path = (CellAddressCore)me.getKey() ; |
145 | Entry e = (Entry)me.getValue() ; |
146 | sb.append(" ").append(path.toString()). |
147 | append(" ").append(e.toString()).append("\n"); |
148 | } |
149 | } |
150 | return sb.toString(); |
151 | } |
152 | } |
153 | private class OptionClass { |
154 | |
155 | private long expires = -1 ; |
156 | private boolean failures = false ; |
157 | private String eventClass = null ; |
158 | private String destination = null ; |
159 | |
160 | private OptionClass( Args args ){ |
161 | |
162 | eventClass = args.argv(0); |
163 | destination = args.argc() > 1 ? args.argv(1) : null ; |
164 | |
165 | String tmp = args.getOpt("expires") ; |
166 | if( tmp != null )expires = Long.parseLong(tmp)*1000L + System.currentTimeMillis() ; |
167 | |
168 | tmp = args.getOpt("cancelonfailure") ; |
169 | if( tmp != null ){ |
170 | if( tmp.equals("") ){ |
171 | failures = true ; |
172 | }else if( tmp.equals("on" ) ){ |
173 | failures = true ; |
174 | }else if( tmp.equals("off") ){ |
175 | failures = false ; |
176 | }else{ |
177 | throw new |
178 | IllegalArgumentException("-cancelonfailure=[on|off]"); |
179 | } |
180 | } |
181 | } |
182 | |
183 | } |
184 | public String hh_register = |
185 | "<classEvent> <cellPath> [-send] [-expires=<seconds>] [-cancelonfailure=[on|off]]" ; |
186 | public String ac_register_$_2( Args args ) throws Exception { |
187 | try{ |
188 | OptionClass options = new OptionClass( args ) ; |
189 | |
190 | synchronized( this ){ |
191 | Entry entry = register( new CellPath( options.destination ) , options.eventClass ) ; |
192 | entry.setCancelOnFailure(options.failures) ; |
193 | if( options.expires > 0L )entry.setExpires(options.expires); |
194 | } |
195 | }catch(Exception ee ){ |
196 | _log.warn(ee.toString(), ee); |
197 | throw ee ; |
198 | } |
199 | return "" ; |
200 | } |
201 | public String hh_modify = |
202 | "<classEvent> <cellPath> [-expires=<seconds>] [-cancelonfailure=[on|off]]" ; |
203 | public String ac_modify_$_2( Args args ){ |
204 | |
205 | OptionClass options = new OptionClass( args ) ; |
206 | |
207 | Entry entry = null ; |
208 | synchronized( this ){ |
209 | entry = get( new CellPath( options.destination ) , options.eventClass ) ; |
210 | if( entry == null ) |
211 | throw new |
212 | IllegalArgumentException("Entry not found"); |
213 | entry.setCancelOnFailure(options.failures) ; |
214 | if( options.expires > 0L )entry.setExpires(options.expires); |
215 | } |
216 | return entry.toString() ; |
217 | } |
218 | public String hh_unregister = "<classEvent> <cellPath> [-send]" ; |
219 | public String ac_unregister_$_2( Args args ) throws Exception { |
220 | |
221 | OptionClass options = new OptionClass( args ) ; |
222 | Entry e = unregister( new CellPath(options.destination) , options.eventClass ) ; |
223 | return "" ; |
224 | } |
225 | private synchronized Entry get( CellPath destination , String eventClass ){ |
226 | |
227 | CellAddressCore core = destination.getDestinationAddress() ; |
228 | |
229 | Map map = (Map)_eventClassMap.get( eventClass ) ; |
230 | if( map == null )return null ; |
231 | return (Entry) map.get( core ) ; |
232 | } |
233 | private synchronized Entry register( CellPath destination , String eventClass ){ |
234 | |
235 | CellAddressCore core = destination.getDestinationAddress() ; |
236 | |
237 | Entry e = new Entry( destination , eventClass ) ; |
238 | |
239 | Map map = (Map)_eventClassMap.get( eventClass ) ; |
240 | if( map == null ){ |
241 | _eventClassMap.put( eventClass , map = new HashMap() ) ; |
242 | }else{ |
243 | if( map.get( core ) != null ) |
244 | throw new |
245 | IllegalArgumentException("Duplicated entry : "+e) ; |
246 | } |
247 | map.put( core , e ) ; |
248 | |
249 | map = (Map)_destinationMap.get( core ) ; |
250 | if( map == null )_destinationMap.put( core , map = new HashMap() ) ; |
251 | map.put( eventClass , e ) ; |
252 | |
253 | return e ; |
254 | } |
255 | private synchronized Entry unregister( CellPath destination , String eventClass ){ |
256 | |
257 | CellAddressCore core = destination.getDestinationAddress() ; |
258 | |
259 | Map map = (Map)_eventClassMap.get( eventClass ) ; |
260 | if( map == null ) |
261 | throw new |
262 | NoSuchElementException("Not an entry "+core+"/"+eventClass); |
263 | |
264 | Entry e = (Entry) map.remove( core ) ; |
265 | if( e == null ) |
266 | throw new |
267 | NoSuchElementException("Not an entry "+core+"/"+eventClass); |
268 | |
269 | if( map.size() == 0 )_eventClassMap.remove( eventClass ) ; |
270 | |
271 | |
272 | map = (Map)_destinationMap.get( core ) ; |
273 | if( map == null ) |
274 | throw new |
275 | NoSuchElementException("PANIC : inconsitent db : "+core+"/"+eventClass); |
276 | |
277 | e = (Entry)map.remove( eventClass ) ; |
278 | if( map.size() == 0 )_destinationMap.remove( core ) ; |
279 | |
280 | return e ; |
281 | |
282 | } |
283 | public String hh_send = "[<class>]"; |
284 | public String ac_send_$_0_1(Args args ) throws Exception { |
285 | Object obj = null ; |
286 | if( args.argc() == 0 ){ |
287 | obj = new ArrayList() ; |
288 | }else{ |
289 | Class c = Class.forName( args.argv(0) ) ; |
290 | obj = c.newInstance() ; |
291 | } |
292 | CellMessage msg = new CellMessage( |
293 | new CellPath("broadcast"), |
294 | obj ); |
295 | sendMessage(msg); |
296 | return "" ; |
297 | } |
298 | |
299 | private void handleBroadcastCommandMessage( CellMessage msg , BroadcastCommandMessage command ){ |
300 | if( ! ( command instanceof BroadcastEventCommandMessage ) )return ; |
301 | BroadcastEventCommandMessage event = (BroadcastEventCommandMessage)command ; |
302 | try{ |
303 | String eventClass = event.getEventClass() ; |
304 | CellPath target = event.getTarget() ; |
305 | if( target == null ){ |
306 | target = (CellPath)msg.getSourcePath().clone() ; |
307 | target.revert() ; |
308 | } |
309 | if( event instanceof BroadcastRegisterMessage ){ |
310 | BroadcastRegisterMessage reg = (BroadcastRegisterMessage)event ; |
311 | _log.info("Message register : "+reg); |
312 | synchronized( this ){ |
313 | Entry entry = get( target , eventClass ) ; |
314 | if( entry == null )entry = register( target , eventClass ) ; |
315 | |
316 | if( reg.isCancelOnFailure() )entry.setCancelOnFailure(true); |
317 | long expires = reg.getExpires() ; |
318 | if( expires > 0 )entry.setExpires( expires ) ; |
319 | } |
320 | }else if( event instanceof BroadcastUnregisterMessage ){ |
321 | BroadcastUnregisterMessage unreg = (BroadcastUnregisterMessage)event ; |
322 | _log.info("Message unregister : "+unreg); |
323 | |
324 | unregister( target , eventClass ) ; |
325 | |
326 | }else{ |
327 | throw new |
328 | IllegalArgumentException("Not a valid Broadcast command " +event.getClass()); |
329 | } |
330 | }catch(Exception ee ){ |
331 | _log.warn("Problem with {"+command+"}"+ee, ee); |
332 | event.setReturnValues(1,ee); |
333 | } |
334 | msg.revertDirection() ; |
335 | try{ |
336 | sendMessage(msg); |
337 | }catch(Exception ee ){ |
338 | _log.warn("Couldn't reply : "+ee); |
339 | } |
340 | } |
341 | public void getInfo( PrintWriter pw ){ |
342 | pw.println( " CellName : "+getCellName()); |
343 | pw.println( " CellClass : "+this.getClass().getName()) ; |
344 | pw.println( " Version : $Id: BroadcastCell.java,v 1.8 2006-12-15 11:09:37 tigran Exp $"); |
345 | pw.println(" Destinations : "+_destinationMap.size() ) ; |
346 | pw.println(" Event Classes : "+_eventClassMap.size() ); |
347 | pw.println(" Packets received : "+_received); |
348 | pw.println(" Packets sent : "+ _sent ) ; |
349 | pw.println("Packets forwarded : "+_forwarded ) ; |
350 | |
351 | } |
352 | public void messageArrived( CellMessage message ){ |
353 | _log.info("messageArrived : "+message); |
354 | _received ++ ; |
355 | if( _debug ){ |
356 | _debugging.messageArrived( message ) ; |
357 | return ; |
358 | } |
359 | |
360 | Object obj = message.getMessageObject() ; |
361 | if( obj instanceof BroadcastCommandMessage ){ |
362 | handleBroadcastCommandMessage( message , (BroadcastCommandMessage)obj ) ; |
363 | return ; |
364 | }else if( obj instanceof NoRouteToCellException ){ |
365 | NoRouteToCellException nrtc = (NoRouteToCellException)obj ; |
366 | handleNoRouteException( nrtc ) ; |
367 | return ; |
368 | } |
369 | // |
370 | // slit incoming object (classes) into subclasses and interfaces. |
371 | // |
372 | ArrayList classList = new ArrayList() ; |
373 | for( Class o = obj.getClass() ; o != null ; ){ |
374 | classList.add(o.getName()); |
375 | Class [] il = o.getInterfaces() ; |
376 | for( int i = 0 ; i < il.length ; i++){ |
377 | classList.add( il[i].getName() ) ; |
378 | } |
379 | o = o.getSuperclass() ; |
380 | } |
381 | _log.info("Message arrived "+obj.getClass().getName()); |
382 | Iterator i = classList.iterator() ; |
383 | while( i.hasNext() ){ |
384 | String eventClass = i.next().toString() ; |
385 | //_log.info("Checking : "+eventClass); |
386 | forwardMessage( message , eventClass ) ; |
387 | } |
388 | } |
389 | public void messageToForward( CellMessage message ){ |
390 | _log.info("FORWARD: "+message); |
391 | _forwarded ++ ; |
392 | Object obj = message.getMessageObject() ; |
393 | if( ( obj != null ) && ( obj instanceof NoRouteToCellException ) ){ |
394 | NoRouteToCellException nrtc = (NoRouteToCellException)obj ; |
395 | handleNoRouteException( nrtc ) ; |
396 | return ; |
397 | } |
398 | super.messageToForward(message); |
399 | } |
400 | private synchronized void forwardMessage( CellMessage message , String classEvent ){ |
401 | Map map = (Map)_eventClassMap.get(classEvent); |
402 | if( map == null ){ |
403 | // _log.info("forwardMessage : Not found in eventClassMap : "+classEvent); |
404 | return ; |
405 | } |
406 | ArrayList list = new ArrayList() ; |
407 | CellPath dest = message.getDestinationPath() ; |
408 | for( Iterator i = map.entrySet().iterator() ; i.hasNext() ; ){ |
409 | |
410 | Map.Entry mapentry = (Map.Entry)i.next() ; |
411 | CellPath origin = (CellPath)dest.clone() ; |
412 | Entry entry = (Entry)mapentry.getValue() ; |
413 | if( ! entry.isValid() ){ |
414 | list.add(entry); |
415 | continue ; |
416 | } |
417 | entry._used++ ; |
418 | // |
419 | // add the (entry) path to our destination and |
420 | // skip ourself. |
421 | // |
422 | origin.add(entry.getPath()); |
423 | origin.next(); |
424 | |
425 | CellMessage msg = new CellMessage( origin , message.getMessageObject() ) ; |
426 | msg.setUOID( message.getUOID() ) ; |
427 | // |
428 | // make sure a reply will find its way back. |
429 | // |
430 | msg.getSourcePath().add( message.getSourcePath() ); |
431 | try{ |
432 | _log.info("forwardMessage : "+classEvent+" forwarding to "+origin); |
433 | sendMessage(msg); |
434 | _sent++ ; |
435 | }catch(Exception ee ){ |
436 | _log.warn("forwardMessage : FAILED "+classEvent+" forwarding to "+origin+" "+ee); |
437 | if( entry.isCancelOnFailure() )list.add( entry ) ; |
438 | entry._failed ++ ; |
439 | } |
440 | } |
441 | unregister(list); |
442 | |
443 | } |
444 | private void handleNoRouteException( NoRouteToCellException nrtc ){ |
445 | CellPath destination = nrtc.getDestinationPath() ; |
446 | _log.warn("NoRouteToCell : "+nrtc); |
447 | // |
448 | // find matching destinations |
449 | // |
450 | ArrayList list = new ArrayList() ; |
451 | synchronized( this ){ |
452 | Map map = (Map)_destinationMap.get(destination.getDestinationAddress()); |
453 | if( map == null ){ |
454 | _log.warn("Exception path not found in map : "+destination); |
455 | return ; |
456 | } |
457 | for( Iterator i = map.values().iterator() ; i.hasNext() ; ){ |
458 | Entry e = (Entry)i.next() ; |
459 | if( e.isCancelOnFailure() ){ |
460 | _log.info("Scheduling for cancelation : "+e); |
461 | list.add(e); |
462 | } |
463 | } |
464 | unregister( list ) ; |
465 | |
466 | } |
467 | return ; |
468 | |
469 | } |
470 | private void unregister( List list ){ |
471 | for( Iterator i = list.iterator() ; i.hasNext() ; ){ |
472 | Entry e = (Entry)i.next() ; |
473 | try{ |
474 | unregister( e.getPath() , e.getTrigger() ) ; |
475 | }catch(NoSuchElementException nse){ |
476 | _log.warn("PANIC : Couldn't unregister "+e); |
477 | } |
478 | } |
479 | return ; |
480 | } |
481 | |
482 | /* |
483 | * |
484 | ** DEBUG PART |
485 | */ |
486 | private class Debugging { |
487 | private void messageArrived( CellMessage message ){ |
488 | Object obj = message.getMessageObject() ; |
489 | if( _debugMode.equals("source") ){ |
490 | _log.info("MessageObject : "+obj ) ; |
491 | }else if( _debugMode.equals("destination" ) ){ |
492 | if( obj instanceof BroadcastCommandMessage ){ |
493 | _log.info("Broadcast Message answer : "+obj ) ; |
494 | return ; |
495 | } |
496 | _log.info("Replying MessageObject : "+obj ) ; |
497 | message.revertDirection() ; |
498 | try{ |
499 | sendMessage(message); |
500 | }catch(Exception ee){ |
501 | _log.warn("Problems sending : "+message+"("+ee+")"); |
502 | } |
503 | } |
504 | return ; |
505 | } |
506 | } |
507 | public String hh_d_reg = "<eventClass> [<destination>] [-cancelonfailure] [-expires=<time>]" ; |
508 | public String hh_d_unreg = "<eventClass> [<destination>]" ; |
509 | public String hh_d_send = "<javaClass> [-destination=<cellName>] [-wait]"; |
510 | |
511 | public String ac_d_reg_$_1_2( Args args ) throws Exception { |
512 | |
513 | OptionClass options = new OptionClass(args) ; |
514 | |
515 | CellPath path = options.destination == null ? null : new CellPath(options.destination); |
516 | BroadcastRegisterMessage cmd = new BroadcastRegisterMessage(options.eventClass,path); |
517 | cmd.setCancelOnFailure(options.failures); |
518 | cmd.setExpires(options.expires); |
519 | |
520 | CellMessage msg = new CellMessage( new CellPath("broadcast"), cmd ) ; |
521 | |
522 | sendMessage(msg); |
523 | |
524 | return "" ; |
525 | } |
526 | public String ac_d_unreg_$_1_2( Args args ) throws Exception { |
527 | |
528 | OptionClass options = new OptionClass(args) ; |
529 | |
530 | CellPath path = options.destination == null ? null : new CellPath(options.destination); |
531 | BroadcastUnregisterMessage cmd = new BroadcastUnregisterMessage(options.eventClass,path); |
532 | |
533 | CellMessage msg = new CellMessage( new CellPath("broadcast"), cmd ) ; |
534 | |
535 | sendMessage(msg); |
536 | |
537 | return "" ; |
538 | } |
539 | public String ac_d_send_$_0_1( Args args ) throws Exception { |
540 | |
541 | Object obj = args.argc() == 0 ? |
542 | new ArrayList() : |
543 | Class.forName( args.argv(0) ).newInstance() ; |
544 | |
545 | String dest = args.getOpt("destination") ; |
546 | |
547 | CellMessage msg = new CellMessage( |
548 | new CellPath(dest==null?"broadcast":dest), |
549 | obj ); |
550 | sendMessage(msg); |
551 | return "" ; |
552 | |
553 | } |
554 | |
555 | |
556 | } |