1 | package dmg.cells.network; |
2 | |
3 | import java.io.BufferedOutputStream; |
4 | import java.io.BufferedInputStream; |
5 | import java.io.IOException; |
6 | import java.io.EOFException; |
7 | import java.io.InputStream; |
8 | import java.io.InterruptedIOException; |
9 | import java.io.ObjectInputStream; |
10 | import java.io.ObjectOutputStream; |
11 | import java.io.OutputStream; |
12 | import java.io.PrintWriter; |
13 | import java.net.Socket; |
14 | import java.net.SocketException; |
15 | import java.util.Map; |
16 | import java.util.HashMap; |
17 | |
18 | |
19 | import org.slf4j.Logger; |
20 | import org.slf4j.LoggerFactory; |
21 | |
22 | import dmg.cells.nucleus.CellAdapter; |
23 | import dmg.cells.nucleus.CellPath; |
24 | import dmg.cells.nucleus.CellExceptionMessage; |
25 | import dmg.cells.nucleus.CellDomainInfo; |
26 | import dmg.cells.nucleus.CellMessage; |
27 | import dmg.cells.nucleus.CellNucleus; |
28 | import dmg.cells.nucleus.CellRoute; |
29 | import dmg.cells.nucleus.CellTunnel; |
30 | import dmg.cells.nucleus.CellTunnelInfo; |
31 | import dmg.cells.nucleus.MessageEvent; |
32 | import dmg.cells.nucleus.NoRouteToCellException; |
33 | import dmg.cells.nucleus.RoutedMessageEvent; |
34 | import dmg.cells.nucleus.SerializationException; |
35 | import dmg.util.Args; |
36 | import dmg.util.Gate; |
37 | import dmg.util.StreamEngine; |
38 | |
39 | /** |
40 | * |
41 | * |
42 | * @author Patrick Fuhrmann |
43 | * @version 0.1, 5 Mar 2001 |
44 | */ |
45 | public class LocationMgrTunnel |
46 | extends CellAdapter |
47 | implements CellTunnel, Runnable |
48 | { |
49 | /** |
50 | * This class encapsulates routing table management. It ensures |
51 | * that at most one tunnel to any given domain is registered at a |
52 | * time. |
53 | * |
54 | * It is assumed that all tunnels share the same cell glue (this |
55 | * is normally the case for cells in the same domain). |
56 | */ |
57 | static class Tunnels |
58 | { |
59 | private Map<String,LocationMgrTunnel> _tunnels = |
60 | new HashMap<String,LocationMgrTunnel>(); |
61 | |
62 | /** |
63 | * Adds a new tunnel. A route for the tunnel destination is |
64 | * registered in the CellNucleus. The same tunnel cannot be |
65 | * registered twice; unregister it first. |
66 | * |
67 | * If another tunnel is already registered for the same |
68 | * destination, then the other tunnel is killed. |
69 | */ |
70 | public synchronized void add(LocationMgrTunnel tunnel) |
71 | throws InterruptedException |
72 | { |
73 | CellNucleus nucleus = tunnel.getNucleus(); |
74 | |
75 | if (_tunnels.containsValue(tunnel)) |
76 | throw new IllegalArgumentException("Cannot register the same tunnel twice"); |
77 | |
78 | String domain = tunnel.getRemoteDomainName(); |
79 | |
80 | /* Kill old tunnel first. |
81 | */ |
82 | LocationMgrTunnel old; |
83 | while ((old = _tunnels.get(domain)) != null) { |
84 | old.kill(); |
85 | wait(); |
86 | } |
87 | |
88 | /* Add new route. |
89 | */ |
90 | CellRoute route = new CellRoute(domain, |
91 | tunnel.getCellName(), |
92 | CellRoute.DOMAIN); |
93 | try { |
94 | nucleus.routeAdd(route); |
95 | } catch (IllegalArgumentException e) { |
96 | /* Crap, somehow the entry already exists. Well, we |
97 | * insist on adding a new one, so we delete the old |
98 | * one first. |
99 | */ |
100 | nucleus.routeDelete(route); |
101 | nucleus.routeAdd(route); |
102 | } |
103 | |
104 | /* Keep track of what we did. |
105 | */ |
106 | _tunnels.put(domain, tunnel); |
107 | notifyAll(); |
108 | } |
109 | |
110 | /** |
111 | * Removes a tunnel and unregisters its routes. If the tunnel |
112 | * was already removed, then nothing happens. |
113 | * |
114 | * It is crucial that the <code>_remoteDomainInfo</code> of |
115 | * the tunnel does not change between the point at which it is |
116 | * added and the point at which it is removed. |
117 | */ |
118 | public synchronized void remove(LocationMgrTunnel tunnel) |
119 | { |
120 | CellNucleus nucleus = tunnel.getNucleus(); |
121 | String domain = tunnel.getRemoteDomainName(); |
122 | if (_tunnels.get(domain) == tunnel) { |
123 | _tunnels.remove(domain); |
124 | nucleus.routeDelete(new CellRoute(domain, |
125 | tunnel.getCellName(), |
126 | CellRoute.DOMAIN)); |
127 | notifyAll(); |
128 | } |
129 | } |
130 | } |
131 | |
132 | /** |
133 | * We use a single shared instance of Tunnels to coordinate route |
134 | * creation between tunnels. |
135 | */ |
136 | private final static Tunnels _tunnels = new Tunnels(); |
137 | |
138 | private final static Logger _log = |
139 | LoggerFactory.getLogger(LocationMgrTunnel.class); |
140 | |
141 | private final static Logger _logMessages = |
142 | LoggerFactory.getLogger("logger.org.dcache.cells.messages"); |
143 | |
144 | private final CellNucleus _nucleus; |
145 | |
146 | private CellDomainInfo _remoteDomainInfo; |
147 | private final Socket _socket; |
148 | private final ObjectInputStream _input; |
149 | private final ObjectOutputStream _output; |
150 | |
151 | private boolean _down = false; |
152 | |
153 | // |
154 | // some statistics |
155 | // |
156 | private int _messagesToTunnel = 0; |
157 | private int _messagesToSystem = 0; |
158 | |
159 | public LocationMgrTunnel(String cellName, StreamEngine engine, Args args) |
160 | throws IOException |
161 | { |
162 | super(cellName, "System", args, false); |
163 | |
164 | try { |
165 | _nucleus = getNucleus(); |
166 | _socket = engine.getSocket(); |
167 | _socket.setTcpNoDelay(true); |
168 | |
169 | /* The constructor of ObjectOutputStream writes a header |
170 | * to the stream. Hence we flush the stream after |
171 | * creation. |
172 | */ |
173 | _output = new ObjectOutputStream(new BufferedOutputStream(engine.getOutputStream())); |
174 | _output.flush(); |
175 | _input = new ObjectInputStream(new BufferedInputStream(engine.getInputStream())); |
176 | } catch (IOException e) { |
177 | start(); |
178 | kill(); |
179 | throw e; |
180 | } |
181 | getNucleus().newThread(this, "Tunnel").start(); |
182 | } |
183 | |
184 | private CellDomainInfo negotiateDomainInfo(ObjectOutputStream out, |
185 | ObjectInputStream in) |
186 | throws IOException |
187 | { |
188 | try { |
189 | send(_nucleus.getCellDomainInfo()); |
190 | |
191 | Object obj = in.readObject(); |
192 | if (obj == null) |
193 | throw new IOException("EOS encountered while reading DomainInfo"); |
194 | return (CellDomainInfo)obj; |
195 | } catch (ClassNotFoundException e) { |
196 | throw new IOException("Cannot deserialize object. This is most likely due to a version mismatch."); |
197 | } |
198 | } |
199 | |
200 | synchronized private void setDown(boolean down) |
201 | { |
202 | _down = down; |
203 | notifyAll(); |
204 | } |
205 | |
206 | synchronized private boolean isDown() |
207 | { |
208 | return _down; |
209 | } |
210 | |
211 | private void logSend(CellMessage msg) |
212 | { |
213 | if (_logMessages.isDebugEnabled()) { |
214 | Object object = msg.getMessageObject(); |
215 | String messageObject = |
216 | object == null ? "NULL" : object.getClass().getName(); |
217 | _logMessages.debug("tunnelMessageSent src=" |
218 | + msg.getSourceAddress() |
219 | + " dest=" + msg.getDestinationAddress() |
220 | + " [" + messageObject + "] UOID=" |
221 | + msg.getUOID().toString()); |
222 | } |
223 | } |
224 | |
225 | private void logReceive(CellMessage msg) |
226 | { |
227 | if (_logMessages.isDebugEnabled()) { |
228 | Object object = msg.getMessageObject(); |
229 | String messageObject = |
230 | object == null ? "NULL" : object.getClass().getName(); |
231 | |
232 | _logMessages.debug("tunnelMessageReceived src=" |
233 | + msg.getSourceAddress() |
234 | + " dest=" + msg.getDestinationAddress() |
235 | + " [" + messageObject + "] UOID=" |
236 | + msg.getUOID().toString()); |
237 | } |
238 | } |
239 | |
240 | private void returnToSender(CellMessage msg, NoRouteToCellException e) |
241 | throws SerializationException |
242 | { |
243 | try { |
244 | if (!(msg instanceof CellExceptionMessage)) { |
245 | CellPath retAddr = (CellPath)msg.getSourcePath().clone(); |
246 | retAddr.revert(); |
247 | CellExceptionMessage ret = new CellExceptionMessage(retAddr, e); |
248 | ret.setLastUOID(msg.getUOID()); |
249 | _nucleus.sendMessage(ret); |
250 | } |
251 | } catch (NoRouteToCellException f) { |
252 | _log.warn("Unable to deliver message and unable to return it to sender: " + msg); |
253 | } |
254 | } |
255 | |
256 | private void receive() |
257 | throws InterruptedException, IOException, ClassNotFoundException |
258 | { |
259 | CellMessage msg; |
260 | while ((msg = (CellMessage)_input.readObject()) != null) { |
261 | logReceive(msg); |
262 | |
263 | try { |
264 | sendMessage(msg); |
265 | _messagesToSystem++; |
266 | } catch (NoRouteToCellException e) { |
267 | returnToSender(msg, e); |
268 | } |
269 | } |
270 | } |
271 | |
272 | private void send(Object msg) |
273 | throws IOException |
274 | { |
275 | if (isDown()) |
276 | throw new IOException("Tunnel has been shut down."); |
277 | |
278 | _output.writeObject(msg); |
279 | |
280 | /* An object output stream will only serialize an object ones |
281 | * and likewise the object input stream will recreate the |
282 | * object DAG at the other end. To avoid that the receiver |
283 | * needs to unnecessarily keep references to previous objects, |
284 | * we reset the stream. Notice that resetting the stream sends |
285 | * a reset messsage. Hence we reset the stream before flushing |
286 | * it. |
287 | */ |
288 | _output.reset(); |
289 | _output.flush(); |
290 | } |
291 | |
292 | public void run() |
293 | { |
294 | if (isDown()) |
295 | throw new IllegalStateException("Tunnel has already been closed"); |
296 | |
297 | try { |
298 | _remoteDomainInfo = negotiateDomainInfo(_output, _input); |
299 | _log.info("Established tunnel to " + getRemoteDomainName()); |
300 | |
301 | start(); |
302 | |
303 | _tunnels.add(this); |
304 | try { |
305 | receive(); |
306 | } finally { |
307 | _tunnels.remove(this); |
308 | } |
309 | } catch (EOFException e) { |
310 | } catch (ClassNotFoundException e) { |
311 | _log.warn("Cannot deserialize object. This is most likely due to a version mismatch."); |
312 | } catch (InterruptedException e) { |
313 | } catch (IOException e) { |
314 | _log.warn("Error while reading from tunnel: " + e.getMessage()); |
315 | } finally { |
316 | start(); |
317 | kill(); |
318 | } |
319 | } |
320 | |
321 | public void messageArrived(MessageEvent me) |
322 | { |
323 | if (me instanceof RoutedMessageEvent) { |
324 | CellMessage msg = me.getMessage(); |
325 | try { |
326 | logSend(msg); |
327 | _messagesToTunnel++; |
328 | send(msg); |
329 | } catch (IOException e) { |
330 | _log.warn("Error while sending message: " + e.getMessage()); |
331 | returnToSender(msg, |
332 | new NoRouteToCellException("Communication failure. Message could not be delivered.")); |
333 | kill(); |
334 | } |
335 | } else { |
336 | super.messageArrived(me); |
337 | } |
338 | } |
339 | |
340 | public CellTunnelInfo getCellTunnelInfo() |
341 | { |
342 | return new CellTunnelInfo(getCellName(), |
343 | _nucleus.getCellDomainInfo(), |
344 | _remoteDomainInfo); |
345 | } |
346 | |
347 | protected String getRemoteDomainName() |
348 | { |
349 | return (_remoteDomainInfo == null) |
350 | ? "" |
351 | : _remoteDomainInfo.getCellDomainName(); |
352 | } |
353 | |
354 | public String toString() |
355 | { |
356 | return "Connected to " + getRemoteDomainName(); |
357 | } |
358 | |
359 | public void getInfo(PrintWriter pw) |
360 | { |
361 | pw.println("Location Mgr Tunnel : " + getCellName()); |
362 | pw.println("-> Tunnel : " + _messagesToTunnel); |
363 | pw.println("-> Domain : " + _messagesToSystem); |
364 | pw.println("Peer : " + getRemoteDomainName()); |
365 | } |
366 | |
367 | public void cleanUp() |
368 | { |
369 | _log.info("Closing tunnel to " + getRemoteDomainName()); |
370 | setDown(true); |
371 | try { |
372 | _socket.shutdownInput(); |
373 | _socket.close(); |
374 | } catch (IOException e) { |
375 | _log.warn("Failed to close socket: " + e.getMessage()); |
376 | } |
377 | } |
378 | |
379 | public synchronized void join() throws InterruptedException |
380 | { |
381 | while (!isDown()) { |
382 | wait(); |
383 | } |
384 | } |
385 | } |