EMMA Coverage Report (generated Mon Aug 23 17:21:34 CEST 2010)
[all classes][dmg.cells.nucleus]

COVERAGE SUMMARY FOR SOURCE FILE [CellNucleus.java]

nameclass, %method, %block, %line, %
CellNucleus.java100% (5/5)43%  (43/99)42%  (815/1921)42%  (177/422)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class CellNucleus$KillerThread100% (1/1)50%  (1/2)14%  (20/138)12%  (4/32)
run (): void 0%   (0/1)0%   (0/118)0%   (0/28)
CellNucleus$KillerThread (CellNucleus, KillEvent): void 100% (1/1)100% (20/20)100% (4/4)
     
class CellNucleus100% (1/1)39%  (35/90)42%  (635/1512)44%  (147.3/335)
CellNucleus (Cell, String): void 0%   (0/1)0%   (0/6)0%   (0/2)
_getCellInfo (): CellInfo 0%   (0/1)0%   (0/77)0%   (0/25)
access$1000 (CellNucleus, ThreadGroup): Collection 0%   (0/1)0%   (0/4)0%   (0/1)
access$1100 (CellNucleus, Collection, long): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
access$1200 (CellNucleus, Collection): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$1400 (CellNucleus): Logger 0%   (0/1)0%   (0/3)0%   (0/1)
access$500 (CellNucleus): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$600 (CellNucleus): ExecutorService 0%   (0/1)0%   (0/3)0%   (0/1)
access$700 (CellNucleus): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$800 (CellNucleus): ExecutorService 0%   (0/1)0%   (0/3)0%   (0/1)
access$900 (CellNucleus): ThreadGroup 0%   (0/1)0%   (0/3)0%   (0/1)
createNewCell (String, String, Socket, boolean): Cell 0%   (0/1)0%   (0/15)0%   (0/3)
createNewCell (String, String, String [], Object []): Object 0%   (0/1)0%   (0/17)0%   (0/3)
getCellClass (): String 0%   (0/1)0%   (0/3)0%   (0/1)
getCellDomainInfo (): CellDomainInfo 0%   (0/1)0%   (0/3)0%   (0/1)
getCellInfo (): CellInfo 0%   (0/1)0%   (0/5)0%   (0/1)
getCellInfo (String): CellInfo 0%   (0/1)0%   (0/4)0%   (0/1)
getCellNames (): String [] 0%   (0/1)0%   (0/3)0%   (0/1)
getCellTunnelInfos (): CellTunnelInfo [] 0%   (0/1)0%   (0/3)0%   (0/1)
getCellType (): String 0%   (0/1)0%   (0/3)0%   (0/1)
getCellVersionByClass (Class): CellVersion 0%   (0/1)0%   (0/14)0%   (0/2)
getCellVersionByObject (Object): CellVersion 0%   (0/1)0%   (0/16)0%   (0/3)
getClassProviders (): String [][] 0%   (0/1)0%   (0/3)0%   (0/1)
getEventQueueSize (): int 0%   (0/1)0%   (0/14)0%   (0/4)
getNonDaemonThreads (ThreadGroup): Collection 0%   (0/1)0%   (0/33)0%   (0/8)
getPinboard (): Pinboard 0%   (0/1)0%   (0/3)0%   (0/1)
getPrintoutLevel (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getPrintoutLevel (String): int 0%   (0/1)0%   (0/4)0%   (0/1)
getRoutingList (): CellRoute [] 0%   (0/1)0%   (0/3)0%   (0/1)
getRoutingTable (): CellRoutingTable 0%   (0/1)0%   (0/3)0%   (0/1)
getThisCell (): Cell 0%   (0/1)0%   (0/3)0%   (0/1)
getThreadGroup (): ThreadGroup 0%   (0/1)0%   (0/3)0%   (0/1)
getThreads (): Thread [] 0%   (0/1)0%   (0/35)0%   (0/8)
getThreads (String): Thread [] 0%   (0/1)0%   (0/4)0%   (0/1)
getWaitQueue (): Map 0%   (0/1)0%   (0/23)0%   (0/5)
join (String, long): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
joinThreads (Collection, long): boolean 0%   (0/1)0%   (0/38)0%   (0/11)
kill (String): void 0%   (0/1)0%   (0/5)0%   (0/2)
killThreads (Collection): void 0%   (0/1)0%   (0/28)0%   (0/5)
loadClass (String): Class 0%   (0/1)0%   (0/4)0%   (0/1)
resendMessage (CellMessage): void 0%   (0/1)0%   (0/6)0%   (0/2)
routeAdd (CellRoute): void 0%   (0/1)0%   (0/4)0%   (0/2)
routeDelete (CellRoute): void 0%   (0/1)0%   (0/4)0%   (0/2)
routeFind (CellAddressCore): CellRoute 0%   (0/1)0%   (0/5)0%   (0/1)
sendAndWait (CellMessage, long): CellMessage 0%   (0/1)0%   (0/7)0%   (0/1)
sendMessage (CellMessage, boolean, boolean, CellMessageAnswerable, long): void 0%   (0/1)0%   (0/130)0%   (0/21)
setAsyncCallback (boolean): void 0%   (0/1)0%   (0/15)0%   (0/5)
setCallbackExecutor (ExecutorService): void 0%   (0/1)0%   (0/20)0%   (0/7)
setClassProvider (String, String): void 0%   (0/1)0%   (0/5)0%   (0/2)
setDomainContext (String, Object): void 0%   (0/1)0%   (0/7)0%   (0/2)
setMessageExecutor (ExecutorService): void 0%   (0/1)0%   (0/20)0%   (0/7)
setPrintoutLevel (String, int): void 0%   (0/1)0%   (0/5)0%   (0/2)
setPrintoutLevel (int): void 0%   (0/1)0%   (0/4)0%   (0/1)
setSystemNucleus (CellNucleus): void 0%   (0/1)0%   (0/4)0%   (0/2)
updateWaitQueue (): int 0%   (0/1)0%   (0/74)0%   (0/18)
sendAndWait (CellMessage, boolean, boolean, long): CellMessage 100% (1/1)63%  (114/182)79%  (24.5/31)
addToEventQueue (CellEvent): void 100% (1/1)65%  (96/148)76%  (20.6/27)
createNewCell (String, String, String, boolean): Cell 100% (1/1)82%  (27/33)80%  (8/10)
CellNucleus (Cell, String, String): void 100% (1/1)85%  (159/186)85%  (34/40)
<static initializer> 100% (1/1)100% (9/9)100% (3/3)
access$000 (): CellGlue 100% (1/1)100% (2/2)100% (1/1)
access$100 (CellNucleus): String 100% (1/1)100% (3/3)100% (1/1)
access$1500 (): Logger 100% (1/1)100% (2/2)100% (1/1)
access$200 (): Logger 100% (1/1)100% (2/2)100% (1/1)
access$302 (CellNucleus, int): int 100% (1/1)100% (5/5)100% (1/1)
access$400 (CellNucleus): Cell 100% (1/1)100% (3/3)100% (1/1)
addCellEventListener (CellEventListener): void 100% (1/1)100% (5/5)100% (2/2)
createErrorLogWriter (): Writer 100% (1/1)100% (9/9)100% (1/1)
createInfoLogWriter (): Writer 100% (1/1)100% (9/9)100% (1/1)
export (): void 100% (1/1)100% (4/4)100% (1/1)
getCellDomainName (): String 100% (1/1)100% (3/3)100% (1/1)
getCellName (): String 100% (1/1)100% (3/3)100% (1/1)
getDomainContext (): Map 100% (1/1)100% (3/3)100% (1/1)
getDomainContext (String): Object 100% (1/1)100% (4/4)100% (1/1)
getDomainContextReader (String): Reader 100% (1/1)100% (24/24)100% (4/4)
getLogTargetForCell (String): CellNucleus 100% (1/1)100% (17/17)100% (7/7)
getLoggingThresholds (): FilterThresholds 100% (1/1)100% (3/3)100% (1/1)
getThisAddress (): CellAddressCore 100% (1/1)100% (8/8)100% (1/1)
getUnique (): int 100% (1/1)100% (3/3)100% (1/1)
isSystemNucleus (): boolean 100% (1/1)100% (8/8)100% (1/1)
kill (): void 100% (1/1)100% (4/4)100% (1/1)
newThread (Runnable): Thread 100% (1/1)100% (9/9)100% (1/1)
newThread (Runnable, String): Thread 100% (1/1)100% (10/10)100% (1/1)
sendKillEvent (KillEvent): void 100% (1/1)100% (36/36)100% (5/5)
sendMessage (CellMessage): void 100% (1/1)100% (6/6)100% (2/2)
sendMessage (CellMessage, boolean, boolean): void 100% (1/1)100% (24/24)100% (7/7)
setCellClass (String): void 100% (1/1)100% (4/4)100% (2/2)
setLoggingThresholds (FilterThresholds): void 100% (1/1)100% (4/4)100% (2/2)
setPinboard (Pinboard): void 100% (1/1)100% (4/4)100% (2/2)
wrapLoggingContext (Runnable): Runnable 100% (1/1)100% (9/9)100% (2/2)
     
class CellNucleus$DeliverMessageTask100% (1/1)100% (2/2)55%  (113/204)56%  (21.8/39)
innerRun (): void 100% (1/1)52%  (99/190)51%  (17.8/35)
CellNucleus$DeliverMessageTask (CellNucleus, CellEvent): void 100% (1/1)100% (14/14)100% (4/4)
     
class CellNucleus$AbstractNucleusTask100% (1/1)100% (3/3)59%  (23/39)59%  (5.9/10)
run (): void 100% (1/1)45%  (13/29)55%  (4.9/9)
CellNucleus$AbstractNucleusTask (CellNucleus): void 100% (1/1)100% (6/6)100% (1/1)
CellNucleus$AbstractNucleusTask (CellNucleus, CellNucleus$1): void 100% (1/1)100% (4/4)100% (1/1)
     
class CellNucleus$1100% (1/1)100% (2/2)86%  (24/28)96%  (6.7/7)
run (): void 100% (1/1)75%  (12/16)96%  (5.7/6)
CellNucleus$1 (CellNucleus, NDC, Runnable): void 100% (1/1)100% (12/12)100% (1/1)

1package dmg.cells.nucleus;
2 
3import dmg.util.Pinboard;
4import dmg.util.BufferedLineWriter;
5import dmg.util.Slf4jErrorWriter;
6import dmg.util.Slf4jInfoWriter;
7import dmg.util.logback.FilterThresholds;
8import java.io.*;
9import java.util.*;
10import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
12import java.util.concurrent.ThreadPoolExecutor;
13import java.util.concurrent.ThreadFactory;
14import java.util.concurrent.RejectedExecutionException;
15import java.lang.reflect.*;
16import java.net.Socket;
17 
18import org.slf4j.LoggerFactory;
19import org.slf4j.Logger;
20import org.dcache.commons.util.NDC;
21 
22import ch.qos.logback.classic.Level;
23 
24 
25/**
26 *
27 *
28 * @author Patrick Fuhrmann
29 * @version 0.1, 15 Feb 1998
30 */
31public class CellNucleus implements ThreadFactory
32{
33    private static final  int    INITIAL  =  0;
34    private static final  int    ACTIVE   =  1;
35    private static final  int    REMOVING =  2;
36    private static final  int    DEAD     =  3;
37 
38    private static CellGlue __cellGlue  = null;
39    private final  String    _cellName;
40    private final  String    _cellType;
41    private        ThreadGroup _threads      = null;
42    private final  Cell      _cell;
43    private final  Date      _creationTime   = new Date();
44    private        int       _state          = INITIAL;
45    private        int       _printoutLevel  = 0;
46 
47    private final static Logger _logMessages =
48        LoggerFactory.getLogger("logger.org.dcache.cells.messages");
49    private final static Logger _logNucleus =
50        LoggerFactory.getLogger(CellNucleus.class);
51    private final Logger _logCell;
52 
53    //  have to be synchronized map
54    private final  Map<UOID, CellLock> _waitHash = new HashMap<UOID, CellLock>();
55    private String _cellClass;
56 
57    private volatile ExecutorService _callbackExecutor;
58    private volatile ExecutorService _messageExecutor;
59 
60    private boolean _isPrivateCallbackExecutor = true;
61    private boolean _isPrivateMessageExecutor = true;
62 
63    private Pinboard _pinboard;
64    private FilterThresholds _loggingThresholds;
65 
66    public CellNucleus(Cell cell, String name) {
67 
68        this(cell, name, "Generic");
69    }
70    public CellNucleus(Cell cell, String name, String type) {
71 
72        _logCell = LoggerFactory.getLogger(cell.getClass());
73 
74        if (__cellGlue == null) {
75            //
76            // the cell gluon hasn't yet been created
77            // (we insist in creating a SystemCell first.)
78            //
79            if (cell instanceof dmg.cells.nucleus.SystemCell) {
80                __cellGlue = new CellGlue(name);
81                _cellName    = "System";
82                _cellType    = "System";
83                __cellGlue.setSystemNucleus(this);
84            } else {
85                throw new
86                    IllegalArgumentException("System must be first Cell");
87            }
88 
89        } else {
90            //
91            // we don't accept more then one System.cells
92            //
93            if (cell instanceof dmg.cells.nucleus.SystemCell) {
94                throw new
95                    IllegalArgumentException("System already exists");
96            } else {
97                String cellName    = name.replace('@', '+');
98 
99                if ((cellName == null) ||
100                   (cellName.equals("")))cellName = "*";
101                if (cellName.charAt(cellName.length() - 1) == '*') {
102                    if (cellName.length() == 1) {
103                        cellName = "$-"+getUnique();
104                    } else {
105                        cellName = cellName.substring(0,cellName.length()-1)+
106                            "-"+getUnique();
107                    }
108                }
109 
110                _cellName = cellName;
111                _cellType    = type;
112 
113            }
114        }
115 
116        _cell = cell;
117        _cellClass = _cell.getClass().getName();
118 
119        //
120        // for the use in restricted sandboxes
121        //
122        try {
123 
124            _threads = new ThreadGroup(__cellGlue.getMasterThreadGroup(),
125                                       _cellName+"-threads");
126 
127        } catch(SecurityException se) {
128            _threads = null;
129        }
130 
131        _callbackExecutor = Executors.newSingleThreadExecutor(this);
132        _messageExecutor = Executors.newSingleThreadExecutor(this);
133 
134        _logNucleus.info("Created : "+name);
135        _state = ACTIVE;
136 
137        //
138        // make ourself known to the world
139        //
140        _printoutLevel = __cellGlue.getDefaultPrintoutLevel();
141        __cellGlue.addCell(_cellName, this);
142 
143    }
144 
145    /**
146     * Returns the CellNucleus to which log messages tagged with a
147     * given cell are associated.
148     */
149    public static CellNucleus getLogTargetForCell(String cell)
150    {
151        CellNucleus nucleus = null;
152        if (__cellGlue != null) {
153            if (cell != null) {
154                nucleus = __cellGlue.getCell(cell);
155            }
156            if (nucleus == null) {
157                nucleus = __cellGlue.getSystemNucleus();
158            }
159        }
160        return nucleus;
161    }
162 
163    void setSystemNucleus(CellNucleus nucleus) {
164        __cellGlue.setSystemNucleus(nucleus);
165    }
166 
167    boolean isSystemNucleus() {
168        return this == __cellGlue.getSystemNucleus();
169    }
170 
171    public String getCellName() { return _cellName; }
172    public String getCellType() { return _cellType; }
173 
174    public String getCellClass()
175    {
176        return _cellClass;
177    }
178 
179    public void setCellClass(String cellClass)
180    {
181        _cellClass = cellClass;
182    }
183 
184    public CellAddressCore getThisAddress() {
185        return new CellAddressCore(_cellName, __cellGlue.getCellDomainName());
186    }
187    public CellDomainInfo getCellDomainInfo() {
188        return __cellGlue.getCellDomainInfo();
189    }
190    public String getCellDomainName() {
191        return __cellGlue.getCellDomainName();
192    }
193    public String [] getCellNames() { return __cellGlue.getCellNames(); }
194    public CellInfo getCellInfo(String name) {
195        return __cellGlue.getCellInfo(name);
196    }
197    public CellInfo getCellInfo() {
198        return __cellGlue.getCellInfo(getCellName());
199    }
200 
201    public Map<String, Object> getDomainContext()
202    {
203        return __cellGlue.getCellContext();
204    }
205 
206    public Reader getDomainContextReader(String contextName)
207        throws FileNotFoundException  {
208        Object o = __cellGlue.getCellContext(contextName);
209        if (o == null)
210            throw new
211                FileNotFoundException("Context not found : "+contextName);
212        return new StringReader(o.toString());
213    }
214    public void   setDomainContext(String contextName, Object context) {
215        __cellGlue.getCellContext().put(contextName, context);
216    }
217    public Object getDomainContext(String str) {
218        return __cellGlue.getCellContext(str);
219    }
220    public String [] [] getClassProviders() {
221        return __cellGlue.getClassProviders();
222    }
223    public synchronized void setClassProvider(String selection, String provider) {
224        __cellGlue.setClassProvider(selection, provider);
225    }
226    Cell getThisCell() { return _cell; }
227 
228    CellInfo _getCellInfo() {
229        CellInfo info = new CellInfo();
230        info.setCellName(getCellName());
231        info.setDomainName(getCellDomainName());
232        info.setCellType(getCellType());
233        info.setCreationTime(_creationTime);
234        try {
235            info.setCellVersion(getCellVersionByObject(_cell));
236        } catch(Exception e) {}
237        try {
238            info.setPrivateInfo(_cell.getInfo());
239        } catch(Exception e) {
240            info.setPrivateInfo("Not yet/No more available\n");
241        }
242        try {
243            info.setShortInfo(_cell.toString());
244        } catch(Exception e) {
245            info.setShortInfo("Not yet/No more available");
246        }
247        info.setCellClass(_cellClass);
248        try {
249            info.setEventQueueSize(getEventQueueSize());
250            info.setState(_state);
251            info.setThreadCount(_threads.activeCount());
252        } catch(Exception e) {
253            info.setEventQueueSize(0);
254            info.setState(0);
255            info.setThreadCount(0);
256        }
257        return info;
258    }
259    public void   setPrintoutLevel(int level) { _printoutLevel = level; }
260    public int    getPrintoutLevel() { return _printoutLevel; }
261    public void   setPrintoutLevel(String cellName, int level) {
262        __cellGlue.setPrintoutLevel(cellName, level);
263    }
264    public int    getPrintoutLevel(String cellName) {
265        return __cellGlue.getPrintoutLevel(cellName);
266    }
267 
268    public void setLoggingThresholds(FilterThresholds thresholds)
269    {
270        _loggingThresholds = thresholds;
271    }
272 
273    public FilterThresholds getLoggingThresholds()
274    {
275        return _loggingThresholds;
276    }
277 
278    public void setPinboard(Pinboard pinboard)
279    {
280        _pinboard = pinboard;
281    }
282 
283    public Pinboard getPinboard()
284    {
285        return _pinboard;
286    }
287 
288    public synchronized void setAsyncCallback(boolean asyncCallback)
289    {
290        if (asyncCallback) {
291            setCallbackExecutor(Executors.newCachedThreadPool(this));
292        } else {
293            setCallbackExecutor(Executors.newSingleThreadExecutor(this));
294        }
295        _isPrivateCallbackExecutor = true;
296    }
297 
298    /**
299     * Executor used for message callbacks.
300     */
301    public synchronized void setCallbackExecutor(ExecutorService executor)
302    {
303        if (executor == null) {
304            throw new IllegalArgumentException("null is not allowed");
305        }
306        if (_isPrivateCallbackExecutor) {
307            _callbackExecutor.shutdown();
308        }
309        _callbackExecutor = executor;
310        _isPrivateCallbackExecutor = false;
311    }
312 
313    /**
314     * Executor used for incoming message delivery.
315     */
316    public synchronized void setMessageExecutor(ExecutorService executor)
317    {
318        if (executor == null) {
319            throw new IllegalArgumentException("null is not allowed");
320        }
321        if (_isPrivateMessageExecutor) {
322            _messageExecutor.shutdown();
323        }
324        _messageExecutor = executor;
325        _isPrivateMessageExecutor = false;
326    }
327 
328    public void   sendMessage(CellMessage msg)
329        throws SerializationException,
330               NoRouteToCellException    {
331 
332        sendMessage(msg, true, true);
333 
334    }
335    public void   resendMessage(CellMessage msg)
336        throws SerializationException,
337               NoRouteToCellException    {
338 
339        sendMessage(msg, false, true);
340 
341    }
342    public void   sendMessage(CellMessage msg,
343                              boolean locally,
344                              boolean remotely)
345        throws SerializationException,
346               NoRouteToCellException    {
347 
348        if (!msg.isStreamMode()) {
349            msg.touch();
350        }
351 
352        EventLogger.sendBegin(this, msg, "async");
353        try {
354            __cellGlue.sendMessage(this, msg, locally, remotely);
355        } finally {
356            EventLogger.sendEnd(msg);
357        }
358    }
359    public CellMessage   sendAndWait(CellMessage msg, long timeout)
360        throws SerializationException,
361               NoRouteToCellException,
362               InterruptedException      {
363        return sendAndWait(msg, true, true, timeout);
364    }
365 
366    public CellMessage sendAndWait(CellMessage msg,
367                                   boolean local,
368                                   boolean remote,
369                                   long    timeout)
370        throws SerializationException,
371               NoRouteToCellException,
372               InterruptedException
373    {
374        if (!msg.isStreamMode()) {
375            msg.touch();
376        }
377 
378        msg.setTtl(timeout);
379 
380        EventLogger.sendBegin(this, msg, "blocking");
381        UOID uoid = msg.getUOID();
382        try {
383            CellLock lock = new CellLock();
384            synchronized (_waitHash) {
385                _waitHash.put(uoid, lock);
386            }
387            _logNucleus.info("sendAndWait : adding to hash : " + uoid);
388 
389            __cellGlue.sendMessage(this, msg, local, remote);
390 
391            //
392            // because of a linux native thread problem with
393            // wait(n > 0), we have to use a interruptedFlag
394            // and the time messurement.
395            //
396            synchronized (lock) {
397                long start = System.currentTimeMillis();
398                while (lock.getObject() == null && timeout > 0) {
399                    lock.wait(timeout);
400                    timeout -= (System.currentTimeMillis() - start);
401                }
402            }
403            CellMessage answer = (CellMessage)lock.getObject();
404            if (answer == null) {
405                return null;
406            }
407            answer = new CellMessage(answer);
408 
409            Object obj = answer.getMessageObject();
410            if (obj instanceof NoRouteToCellException) {
411                throw (NoRouteToCellException) obj;
412            } else if (obj instanceof SerializationException) {
413                throw (SerializationException) obj;
414            }
415            return answer;
416        } finally {
417            synchronized (_waitHash) {
418                _waitHash.remove(uoid);
419            }
420            EventLogger.sendEnd(msg);
421        }
422    }
423 
424    public Map<UOID,CellLock > getWaitQueue() {
425 
426        Map<UOID,CellLock > hash = new HashMap<UOID,CellLock >();
427        synchronized (_waitHash) {
428            hash.putAll(_waitHash);
429        }
430        return hash;
431    }
432 
433    public int updateWaitQueue()
434    {
435        Collection<CellLock> expired = new ArrayList<CellLock>();
436        long now  = System.currentTimeMillis();
437        int size;
438 
439        synchronized (_waitHash) {
440            Iterator<CellLock> i = _waitHash.values().iterator();
441            while (i.hasNext()) {
442                CellLock lock =  i.next();
443                if (lock != null && !lock.isSync() && lock.getTimeout() < now) {
444                    expired.add(lock);
445                    i.remove();
446                }
447            }
448            size = _waitHash.size();
449        }
450 
451        //
452        // _waitHash can't be used here. Otherwise
453        // we will end up in a deadlock (NO LOCKS WHILE CALLING CALLBACKS)
454        //
455        for (CellLock lock: expired) {
456            CellMessage envelope = lock.getMessage();
457            EventLogger.sendEnd(envelope);
458            lock.getCallback().answerTimedOut(envelope);
459        }
460 
461        return size;
462    }
463 
464    public void sendMessage(CellMessage msg,
465                            boolean local,
466                            boolean remote,
467                            CellMessageAnswerable callback,
468                            long timeout)
469        throws SerializationException
470    {
471        if (!msg.isStreamMode()) {
472            msg.touch();
473        }
474 
475        msg.setTtl(timeout);
476 
477        EventLogger.sendBegin(this, msg, "callback");
478        UOID uoid = msg.getUOID();
479        boolean success = false;
480        try {
481            CellLock lock = new CellLock(msg, callback, timeout);
482            synchronized (_waitHash) {
483                _waitHash.put(uoid, lock);
484            }
485 
486            __cellGlue.sendMessage(this, msg, local, remote);
487            success = true;
488        } catch (NoRouteToCellException e) {
489            if (callback != null)
490                callback.exceptionArrived(msg, e);
491        } finally {
492            if (!success) {
493                synchronized (_waitHash) {
494                    _waitHash.remove(uoid);
495                }
496                EventLogger.sendEnd(msg);
497            }
498        }
499    }
500    public void addCellEventListener(CellEventListener listener) {
501        __cellGlue.addCellEventListener(this, listener);
502 
503    }
504    public void export() { __cellGlue.export(this);  }
505    /**
506     *
507     * The kill method schedules the specified cell for deletion.
508     * The actual remove operation will run in a different
509     * thread. So on return of this method the cell may
510     * or may not be alive.
511     */
512    public void kill() {   __cellGlue.kill(this);  }
513    /**
514     *
515     * The kill method schedules this Cell for deletion.
516     * The actual remove operation will run in a different
517     * thread. So on return of this method the cell may
518     * or may not be alive.
519     */
520    public void kill(String cellName) throws IllegalArgumentException {
521        __cellGlue.kill(this, cellName);
522    }
523 
524 
525    /**
526     * Blocks until the given cell is dead.
527     *
528     * @param timeout the maximum time to wait in milliseconds.
529     * @throws InterruptedException if another thread interrupted the
530     * current thread before or while the current thread was waiting
531     * for a notification. The interrupted status of the current
532     * thread is cleared when this exception is thrown.
533     * @return True if the cell died, false in case of a timeout.
534     */
535    public boolean join(String cellName, long timeout)
536        throws InterruptedException
537    {
538        return __cellGlue.join(cellName, timeout);
539    }
540 
541    /**
542     * Returns the non-daemon threads of a thread group.
543     */
544    private Collection<Thread> getNonDaemonThreads(ThreadGroup group)
545    {
546        Thread[] threads = new Thread[group.activeCount()];
547        int count = group.enumerate(threads);
548        Collection<Thread> nonDaemonThreads = new ArrayList<Thread>(count);
549        for (int i = 0; i < count; i++) {
550            Thread thread = threads[i];
551            if (!thread.isDaemon()) {
552                nonDaemonThreads.add(thread);
553            }
554        }
555        return nonDaemonThreads;
556    }
557 
558    /**
559     * Waits for at most timeout milliseconds for the termination of a
560     * set of threads.
561     *
562     * @return true if all threads terminated, false otherwise
563     */
564    private boolean joinThreads(Collection<Thread> threads, long timeout)
565        throws InterruptedException
566    {
567        long deadline = System.currentTimeMillis() + timeout;
568        for (Thread thread: threads) {
569            if (thread.isAlive()) {
570                long wait = deadline - System.currentTimeMillis();
571                if (wait <= 0) {
572                    return false;
573                }
574                thread.join(wait);
575                if (thread.isAlive()) {
576                    return false;
577                }
578            }
579        }
580        return true;
581    }
582 
583    /**
584     * Interrupts a set of threads.
585     */
586    private void killThreads(Collection<Thread> threads)
587        throws InterruptedException
588    {
589        for (Thread thread: threads) {
590            if (thread.isAlive()) {
591                _logNucleus.info("killerThread : interrupting " + thread.getName());
592                thread.interrupt();
593            }
594        }
595    }
596 
597    private Runnable wrapLoggingContext(final Runnable runnable)
598    {
599        final NDC ndc = NDC.cloneNdc();
600        return new Runnable() {
601            public void run() {
602                CDC.setCellsContext(CellNucleus.this);
603                NDC.set(ndc);
604                try {
605                    runnable.run();
606                } finally {
607                    NDC.clear();
608                }
609            }
610        };
611    }
612 
613    public Thread newThread(Runnable target)
614    {
615        return new Thread(_threads, wrapLoggingContext(target));
616    }
617 
618    public Thread newThread(Runnable target, String name)
619    {
620        return new Thread(_threads, wrapLoggingContext(target), name);
621    }
622 
623    //
624    //  package
625    //
626    Thread [] getThreads(String cellName) {
627        return __cellGlue.getThreads(cellName);
628    }
629    public ThreadGroup getThreadGroup() { return _threads; }
630    Thread [] getThreads() {
631        if (_threads == null)return new Thread[0];
632 
633        int threadCount = _threads.activeCount();
634        Thread [] list  = new Thread[threadCount];
635        int rc = _threads.enumerate(list);
636        if (rc == list.length)return list;
637        Thread [] ret = new Thread[rc];
638        System.arraycopy(list, 0, ret, 0, rc);
639        return ret;
640    }
641 
642    int  getUnique() { return __cellGlue.getUnique(); }
643 
644    int getEventQueueSize()
645    {
646        if (_messageExecutor instanceof ThreadPoolExecutor) {
647            ThreadPoolExecutor executor =
648                (ThreadPoolExecutor) _messageExecutor;
649            return executor.getQueue().size();
650        }
651        return 0;
652    }
653 
654    void addToEventQueue(CellEvent ce) {
655        //
656        //
657        if (ce instanceof RoutedMessageEvent) {
658            if (_cell instanceof CellTunnel) {
659                //
660                // nothing to do (no transformation needed)
661                //
662            } else {
663                //
664                // originally this case has not been forseen,
665                // but it appeared rather useful. It allows alias
666                // cells which serves several different cells names.
667                // mainly useful for debuggin purposes (see alias
668                // package.
669                //
670                ce = new MessageEvent(((RoutedMessageEvent)ce).getMessage());
671            }
672        }
673 
674        try {
675            if (ce instanceof MessageEvent) {
676                //
677                // we have to cover 3 cases :
678                //   - absolutely asynchronous request
679                //   - asynchronous, but we have a callback to call
680                //   - synchronous
681                //
682                final CellMessage msg = ((MessageEvent) ce).getMessage();
683                if (msg != null) {
684                    _logNucleus.info("addToEventQueue : message arrived : "+msg);
685                    CellLock lock;
686 
687                    synchronized (_waitHash) {
688                        lock = _waitHash.remove(msg.getLastUOID());
689                    }
690 
691                    if (lock != null) {
692                        //
693                        // we were waiting for you (sync or async)
694                        //
695                        _logNucleus.info("addToEventQueue : lock found for : "+msg);
696                        if (lock.isSync()) {
697                            _logNucleus.info("addToEventQueue : is synchronous : "+msg);
698                            synchronized (lock) {
699                                lock.setObject(msg);
700                                lock.notifyAll();
701                            }
702                            _logNucleus.info("addToEventQueue : dest. was triggered : "+msg);
703                        } else {
704                            _logNucleus.info("addToEventQueue : is asynchronous : "+msg);
705                            _callbackExecutor.execute(new CallbackTask(lock, msg));
706                        }
707                        return;
708                    }
709                }     // end of : msg != null
710            }        // end of : ce instanceof MessageEvent
711 
712            _messageExecutor.execute(new DeliverMessageTask(ce));
713        } catch (RejectedExecutionException e) {
714            _logNucleus.error("Message queue overflow. Dropping " + ce);
715        }
716    }
717 
718    void sendKillEvent(KillEvent ce)
719    {
720        _logNucleus.info("sendKillEvent : received "+ce);
721        Thread thread = new KillerThread(ce);
722        thread.start();
723        _logNucleus.info("sendKillEvent : " + thread.getName()+" started on group "+
724             thread.getThreadGroup().getName());
725    }
726 
727    //
728    // helper to get version string from arbitrary object
729    //
730    public static CellVersion getCellVersionByObject(Object obj) throws Exception {
731        Class<?> c =  obj.getClass();
732 
733        Method m = c.getMethod("getCellVersion", (Class<?> [])null);
734 
735        return (CellVersion)m.invoke(obj, (Object [])null);
736    }
737 
738    public static CellVersion getCellVersionByClass(Class<?> c) throws Exception {
739 
740        Method m = c.getMethod("getCellVersion", (Class [])null);
741 
742        return (CellVersion)m.invoke((Object)null, (Object [])null);
743    }
744 
745    ////////////////////////////////////////////////////////////
746    //
747    //   create new cell by different arguments
748    //   String, String [], Socket
749    //   can choose between systemLoader only or
750    //   Domain loader.
751    //
752    public Cell createNewCell(String cellClass,
753                              String cellName,
754                              String cellArgs,
755                              boolean systemOnly)
756        throws ClassNotFoundException,
757               NoSuchMethodException,
758               SecurityException,
759               InstantiationException,
760               InvocationTargetException,
761               IllegalAccessException,
762               ClassCastException
763    {
764        try {
765            Object [] args = new Object[1];
766            args[0] = cellArgs;
767            return (Cell)__cellGlue._newInstance(cellClass,
768                                                 cellName,
769                                                 args,
770                                                 systemOnly);
771        } catch (InvocationTargetException e) {
772            Throwable t = e.getTargetException();
773            if (t instanceof RuntimeException) {
774                throw (RuntimeException) t;
775            }
776            if (t instanceof Error) {
777                throw (Error) t;
778            }
779            throw e;
780        }
781    }
782 
783    public Class<?> loadClass(String className) throws ClassNotFoundException {
784        return __cellGlue.loadClass(className);
785    }
786 
787    public Object  createNewCell(String className,
788                                 String cellName,
789                                 String [] argsClassNames,
790                                 Object [] args)
791        throws ClassNotFoundException,
792               NoSuchMethodException,
793               InstantiationException,
794               IllegalAccessException,
795               InvocationTargetException,
796               ClassCastException                       {
797 
798        if (argsClassNames == null)
799            return __cellGlue._newInstance(
800                                           className, cellName, args, false);
801        else
802            return __cellGlue._newInstance(
803                                           className, cellName, argsClassNames, args, false);
804    }
805 
806    public Cell createNewCell(String cellClass,
807                              String cellName,
808                              Socket socket,
809                              boolean systemOnly)
810        throws ClassNotFoundException,
811               NoSuchMethodException,
812               SecurityException,
813               InstantiationException,
814               InvocationTargetException,
815               IllegalAccessException,
816               ClassCastException          {
817 
818        Object [] args = new Object[1];
819        args[0] = socket;
820 
821        return (Cell)__cellGlue._newInstance(cellClass,
822                                             cellName,
823                                             args,
824                                             systemOnly);
825    }
826    ////////////////////////////////////////////////////////////
827    //
828    //
829    // the routing stuff
830    //
831    public void routeAdd(CellRoute  route) throws IllegalArgumentException {
832        __cellGlue.routeAdd(route);
833    }
834    public void routeDelete(CellRoute  route) throws IllegalArgumentException {
835        __cellGlue.routeDelete(route);
836    }
837    CellRoute routeFind(CellAddressCore addr) {
838        return __cellGlue.getRoutingTable().find(addr);
839    }
840    CellRoutingTable getRoutingTable() { return __cellGlue.getRoutingTable(); }
841    CellRoute [] getRoutingList() { return __cellGlue.getRoutingList(); }
842    //
843    CellTunnelInfo [] getCellTunnelInfos() { return __cellGlue.getCellTunnelInfos(); }
844    //
845 
846    public Writer createErrorLogWriter()
847    {
848        return new BufferedLineWriter(new Slf4jErrorWriter(_logCell));
849    }
850 
851    public Writer createInfoLogWriter()
852    {
853        return new BufferedLineWriter(new Slf4jInfoWriter(_logCell));
854    }
855 
856    public static final int  PRINT_CELL          =    1;
857    public static final int  PRINT_ERROR_CELL    =    2;
858    public static final int  PRINT_NUCLEUS       =    4;
859    public static final int  PRINT_ERROR_NUCLEUS =    8;
860    public static final int  PRINT_FATAL         = 0x10;
861    public static final int  PRINT_ERRORS        =
862        PRINT_ERROR_CELL|PRINT_ERROR_NUCLEUS;
863    public static final int  PRINT_EVERYTHING    =
864        PRINT_CELL|PRINT_ERROR_CELL|PRINT_NUCLEUS|PRINT_ERROR_NUCLEUS|PRINT_FATAL;
865 
866    private class KillerThread extends Thread
867    {
868        private final KillEvent _event;
869 
870        public KillerThread(KillEvent event)
871        {
872            super(__cellGlue.getKillerThreadGroup(), "killer-" + _cellName);
873            _event = event;
874        }
875 
876        @Override
877        public void run()
878        {
879            _logNucleus.info("killerThread : started");
880            _state = REMOVING;
881            addToEventQueue(new LastMessageEvent());
882            try {
883                _cell.prepareRemoval(_event);
884            } catch (Throwable e) {
885                Thread t = Thread.currentThread();
886                t.getUncaughtExceptionHandler().uncaughtException(t, e);
887            }
888 
889            synchronized (this) {
890                if (_isPrivateCallbackExecutor) {
891                    _callbackExecutor.shutdown();
892                }
893                if (_isPrivateMessageExecutor) {
894                    _messageExecutor.shutdown();
895                }
896            }
897 
898            _logNucleus.info("killerThread : waiting for all threads in "+_threads+" to finish");
899 
900            try {
901                Collection<Thread> threads = getNonDaemonThreads(_threads);
902 
903                /* Some threads shut down asynchronously. Give them
904                 * one second before we start to kill them.
905                 */
906                while (!joinThreads(threads, 1000)) {
907                    killThreads(threads);
908                }
909                _threads.destroy();
910            } catch (IllegalThreadStateException e) {
911                _threads.setDaemon(true);
912            } catch (InterruptedException e) {
913                _logNucleus.warn("killerThread : Interrupted while waiting for threads");
914            }
915            __cellGlue.destroy(CellNucleus.this);
916            _state = DEAD;
917            _logNucleus.info("killerThread : stopped");
918        }
919    }
920 
921    private abstract class AbstractNucleusTask implements Runnable
922    {
923        protected abstract void innerRun();
924 
925        public void run ()
926        {
927            CDC cdc = new CDC();
928            try {
929                CDC.setCellsContext(CellNucleus.this);
930                innerRun();
931            } catch (Throwable e) {
932                Thread t = Thread.currentThread();
933                t.getUncaughtExceptionHandler().uncaughtException(t, e);
934            } finally {
935                cdc.apply();
936            }
937        }
938    }
939 
940    private class CallbackTask extends AbstractNucleusTask
941    {
942        private final CellLock _lock;
943        private final CellMessage _message;
944 
945        public CallbackTask(CellLock lock, CellMessage message)
946        {
947            _lock = lock;
948            _message = message;
949        }
950 
951        @Override
952        public void innerRun()
953        {
954            CellMessageAnswerable callback =
955                _lock.getCallback();
956 
957            CellMessage answer;
958            Object obj;
959            try {
960                answer = new CellMessage(_message);
961                _lock.getCdc().apply();
962                obj = answer.getMessageObject();
963            } catch (SerializationException e) {
964                _logNucleus.warn(e.getMessage());
965                obj = e;
966                answer = null;
967            }
968 
969            EventLogger.sendEnd(_lock.getMessage());
970            if (obj instanceof Exception) {
971                callback.
972                    exceptionArrived(_lock.getMessage(), (Exception) obj);
973            } else {
974                callback.
975                    answerArrived(_lock.getMessage(), answer);
976            }
977            _logNucleus.info("addToEventQueue : callback done for : " + _message);
978        }
979    }
980 
981    private class DeliverMessageTask extends AbstractNucleusTask
982    {
983        private final CellEvent _event;
984 
985        public DeliverMessageTask(CellEvent event)
986        {
987            _event = event;
988            EventLogger.queueBegin(_event);
989        }
990 
991        @Override
992        public void innerRun()
993        {
994            EventLogger.queueEnd(_event);
995 
996            if (_event instanceof LastMessageEvent) {
997                _logNucleus.info("messageThread : LastMessageEvent arrived");
998                _cell.messageArrived((MessageEvent) _event);
999            } else if (_event instanceof RoutedMessageEvent) {
1000                _logNucleus.info("messageThread : RoutedMessageEvent arrived");
1001                _cell.messageArrived((RoutedMessageEvent) _event);
1002            } else if (_event instanceof MessageEvent) {
1003                MessageEvent msgEvent = (MessageEvent) _event;
1004                _logNucleus.info("messageThread : MessageEvent arrived");
1005                CellMessage msg;
1006                try {
1007                    msg = new CellMessage(msgEvent.getMessage());
1008                } catch (SerializationException e) {
1009                    CellMessage envelope = msgEvent.getMessage();
1010                    _logCell.error(String.format("Discarding a malformed message from %s with UOID %s and session [%s]: %s",
1011                                                 envelope.getSourcePath(),
1012                                                 envelope.getUOID(),
1013                                                 envelope.getSession(),
1014                                                 e.getMessage()), e);
1015                    return;
1016                }
1017 
1018                CDC.setMessageContext(msg);
1019                try {
1020                    //
1021                    // deserialize the message
1022                    //
1023                    if (_logMessages.isDebugEnabled()) {
1024                        String messageObject = msg.getMessageObject() == null? "NULL" : msg.getMessageObject().getClass().getName();
1025                        _logMessages.debug("nucleusMessageArrived src=" + msg.getSourceAddress() +
1026                                           " dest=" + msg.getDestinationAddress() + " [" + messageObject + "] UOID=" + msg.getUOID().toString());
1027                    }
1028                    //
1029                    // and deliver it
1030                    //
1031                    _logNucleus.info("messageThread : delivering message : "+msg);
1032                    _cell.messageArrived(new MessageEvent(msg));
1033                    _logNucleus.info("messageThread : delivering message done : "+msg);
1034                } catch (RuntimeException e) {
1035                    if (!msg.isReply()) {
1036                        try {
1037                            msg.revertDirection();
1038                            msg.setMessageObject(e);
1039                            sendMessage(msg);
1040                        } catch (NoRouteToCellException f) {
1041                            _logCell.error("PANIC : Problem returning answer : " + f);
1042                        }
1043                    }
1044                    throw e;
1045                } finally {
1046                    CDC.clearMessageContext();
1047                }
1048            }
1049        }
1050    }
1051}

[all classes][dmg.cells.nucleus]
EMMA 2.0.5312 (C) Vladimir Roubtsov