| 1 | package dmg.cells.network; |
| 2 | |
| 3 | import java.io.BufferedOutputStream; |
| 4 | import java.io.BufferedInputStream; |
| 5 | import java.io.IOException; |
| 6 | import java.io.EOFException; |
| 7 | import java.io.InputStream; |
| 8 | import java.io.InterruptedIOException; |
| 9 | import java.io.ObjectInputStream; |
| 10 | import java.io.ObjectOutputStream; |
| 11 | import java.io.OutputStream; |
| 12 | import java.io.PrintWriter; |
| 13 | import java.net.Socket; |
| 14 | import java.net.SocketException; |
| 15 | import java.util.Map; |
| 16 | import java.util.HashMap; |
| 17 | |
| 18 | |
| 19 | import org.slf4j.Logger; |
| 20 | import org.slf4j.LoggerFactory; |
| 21 | |
| 22 | import dmg.cells.nucleus.CellAdapter; |
| 23 | import dmg.cells.nucleus.CellPath; |
| 24 | import dmg.cells.nucleus.CellExceptionMessage; |
| 25 | import dmg.cells.nucleus.CellDomainInfo; |
| 26 | import dmg.cells.nucleus.CellMessage; |
| 27 | import dmg.cells.nucleus.CellNucleus; |
| 28 | import dmg.cells.nucleus.CellRoute; |
| 29 | import dmg.cells.nucleus.CellTunnel; |
| 30 | import dmg.cells.nucleus.CellTunnelInfo; |
| 31 | import dmg.cells.nucleus.MessageEvent; |
| 32 | import dmg.cells.nucleus.NoRouteToCellException; |
| 33 | import dmg.cells.nucleus.RoutedMessageEvent; |
| 34 | import dmg.cells.nucleus.SerializationException; |
| 35 | import dmg.util.Args; |
| 36 | import dmg.util.Gate; |
| 37 | import dmg.util.StreamEngine; |
| 38 | |
| 39 | /** |
| 40 | * |
| 41 | * |
| 42 | * @author Patrick Fuhrmann |
| 43 | * @version 0.1, 5 Mar 2001 |
| 44 | */ |
| 45 | public class LocationMgrTunnel |
| 46 | extends CellAdapter |
| 47 | implements CellTunnel, Runnable |
| 48 | { |
| 49 | /** |
| 50 | * This class encapsulates routing table management. It ensures |
| 51 | * that at most one tunnel to any given domain is registered at a |
| 52 | * time. |
| 53 | * |
| 54 | * It is assumed that all tunnels share the same cell glue (this |
| 55 | * is normally the case for cells in the same domain). |
| 56 | */ |
| 57 | static class Tunnels |
| 58 | { |
| 59 | private Map<String,LocationMgrTunnel> _tunnels = |
| 60 | new HashMap<String,LocationMgrTunnel>(); |
| 61 | |
| 62 | /** |
| 63 | * Adds a new tunnel. A route for the tunnel destination is |
| 64 | * registered in the CellNucleus. The same tunnel cannot be |
| 65 | * registered twice; unregister it first. |
| 66 | * |
| 67 | * If another tunnel is already registered for the same |
| 68 | * destination, then the other tunnel is killed. |
| 69 | */ |
| 70 | public synchronized void add(LocationMgrTunnel tunnel) |
| 71 | throws InterruptedException |
| 72 | { |
| 73 | CellNucleus nucleus = tunnel.getNucleus(); |
| 74 | |
| 75 | if (_tunnels.containsValue(tunnel)) |
| 76 | throw new IllegalArgumentException("Cannot register the same tunnel twice"); |
| 77 | |
| 78 | String domain = tunnel.getRemoteDomainName(); |
| 79 | |
| 80 | /* Kill old tunnel first. |
| 81 | */ |
| 82 | LocationMgrTunnel old; |
| 83 | while ((old = _tunnels.get(domain)) != null) { |
| 84 | old.kill(); |
| 85 | wait(); |
| 86 | } |
| 87 | |
| 88 | /* Add new route. |
| 89 | */ |
| 90 | CellRoute route = new CellRoute(domain, |
| 91 | tunnel.getCellName(), |
| 92 | CellRoute.DOMAIN); |
| 93 | try { |
| 94 | nucleus.routeAdd(route); |
| 95 | } catch (IllegalArgumentException e) { |
| 96 | /* Crap, somehow the entry already exists. Well, we |
| 97 | * insist on adding a new one, so we delete the old |
| 98 | * one first. |
| 99 | */ |
| 100 | nucleus.routeDelete(route); |
| 101 | nucleus.routeAdd(route); |
| 102 | } |
| 103 | |
| 104 | /* Keep track of what we did. |
| 105 | */ |
| 106 | _tunnels.put(domain, tunnel); |
| 107 | notifyAll(); |
| 108 | } |
| 109 | |
| 110 | /** |
| 111 | * Removes a tunnel and unregisters its routes. If the tunnel |
| 112 | * was already removed, then nothing happens. |
| 113 | * |
| 114 | * It is crucial that the <code>_remoteDomainInfo</code> of |
| 115 | * the tunnel does not change between the point at which it is |
| 116 | * added and the point at which it is removed. |
| 117 | */ |
| 118 | public synchronized void remove(LocationMgrTunnel tunnel) |
| 119 | { |
| 120 | CellNucleus nucleus = tunnel.getNucleus(); |
| 121 | String domain = tunnel.getRemoteDomainName(); |
| 122 | if (_tunnels.get(domain) == tunnel) { |
| 123 | _tunnels.remove(domain); |
| 124 | nucleus.routeDelete(new CellRoute(domain, |
| 125 | tunnel.getCellName(), |
| 126 | CellRoute.DOMAIN)); |
| 127 | notifyAll(); |
| 128 | } |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | /** |
| 133 | * We use a single shared instance of Tunnels to coordinate route |
| 134 | * creation between tunnels. |
| 135 | */ |
| 136 | private final static Tunnels _tunnels = new Tunnels(); |
| 137 | |
| 138 | private final static Logger _log = |
| 139 | LoggerFactory.getLogger(LocationMgrTunnel.class); |
| 140 | |
| 141 | private final static Logger _logMessages = |
| 142 | LoggerFactory.getLogger("logger.org.dcache.cells.messages"); |
| 143 | |
| 144 | private final CellNucleus _nucleus; |
| 145 | |
| 146 | private CellDomainInfo _remoteDomainInfo; |
| 147 | private final Socket _socket; |
| 148 | private final ObjectInputStream _input; |
| 149 | private final ObjectOutputStream _output; |
| 150 | |
| 151 | private boolean _down = false; |
| 152 | |
| 153 | // |
| 154 | // some statistics |
| 155 | // |
| 156 | private int _messagesToTunnel = 0; |
| 157 | private int _messagesToSystem = 0; |
| 158 | |
| 159 | public LocationMgrTunnel(String cellName, StreamEngine engine, Args args) |
| 160 | throws IOException |
| 161 | { |
| 162 | super(cellName, "System", args, false); |
| 163 | |
| 164 | try { |
| 165 | _nucleus = getNucleus(); |
| 166 | _socket = engine.getSocket(); |
| 167 | _socket.setTcpNoDelay(true); |
| 168 | |
| 169 | /* The constructor of ObjectOutputStream writes a header |
| 170 | * to the stream. Hence we flush the stream after |
| 171 | * creation. |
| 172 | */ |
| 173 | _output = new ObjectOutputStream(new BufferedOutputStream(engine.getOutputStream())); |
| 174 | _output.flush(); |
| 175 | _input = new ObjectInputStream(new BufferedInputStream(engine.getInputStream())); |
| 176 | } catch (IOException e) { |
| 177 | start(); |
| 178 | kill(); |
| 179 | throw e; |
| 180 | } |
| 181 | getNucleus().newThread(this, "Tunnel").start(); |
| 182 | } |
| 183 | |
| 184 | private CellDomainInfo negotiateDomainInfo(ObjectOutputStream out, |
| 185 | ObjectInputStream in) |
| 186 | throws IOException |
| 187 | { |
| 188 | try { |
| 189 | send(_nucleus.getCellDomainInfo()); |
| 190 | |
| 191 | Object obj = in.readObject(); |
| 192 | if (obj == null) |
| 193 | throw new IOException("EOS encountered while reading DomainInfo"); |
| 194 | return (CellDomainInfo)obj; |
| 195 | } catch (ClassNotFoundException e) { |
| 196 | throw new IOException("Cannot deserialize object. This is most likely due to a version mismatch."); |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | synchronized private void setDown(boolean down) |
| 201 | { |
| 202 | _down = down; |
| 203 | notifyAll(); |
| 204 | } |
| 205 | |
| 206 | synchronized private boolean isDown() |
| 207 | { |
| 208 | return _down; |
| 209 | } |
| 210 | |
| 211 | private void logSend(CellMessage msg) |
| 212 | { |
| 213 | if (_logMessages.isDebugEnabled()) { |
| 214 | Object object = msg.getMessageObject(); |
| 215 | String messageObject = |
| 216 | object == null ? "NULL" : object.getClass().getName(); |
| 217 | _logMessages.debug("tunnelMessageSent src=" |
| 218 | + msg.getSourceAddress() |
| 219 | + " dest=" + msg.getDestinationAddress() |
| 220 | + " [" + messageObject + "] UOID=" |
| 221 | + msg.getUOID().toString()); |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | private void logReceive(CellMessage msg) |
| 226 | { |
| 227 | if (_logMessages.isDebugEnabled()) { |
| 228 | Object object = msg.getMessageObject(); |
| 229 | String messageObject = |
| 230 | object == null ? "NULL" : object.getClass().getName(); |
| 231 | |
| 232 | _logMessages.debug("tunnelMessageReceived src=" |
| 233 | + msg.getSourceAddress() |
| 234 | + " dest=" + msg.getDestinationAddress() |
| 235 | + " [" + messageObject + "] UOID=" |
| 236 | + msg.getUOID().toString()); |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | private void returnToSender(CellMessage msg, NoRouteToCellException e) |
| 241 | throws SerializationException |
| 242 | { |
| 243 | try { |
| 244 | if (!(msg instanceof CellExceptionMessage)) { |
| 245 | CellPath retAddr = (CellPath)msg.getSourcePath().clone(); |
| 246 | retAddr.revert(); |
| 247 | CellExceptionMessage ret = new CellExceptionMessage(retAddr, e); |
| 248 | ret.setLastUOID(msg.getUOID()); |
| 249 | _nucleus.sendMessage(ret); |
| 250 | } |
| 251 | } catch (NoRouteToCellException f) { |
| 252 | _log.warn("Unable to deliver message and unable to return it to sender: " + msg); |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | private void receive() |
| 257 | throws InterruptedException, IOException, ClassNotFoundException |
| 258 | { |
| 259 | CellMessage msg; |
| 260 | while ((msg = (CellMessage)_input.readObject()) != null) { |
| 261 | logReceive(msg); |
| 262 | |
| 263 | try { |
| 264 | sendMessage(msg); |
| 265 | _messagesToSystem++; |
| 266 | } catch (NoRouteToCellException e) { |
| 267 | returnToSender(msg, e); |
| 268 | } |
| 269 | } |
| 270 | } |
| 271 | |
| 272 | private void send(Object msg) |
| 273 | throws IOException |
| 274 | { |
| 275 | if (isDown()) |
| 276 | throw new IOException("Tunnel has been shut down."); |
| 277 | |
| 278 | _output.writeObject(msg); |
| 279 | |
| 280 | /* An object output stream will only serialize an object ones |
| 281 | * and likewise the object input stream will recreate the |
| 282 | * object DAG at the other end. To avoid that the receiver |
| 283 | * needs to unnecessarily keep references to previous objects, |
| 284 | * we reset the stream. Notice that resetting the stream sends |
| 285 | * a reset messsage. Hence we reset the stream before flushing |
| 286 | * it. |
| 287 | */ |
| 288 | _output.reset(); |
| 289 | _output.flush(); |
| 290 | } |
| 291 | |
| 292 | public void run() |
| 293 | { |
| 294 | if (isDown()) |
| 295 | throw new IllegalStateException("Tunnel has already been closed"); |
| 296 | |
| 297 | try { |
| 298 | _remoteDomainInfo = negotiateDomainInfo(_output, _input); |
| 299 | _log.info("Established tunnel to " + getRemoteDomainName()); |
| 300 | |
| 301 | start(); |
| 302 | |
| 303 | _tunnels.add(this); |
| 304 | try { |
| 305 | receive(); |
| 306 | } finally { |
| 307 | _tunnels.remove(this); |
| 308 | } |
| 309 | } catch (EOFException e) { |
| 310 | } catch (ClassNotFoundException e) { |
| 311 | _log.warn("Cannot deserialize object. This is most likely due to a version mismatch."); |
| 312 | } catch (InterruptedException e) { |
| 313 | } catch (IOException e) { |
| 314 | _log.warn("Error while reading from tunnel: " + e.getMessage()); |
| 315 | } finally { |
| 316 | start(); |
| 317 | kill(); |
| 318 | } |
| 319 | } |
| 320 | |
| 321 | public void messageArrived(MessageEvent me) |
| 322 | { |
| 323 | if (me instanceof RoutedMessageEvent) { |
| 324 | CellMessage msg = me.getMessage(); |
| 325 | try { |
| 326 | logSend(msg); |
| 327 | _messagesToTunnel++; |
| 328 | send(msg); |
| 329 | } catch (IOException e) { |
| 330 | _log.warn("Error while sending message: " + e.getMessage()); |
| 331 | returnToSender(msg, |
| 332 | new NoRouteToCellException("Communication failure. Message could not be delivered.")); |
| 333 | kill(); |
| 334 | } |
| 335 | } else { |
| 336 | super.messageArrived(me); |
| 337 | } |
| 338 | } |
| 339 | |
| 340 | public CellTunnelInfo getCellTunnelInfo() |
| 341 | { |
| 342 | return new CellTunnelInfo(getCellName(), |
| 343 | _nucleus.getCellDomainInfo(), |
| 344 | _remoteDomainInfo); |
| 345 | } |
| 346 | |
| 347 | protected String getRemoteDomainName() |
| 348 | { |
| 349 | return (_remoteDomainInfo == null) |
| 350 | ? "" |
| 351 | : _remoteDomainInfo.getCellDomainName(); |
| 352 | } |
| 353 | |
| 354 | public String toString() |
| 355 | { |
| 356 | return "Connected to " + getRemoteDomainName(); |
| 357 | } |
| 358 | |
| 359 | public void getInfo(PrintWriter pw) |
| 360 | { |
| 361 | pw.println("Location Mgr Tunnel : " + getCellName()); |
| 362 | pw.println("-> Tunnel : " + _messagesToTunnel); |
| 363 | pw.println("-> Domain : " + _messagesToSystem); |
| 364 | pw.println("Peer : " + getRemoteDomainName()); |
| 365 | } |
| 366 | |
| 367 | public void cleanUp() |
| 368 | { |
| 369 | _log.info("Closing tunnel to " + getRemoteDomainName()); |
| 370 | setDown(true); |
| 371 | try { |
| 372 | _socket.shutdownInput(); |
| 373 | _socket.close(); |
| 374 | } catch (IOException e) { |
| 375 | _log.warn("Failed to close socket: " + e.getMessage()); |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | public synchronized void join() throws InterruptedException |
| 380 | { |
| 381 | while (!isDown()) { |
| 382 | wait(); |
| 383 | } |
| 384 | } |
| 385 | } |