EMMA Coverage Report (generated Mon Aug 23 17:21:34 CEST 2010)
[all classes][dmg.cells.network]

COVERAGE SUMMARY FOR SOURCE FILE [LocationMgrTunnel.java]

nameclass, %method, %block, %line, %
LocationMgrTunnel.java100% (2/2)9%   (2/22)3%   (19/627)4%   (5/142)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class LocationMgrTunnel100% (1/1)5%   (1/19)2%   (11/533)3%   (3/117)
LocationMgrTunnel (String, StreamEngine, Args): void 0%   (0/1)0%   (0/65)0%   (0/17)
access$000 (LocationMgrTunnel): void 0%   (0/1)0%   (0/3)0%   (0/1)
cleanUp (): void 0%   (0/1)0%   (0/34)0%   (0/8)
getCellTunnelInfo (): CellTunnelInfo 0%   (0/1)0%   (0/11)0%   (0/1)
getInfo (PrintWriter): void 0%   (0/1)0%   (0/45)0%   (0/5)
getRemoteDomainName (): String 0%   (0/1)0%   (0/9)0%   (0/1)
isDown (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
join (): void 0%   (0/1)0%   (0/7)0%   (0/3)
logReceive (CellMessage): void 0%   (0/1)0%   (0/41)0%   (0/5)
logSend (CellMessage): void 0%   (0/1)0%   (0/41)0%   (0/5)
messageArrived (MessageEvent): void 0%   (0/1)0%   (0/45)0%   (0/13)
negotiateDomainInfo (ObjectOutputStream, ObjectInputStream): CellDomainInfo 0%   (0/1)0%   (0/24)0%   (0/7)
receive (): void 0%   (0/1)0%   (0/27)0%   (0/8)
returnToSender (CellMessage, NoRouteToCellException): void 0%   (0/1)0%   (0/37)0%   (0/10)
run (): void 0%   (0/1)0%   (0/95)0%   (0/19)
send (Object): void 0%   (0/1)0%   (0/19)0%   (0/6)
setDown (boolean): void 0%   (0/1)0%   (0/6)0%   (0/3)
toString (): String 0%   (0/1)0%   (0/10)0%   (0/1)
<static initializer> 100% (1/1)100% (11/11)100% (3/3)
     
class LocationMgrTunnel$Tunnels100% (1/1)33%  (1/3)9%   (8/94)8%   (2/25)
add (LocationMgrTunnel): void 0%   (0/1)0%   (0/57)0%   (0/16)
remove (LocationMgrTunnel): void 0%   (0/1)0%   (0/29)0%   (0/7)
LocationMgrTunnel$Tunnels (): void 100% (1/1)100% (8/8)100% (2/2)

1package dmg.cells.network;
2 
3import java.io.BufferedOutputStream;
4import java.io.BufferedInputStream;
5import java.io.IOException;
6import java.io.EOFException;
7import java.io.InputStream;
8import java.io.InterruptedIOException;
9import java.io.ObjectInputStream;
10import java.io.ObjectOutputStream;
11import java.io.OutputStream;
12import java.io.PrintWriter;
13import java.net.Socket;
14import java.net.SocketException;
15import java.util.Map;
16import java.util.HashMap;
17 
18 
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21 
22import dmg.cells.nucleus.CellAdapter;
23import dmg.cells.nucleus.CellPath;
24import dmg.cells.nucleus.CellExceptionMessage;
25import dmg.cells.nucleus.CellDomainInfo;
26import dmg.cells.nucleus.CellMessage;
27import dmg.cells.nucleus.CellNucleus;
28import dmg.cells.nucleus.CellRoute;
29import dmg.cells.nucleus.CellTunnel;
30import dmg.cells.nucleus.CellTunnelInfo;
31import dmg.cells.nucleus.MessageEvent;
32import dmg.cells.nucleus.NoRouteToCellException;
33import dmg.cells.nucleus.RoutedMessageEvent;
34import dmg.cells.nucleus.SerializationException;
35import dmg.util.Args;
36import dmg.util.Gate;
37import dmg.util.StreamEngine;
38 
39/**
40 *
41 *
42 * @author Patrick Fuhrmann
43 * @version 0.1, 5 Mar 2001
44 */
45public class LocationMgrTunnel
46    extends CellAdapter
47    implements CellTunnel, Runnable
48{
49    /**
50     * This class encapsulates routing table management. It ensures
51     * that at most one tunnel to any given domain is registered at a
52     * time.
53     *
54     * It is assumed that all tunnels share the same cell glue (this
55     * is normally the case for cells in the same domain).
56     */
57    static class Tunnels
58    {
59        private Map<String,LocationMgrTunnel> _tunnels =
60            new HashMap<String,LocationMgrTunnel>();
61 
62        /**
63         * Adds a new tunnel. A route for the tunnel destination is
64         * registered in the CellNucleus. The same tunnel cannot be
65         * registered twice; unregister it first.
66         *
67         * If another tunnel is already registered for the same
68         * destination, then the other tunnel is killed.
69         */
70        public synchronized void add(LocationMgrTunnel tunnel)
71            throws InterruptedException
72        {
73            CellNucleus nucleus = tunnel.getNucleus();
74 
75            if (_tunnels.containsValue(tunnel))
76                throw new IllegalArgumentException("Cannot register the same tunnel twice");
77 
78            String domain = tunnel.getRemoteDomainName();
79 
80            /* Kill old tunnel first.
81             */
82            LocationMgrTunnel old;
83            while ((old = _tunnels.get(domain)) != null) {
84                old.kill();
85                wait();
86            }
87 
88            /* Add new route.
89             */
90            CellRoute route = new CellRoute(domain,
91                                            tunnel.getCellName(),
92                                            CellRoute.DOMAIN);
93            try {
94                nucleus.routeAdd(route);
95            } catch (IllegalArgumentException e) {
96                /* Crap, somehow the entry already exists. Well, we
97                 * insist on adding a new one, so we delete the old
98                 * one first.
99                 */
100                nucleus.routeDelete(route);
101                nucleus.routeAdd(route);
102            }
103 
104            /* Keep track of what we did.
105             */
106            _tunnels.put(domain, tunnel);
107            notifyAll();
108        }
109 
110        /**
111         * Removes a tunnel and unregisters its routes. If the tunnel
112         * was already removed, then nothing happens.
113         *
114         * It is crucial that the <code>_remoteDomainInfo</code> of
115         * the tunnel does not change between the point at which it is
116         * added and the point at which it is removed.
117         */
118        public synchronized void remove(LocationMgrTunnel tunnel)
119        {
120            CellNucleus nucleus = tunnel.getNucleus();
121            String domain = tunnel.getRemoteDomainName();
122            if (_tunnels.get(domain) == tunnel) {
123                _tunnels.remove(domain);
124                nucleus.routeDelete(new CellRoute(domain,
125                                                  tunnel.getCellName(),
126                                                  CellRoute.DOMAIN));
127                notifyAll();
128            }
129        }
130    }
131 
132    /**
133     * We use a single shared instance of Tunnels to coordinate route
134     * creation between tunnels.
135     */
136    private final static Tunnels _tunnels = new Tunnels();
137 
138    private final static Logger _log =
139        LoggerFactory.getLogger(LocationMgrTunnel.class);
140 
141    private final static Logger _logMessages =
142        LoggerFactory.getLogger("logger.org.dcache.cells.messages");
143 
144    private final CellNucleus  _nucleus;
145 
146    private CellDomainInfo  _remoteDomainInfo;
147    private final Socket _socket;
148    private final ObjectInputStream  _input;
149    private final ObjectOutputStream _output;
150 
151    private boolean _down = false;
152 
153    //
154    // some statistics
155    //
156    private int  _messagesToTunnel    = 0;
157    private int  _messagesToSystem    = 0;
158 
159    public LocationMgrTunnel(String cellName, StreamEngine engine, Args args)
160        throws IOException
161    {
162        super(cellName, "System", args, false);
163 
164        try {
165            _nucleus = getNucleus();
166            _socket = engine.getSocket();
167            _socket.setTcpNoDelay(true);
168 
169            /* The constructor of ObjectOutputStream writes a header
170             * to the stream. Hence we flush the stream after
171             * creation.
172             */
173            _output = new ObjectOutputStream(new BufferedOutputStream(engine.getOutputStream()));
174            _output.flush();
175            _input = new ObjectInputStream(new BufferedInputStream(engine.getInputStream()));
176        } catch (IOException e) {
177            start();
178            kill();
179            throw e;
180        }
181        getNucleus().newThread(this, "Tunnel").start();
182    }
183 
184    private CellDomainInfo negotiateDomainInfo(ObjectOutputStream out,
185                                               ObjectInputStream in)
186        throws IOException
187    {
188        try  {
189            send(_nucleus.getCellDomainInfo());
190 
191            Object obj = in.readObject();
192            if (obj == null)
193                throw new IOException("EOS encountered while reading DomainInfo");
194            return (CellDomainInfo)obj;
195        } catch (ClassNotFoundException e) {
196            throw new IOException("Cannot deserialize object. This is most likely due to a version mismatch.");
197        }
198    }
199 
200    synchronized private void setDown(boolean down)
201    {
202        _down = down;
203        notifyAll();
204    }
205 
206    synchronized private boolean isDown()
207    {
208        return _down;
209    }
210 
211    private void logSend(CellMessage msg)
212    {
213        if (_logMessages.isDebugEnabled()) {
214            Object object = msg.getMessageObject();
215            String messageObject =
216                object == null ? "NULL" : object.getClass().getName();
217            _logMessages.debug("tunnelMessageSent src="
218                               + msg.getSourceAddress()
219                               + " dest=" + msg.getDestinationAddress()
220                               + " [" + messageObject + "] UOID="
221                               + msg.getUOID().toString());
222        }
223    }
224 
225    private void logReceive(CellMessage msg)
226    {
227        if (_logMessages.isDebugEnabled()) {
228            Object object = msg.getMessageObject();
229            String messageObject =
230                object == null ? "NULL" : object.getClass().getName();
231 
232            _logMessages.debug("tunnelMessageReceived src="
233                               + msg.getSourceAddress()
234                               + " dest=" + msg.getDestinationAddress()
235                               + " [" + messageObject + "] UOID="
236                               + msg.getUOID().toString());
237        }
238    }
239 
240    private void returnToSender(CellMessage msg, NoRouteToCellException e)
241        throws SerializationException
242    {
243        try {
244            if (!(msg instanceof CellExceptionMessage)) {
245                CellPath retAddr = (CellPath)msg.getSourcePath().clone();
246                retAddr.revert();
247                CellExceptionMessage ret = new CellExceptionMessage(retAddr, e);
248                ret.setLastUOID(msg.getUOID());
249                _nucleus.sendMessage(ret);
250            }
251        } catch (NoRouteToCellException f) {
252            _log.warn("Unable to deliver message and unable to return it to sender: " + msg);
253        }
254    }
255 
256    private void receive()
257        throws InterruptedException, IOException, ClassNotFoundException
258    {
259        CellMessage msg;
260        while ((msg = (CellMessage)_input.readObject()) != null) {
261            logReceive(msg);
262 
263            try {
264                sendMessage(msg);
265                _messagesToSystem++;
266            } catch (NoRouteToCellException e) {
267                returnToSender(msg, e);
268            }
269        }
270    }
271 
272    private void send(Object msg)
273        throws IOException
274    {
275        if (isDown())
276            throw new IOException("Tunnel has been shut down.");
277 
278        _output.writeObject(msg);
279 
280        /* An object output stream will only serialize an object ones
281         * and likewise the object input stream will recreate the
282         * object DAG at the other end. To avoid that the receiver
283         * needs to unnecessarily keep references to previous objects,
284         * we reset the stream. Notice that resetting the stream sends
285         * a reset messsage. Hence we reset the stream before flushing
286         * it.
287         */
288        _output.reset();
289        _output.flush();
290    }
291 
292    public void run()
293    {
294        if (isDown())
295            throw new IllegalStateException("Tunnel has already been closed");
296 
297        try {
298            _remoteDomainInfo = negotiateDomainInfo(_output, _input);
299            _log.info("Established tunnel to " + getRemoteDomainName());
300 
301            start();
302 
303            _tunnels.add(this);
304            try {
305                receive();
306            } finally {
307                _tunnels.remove(this);
308            }
309        } catch (EOFException e) {
310        } catch (ClassNotFoundException e) {
311            _log.warn("Cannot deserialize object. This is most likely due to a version mismatch.");
312        } catch (InterruptedException e) {
313        } catch (IOException e) {
314            _log.warn("Error while reading from tunnel: " + e.getMessage());
315        } finally {
316            start();
317            kill();
318        }
319    }
320 
321    public void messageArrived(MessageEvent me)
322    {
323        if (me instanceof RoutedMessageEvent) {
324            CellMessage msg = me.getMessage();
325            try {
326                logSend(msg);
327                _messagesToTunnel++;
328                send(msg);
329            } catch (IOException e) {
330                _log.warn("Error while sending message: " + e.getMessage());
331                returnToSender(msg,
332                               new NoRouteToCellException("Communication failure. Message could not be delivered."));
333                kill();
334            }
335        } else {
336            super.messageArrived(me);
337        }
338    }
339 
340    public CellTunnelInfo getCellTunnelInfo()
341    {
342        return new CellTunnelInfo(getCellName(),
343                                  _nucleus.getCellDomainInfo(),
344                                  _remoteDomainInfo);
345    }
346 
347    protected String getRemoteDomainName()
348    {
349        return (_remoteDomainInfo == null)
350            ? ""
351            : _remoteDomainInfo.getCellDomainName();
352    }
353 
354    public String toString()
355    {
356        return "Connected to " + getRemoteDomainName();
357    }
358 
359    public void getInfo(PrintWriter pw)
360    {
361        pw.println("Location Mgr Tunnel : " + getCellName());
362        pw.println("-> Tunnel     : " + _messagesToTunnel);
363        pw.println("-> Domain     : " + _messagesToSystem);
364        pw.println("Peer          : " + getRemoteDomainName());
365    }
366 
367    public void cleanUp()
368    {
369        _log.info("Closing tunnel to " + getRemoteDomainName());
370        setDown(true);
371        try {
372            _socket.shutdownInput();
373            _socket.close();
374        } catch (IOException e) {
375            _log.warn("Failed to close socket: " + e.getMessage());
376        }
377    }
378 
379    public synchronized void join() throws InterruptedException
380    {
381        while (!isDown()) {
382            wait();
383        }
384    }
385}

[all classes][dmg.cells.network]
EMMA 2.0.5312 (C) Vladimir Roubtsov