EMMA Coverage Report (generated Mon Aug 23 17:21:34 CEST 2010)
[all classes][diskCacheV111.poolManager]

COVERAGE SUMMARY FOR SOURCE FILE [PoolManagerV5.java]

nameclass, %method, %block, %line, %
PoolManagerV5.java50%  (2/4)26%  (15/57)13%  (254/1945)15%  (60.6/392)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class PoolManagerV5$LinkGroupSelectionTask0%   (0/1)0%   (0/3)0%   (0/185)0%   (0/40)
PoolManagerV5$LinkGroupSelectionTask (PoolManagerV5, PoolManagerSelectLinkGro... 0%   (0/1)0%   (0/20)0%   (0/5)
run (): void 0%   (0/1)0%   (0/74)0%   (0/20)
selectLinkGroups (): List 0%   (0/1)0%   (0/91)0%   (0/15)
     
class PoolManagerV5$WriteRequestHandler0%   (0/1)0%   (0/4)0%   (0/236)0%   (0/47)
PoolManagerV5$WriteRequestHandler (PoolManagerV5, CellMessage, PoolMgrSelectW... 0%   (0/1)0%   (0/23)0%   (0/6)
requestFailed (int, String): void 0%   (0/1)0%   (0/23)0%   (0/6)
requestSucceeded (String): void 0%   (0/1)0%   (0/31)0%   (0/8)
run (): void 0%   (0/1)0%   (0/159)0%   (0/27)
     
class PoolManagerV5100% (1/1)26%  (12/46)15%  (194/1317)20%  (53.1/262)
ac_free_$_0 (Args): String 0%   (0/1)0%   (0/40)0%   (0/5)
ac_get_av_pools_$_4 (Args): String 0%   (0/1)0%   (0/99)0%   (0/19)
ac_getpoolsbylink_$_1 (Args): String 0%   (0/1)0%   (0/43)0%   (0/8)
ac_set_max_threads_$_1 (Args): String 0%   (0/1)0%   (0/2)0%   (0/1)
ac_set_timeout_fetch_$_1 (Args): String 0%   (0/1)0%   (0/13)0%   (0/2)
ac_set_timeout_pnfs_$_1 (Args): String 0%   (0/1)0%   (0/11)0%   (0/2)
access$100 (PoolManagerV5, long): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$1000 (PoolManagerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$1100 (PoolManagerV5): RequestContainerV5 0%   (0/1)0%   (0/3)0%   (0/1)
access$400 (PoolManagerV5, long, StorageInfo): long 0%   (0/1)0%   (0/5)0%   (0/1)
access$500 (PoolManagerV5): PoolSelectionUnit 0%   (0/1)0%   (0/3)0%   (0/1)
access$600 (PoolManagerV5): CostModule 0%   (0/1)0%   (0/3)0%   (0/1)
access$700 (PoolManagerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$800 (PoolManagerV5, StorageInfo): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
access$900 (PoolManagerV5): PoolMonitorV5 0%   (0/1)0%   (0/3)0%   (0/1)
determineExpectedFileSize (long, StorageInfo): long 0%   (0/1)0%   (0/26)0%   (0/9)
getCellInfo (CellInfo): CellInfo 0%   (0/1)0%   (0/19)0%   (0/4)
getInfo (PrintWriter): void 0%   (0/1)0%   (0/106)0%   (0/13)
messageArrived (CellMessage, PoolMgrSelectWritePoolMsg): DelayedReply 0%   (0/1)0%   (0/7)0%   (0/1)
messageArrived (PoolManagerGetPoolListMessage): PoolManagerGetPoolListMessage 0%   (0/1)0%   (0/12)0%   (0/4)
messageArrived (PoolManagerGetPoolsByLinkMessage): PoolManagerGetPoolsByLinkM... 0%   (0/1)0%   (0/27)0%   (0/10)
messageArrived (PoolManagerGetPoolsByNameMessage): PoolManagerGetPoolsByNameM... 0%   (0/1)0%   (0/40)0%   (0/11)
messageArrived (PoolManagerGetPoolsByPoolGroupMessage): PoolManagerGetPoolsBy... 0%   (0/1)0%   (0/27)0%   (0/10)
messageArrived (PoolManagerPoolModeMessage): PoolManagerPoolModeMessage 0%   (0/1)0%   (0/49)0%   (0/8)
messageArrived (PoolManagerPoolUpMessage): void 0%   (0/1)0%   (0/151)0%   (0/21)
messageArrived (PoolManagerSelectLinkGroupForWriteMessage): DelayedReply 0%   (0/1)0%   (0/22)0%   (0/5)
messageArrived (PoolMgrGetPoolByLink): PoolMgrGetPoolByLink 0%   (0/1)0%   (0/49)0%   (0/11)
messageArrived (PoolMgrGetPoolLinkGroups): PoolMgrGetPoolLinkGroups 0%   (0/1)0%   (0/21)0%   (0/5)
messageArrived (PoolMgrQueryPoolsMsg): PoolMgrQueryPoolsMsg 0%   (0/1)0%   (0/21)0%   (0/4)
printSetup (PrintWriter): void 0%   (0/1)0%   (0/48)0%   (0/11)
quotasExceeded (StorageInfo): boolean 0%   (0/1)0%   (0/98)0%   (0/16)
runWatchdogSequence (long): void 0%   (0/1)0%   (0/69)0%   (0/9)
sendPoolStatusRelay (String, int): void 0%   (0/1)0%   (0/8)0%   (0/2)
sendPoolStatusRelay (String, int, PoolV2Mode, int, String): void 0%   (0/1)0%   (0/48)0%   (0/10)
ac_set_timeout_pool_$_1 (Args): String 100% (1/1)64%  (37/58)56%  (6.7/12)
init (): void 100% (1/1)69%  (25/36)78%  (4.7/6)
setPoolStatusRelayPath (CellPath): void 100% (1/1)78%  (7/9)88%  (1.8/2)
setQuotaManager (String): void 100% (1/1)86%  (12/14)95%  (2.8/3)
<static initializer> 100% (1/1)100% (15/15)100% (2/2)
PoolManagerV5 (): void 100% (1/1)100% (76/76)100% (25/25)
access$000 (): Logger 100% (1/1)100% (2/2)100% (1/1)
setCostModule (CostModule): void 100% (1/1)100% (4/4)100% (2/2)
setPoolMonitor (PoolMonitorV5): void 100% (1/1)100% (4/4)100% (2/2)
setPoolSelectionUnit (PoolSelectionUnit): void 100% (1/1)100% (4/4)100% (2/2)
setRequestContainer (RequestContainerV5): void 100% (1/1)100% (4/4)100% (2/2)
setSendCostInfo (boolean): void 100% (1/1)100% (4/4)100% (2/2)
     
class PoolManagerV5$WatchdogThread100% (1/1)75%  (3/4)29%  (60/207)18%  (7.5/43)
PoolManagerV5$WatchdogThread (PoolManagerV5, String): void 0%   (0/1)0%   (0/122)0%   (0/28)
run (): void 100% (1/1)11%  (3/28)10%  (1/10)
PoolManagerV5$WatchdogThread (PoolManagerV5): void 100% (1/1)100% (31/31)100% (7/7)
toString (): String 100% (1/1)100% (26/26)100% (1/1)

1package diskCacheV111.poolManager ;
2 
3import java.io.PrintWriter;
4import java.util.Arrays;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.Date;
9import java.util.HashMap;
10import java.util.Iterator;
11import java.util.List;
12import java.util.Map;
13import java.util.NoSuchElementException;
14import java.util.StringTokenizer;
15 
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
18import org.dcache.poolmanager.Utils;
19 
20import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType;
21import diskCacheV111.pools.PoolV2Mode;
22import diskCacheV111.util.CacheException;
23import diskCacheV111.util.PnfsId;
24import diskCacheV111.util.Version;
25import diskCacheV111.vehicles.GenericStorageInfo;
26import diskCacheV111.vehicles.IpProtocolInfo;
27import diskCacheV111.vehicles.PoolCostCheckable;
28import diskCacheV111.vehicles.PoolLinkGroupInfo;
29import diskCacheV111.vehicles.PoolManagerGetPoolListMessage;
30import diskCacheV111.vehicles.PoolManagerPoolModeMessage;
31import diskCacheV111.vehicles.PoolManagerPoolUpMessage;
32import diskCacheV111.vehicles.PoolManagerGetPoolsByNameMessage;
33import diskCacheV111.vehicles.PoolManagerGetPoolsByLinkMessage;
34import diskCacheV111.vehicles.PoolManagerGetPoolsByPoolGroupMessage;
35import diskCacheV111.vehicles.PoolMgrGetPoolByLink;
36import diskCacheV111.vehicles.PoolMgrGetPoolLinkGroups;
37import diskCacheV111.vehicles.PoolMgrQueryPoolsMsg;
38import diskCacheV111.vehicles.PoolMgrSelectWritePoolMsg;
39import diskCacheV111.vehicles.PoolStatusChangedMessage;
40import diskCacheV111.vehicles.ProtocolInfo;
41import diskCacheV111.vehicles.QuotaMgrCheckQuotaMessage;
42import diskCacheV111.vehicles.StorageInfo;
43import diskCacheV111.vehicles.PoolManagerPoolInformation;
44import dmg.cells.nucleus.CellInfo;
45import dmg.cells.nucleus.CellMessage;
46import dmg.cells.nucleus.CellPath;
47import dmg.cells.nucleus.CellVersion;
48import dmg.cells.nucleus.DelayedReply;
49import dmg.cells.nucleus.CDC;
50import dmg.cells.nucleus.NoRouteToCellException;
51import dmg.util.Args;
52import dmg.util.CommandException;
53 
54import org.dcache.cells.AbstractCellComponent;
55import org.dcache.cells.CellCommandListener;
56import org.dcache.cells.CellMessageReceiver;
57import org.dcache.vehicles.PoolManagerSelectLinkGroupForWriteMessage;
58 
59public class PoolManagerV5
60    extends AbstractCellComponent
61    implements CellCommandListener,
62               CellMessageReceiver
63{
64    private int  _writeThreads     = 0 ;
65    private int  _readThreads      = 0 ;
66 
67    private int _counterPoolUp         = 0 ;
68    private int _counterSelectWritePool= 0 ;
69    private int _counterSelectReadPool = 0 ;
70 
71    private Map _readHandlerList   = new HashMap() ;
72    private final Object  _readHandlerLock   = new Object() ;
73 
74    private PoolSelectionUnit _selectionUnit ;
75    private PoolMonitorV5     _poolMonitor   ;
76 
77    private long _pnfsTimeout      = 15 * 1000;
78    private long _readPoolTimeout  = 15 * 1000;
79    private long _poolFetchTimeout = 5 * 24 * 3600 * 1000;
80    private long _writePoolTimeout = 15 * 1000;
81    private long _poolTimeout      = 15 * 1000;
82 
83    private CostModule   _costModule  ;
84    private CellPath     _poolStatusRelayPath = null ;
85 
86    private RequestContainerV5 _requestContainer ;
87    private WatchdogThread     _watchdog         = null ;
88 
89    private boolean _sendCostInfo  = false ;                   //VP
90    private boolean _quotasEnabled = false ;
91    private String  _quotaManager  = "none";
92 
93 
94    private final static Logger _log = LoggerFactory.getLogger(PoolManagerV5.class);
95    private final static Logger _logPoolMonitor = LoggerFactory.getLogger("logger.org.dcache.poolmonitor." + PoolManagerV5.class.getName());
96 
97 
98    public PoolManagerV5()
99    {
100    }
101 
102    public void setPoolSelectionUnit(PoolSelectionUnit selectionUnit)
103    {
104        _selectionUnit = selectionUnit;
105    }
106 
107    public void setCostModule(CostModule costModule)
108    {
109        _costModule = costModule;
110    }
111 
112    public void setPoolMonitor(PoolMonitorV5 poolMonitor)
113    {
114        _poolMonitor = poolMonitor;
115    }
116 
117    public void setRequestContainer(RequestContainerV5 requestContainer)
118    {
119        _requestContainer = requestContainer;
120    }
121 
122    public void setPoolStatusRelayPath(CellPath poolStatusRelayPath)
123    {
124        _poolStatusRelayPath =
125            (poolStatusRelayPath.hops() == 0)
126            ? null
127            : poolStatusRelayPath;
128    }
129 
130    public void setQuotaManager(String quotaManager)
131    {
132        _quotaManager = quotaManager;
133        _quotasEnabled = !_quotaManager.equals("none");
134    }
135 
136    public void setSendCostInfo(boolean sendCostInfo)
137    {
138        _sendCostInfo = sendCostInfo;
139    }
140 
141    public void init()
142    {
143        String watchdogParam = getArgs().getOpt("watchdog");
144        if (watchdogParam != null && watchdogParam.length() > 0) {
145            _watchdog = new WatchdogThread(watchdogParam);
146        } else {
147            _watchdog = new WatchdogThread();
148        }
149        _log.info("Watchdog : " + _watchdog);
150    }
151 
152    @Override
153    public CellInfo getCellInfo(CellInfo info)
154    {
155        PoolManagerCellInfo pminfo = new PoolManagerCellInfo(info);
156        pminfo.setCellVersion(new CellVersion(Version.getVersion(),"$Revision: 14256 $"));
157        pminfo.setPoolList(_selectionUnit.getActivePools());
158        return pminfo;
159    }
160 
161    @Override
162    public void printSetup(PrintWriter writer)
163    {
164        writer.print("#\n# Setup of ");
165        writer.print(getCellName());
166        writer.print(" (");
167        writer.print(getClass().getName());
168        writer.print(") at ");
169        writer.println(new Date().toString());
170        writer.println("#");
171        writer.print("set timeout pool ");
172        writer.println(""+(_poolMonitor.getPoolTimeout()/1000L));
173        writer.println("#");
174    }
175 
176    private class WatchdogThread implements Runnable {
177        private long _deathDetected = 10L * 60L * 1000L; // 10 minutes
178        private long _sleepTimer = 1L * 60L * 1000L; // 1 minute
179        private long _watchdogSequenceCounter = 0L;
180 
181        public WatchdogThread() {
182            new Thread(this, "watchdog").start();
183            _log.info("WatchdogThread initialized with : " + this);
184        }
185 
186        public WatchdogThread(String parameter) {
187            //
188            // [<deathDetection>]:[<sleeper>]
189            //
190            long deathDetected = 0;
191            long sleeping = 0;
192            try {
193                StringTokenizer st = new StringTokenizer(parameter, ":");
194                String tmp = null;
195                if (st.hasMoreTokens()) {
196                    tmp = st.nextToken();
197                    if (tmp.length() > 0)
198                        deathDetected = Long.parseLong(tmp);
199                }
200                if (st.hasMoreTokens()) {
201                    tmp = st.nextToken();
202                    if (tmp.length() > 0)
203                        sleeping = Long.parseLong(tmp);
204                }
205 
206                if ((deathDetected < 10) || (sleeping < 10))
207                    throw new IllegalArgumentException("Timers to small : " + parameter);
208 
209                if (deathDetected > 0L)
210                    _deathDetected = deathDetected * 1000L;
211                if (sleeping > 0L)
212                    _sleepTimer = sleeping * 1000L;
213 
214            } catch (Exception ee) {
215                _log.warn("WatchdogThread : illegal arguments [" + parameter + "] (using defaults) " + ee.getMessage());
216            }
217            new Thread(this, "watchdog").start();
218            _log.info("WatchdogThread initialized with : " + this);
219        }
220 
221        public void run() {
222            _log.info("watchdog thread activated");
223            while (true) {
224                try {
225                    Thread.sleep(_sleepTimer);
226                } catch (InterruptedException e) {
227                    _log.info("watchdog thread interrupted");
228                    break;
229                }
230                runWatchdogSequence(_deathDetected);
231                _watchdogSequenceCounter++;
232            }
233            _log.info("watchdog finished");
234        }
235 
236        @Override
237        public String toString() {
238            return "DeathDetection=" + (_deathDetected / 1000L) + ";Sleep="
239                    + (_sleepTimer / 1000L) + ";Counter="
240                    + _watchdogSequenceCounter + ";";
241        }
242    }
243 
244    public PoolManagerPoolModeMessage
245        messageArrived(PoolManagerPoolModeMessage msg)
246    {
247        PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(msg
248                .getPoolName());
249        if (pool == null) {
250            msg.setFailed(563, "Pool not found : " + msg.getPoolName());
251        } else if (msg.getPoolMode() == PoolManagerPoolModeMessage.UNDEFINED) {
252            //
253            // get pool mode
254            //
255            msg.setPoolMode(PoolManagerPoolModeMessage.READ | (pool.isReadOnly() ? 0 : PoolManagerPoolModeMessage.WRITE));
256        } else {
257            //
258            // set pool mode
259            //
260            pool.setReadOnly((msg.getPoolMode() & PoolManagerPoolModeMessage.WRITE) == 0);
261        }
262 
263        msg.setSucceeded();
264        return msg;
265    }
266 
267    private void runWatchdogSequence(long deathDetectedTimer)
268    {
269        for (String name : _selectionUnit.getDefinedPools(false)) {
270            PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(name);
271            if (pool != null) {
272                if (pool.getActive() > deathDetectedTimer
273                    && pool.setSerialId(0L)) {
274 
275                    if( _logPoolMonitor.isDebugEnabled() ) {
276                        _logPoolMonitor.debug("Pool " + name + " declared as DOWN (no ping in " + deathDetectedTimer/1000 +" seconds).");
277                    }
278                    _requestContainer.poolStatusChanged(name, PoolStatusChangedMessage.DOWN);
279                    sendPoolStatusRelay(name, PoolStatusChangedMessage.DOWN,
280                                        null, 666, "DEAD");
281                }
282            }
283        }
284    }
285 
286    @Override
287    public void getInfo( PrintWriter pw ){
288        pw.println("PoolManager V [$Id: PoolManagerV5.java,v 1.48 2007-10-10 08:05:34 tigran Exp $]");
289        pw.println(" SelectionUnit : "+_selectionUnit.getVersion() ) ;
290        pw.println(" Write Threads : "+_writeThreads) ;
291        pw.println(" Read  Threads : "+_readThreads) ;
292        pw.println("  Pool Timeout : "+_poolMonitor.getPoolTimeout()/1000L) ;
293        pw.println("Message counts") ;
294        pw.println("           PoolUp : "+_counterPoolUp ) ;
295        pw.println("   SelectReadPool : "+_counterSelectReadPool ) ;
296        pw.println("  SelectWritePool : "+_counterSelectWritePool ) ;
297        if( _watchdog == null ){
298             pw.println("         Watchdog : disabled" ) ;
299        }else{
300             pw.println("         Watchdog : "+_watchdog ) ;
301        }
302    }
303    public String hh_set_max_threads = " # DEPRICATED         " ;
304    public String ac_set_max_threads_$_1( Args args )throws CommandException{
305      return "" ;
306    }
307    public String hh_set_timeout_pool = "[-read] [-write] <timeout/secs>" ;
308    public String ac_set_timeout_pool_$_1( Args args )throws CommandException{
309       boolean isWrite = args.getOpt("write") != null ;
310       boolean isRead  = args.getOpt("read")  != null ;
311       long    timeout = Integer.parseInt(args.argv(0)) * 1000 ;
312       if( ( ! isWrite ) && ( ! isRead ) ){
313          _readPoolTimeout = _writePoolTimeout = timeout ;
314          _poolMonitor.setPoolTimeout(_readPoolTimeout);
315          return "" ;
316       }
317       if( isWrite )_writePoolTimeout = timeout ;
318       if( isRead  ){
319          _readPoolTimeout = timeout ;
320          _poolMonitor.setPoolTimeout(_readPoolTimeout);
321       }
322       return "" ;
323    }
324    public String hh_set_timeout_pnfs = "<timeout/secs>" ;
325    public String ac_set_timeout_pnfs_$_1( Args args )throws CommandException{
326       _pnfsTimeout = Integer.parseInt(args.argv(0)) * 1000 ;
327       return "" ;
328    }
329    public String hh_set_timeout_fetch = "<timeout/min>" ;
330    public String ac_set_timeout_fetch_$_1( Args args )throws CommandException{
331       _poolFetchTimeout = Integer.parseInt(args.argv(0)) * 1000 * 60 ;
332       return "" ;
333    }
334    public String hh_getpoolsbylink = "<linkName> [-size=<filesize>]" ;
335    public String ac_getpoolsbylink_$_1( Args args )throws Exception {
336       String sizeString = args.getOpt("size") ;
337       long size = sizeString == null ? 50000000L : Long.parseLong( sizeString ) ;
338       String linkName = args.argv(0) ;
339 
340       List list = _poolMonitor.queryPoolsByLinkName( linkName , size ) ;
341 
342       StringBuffer sb = new StringBuffer() ;
343       for( Iterator i = list.iterator() ; i.hasNext() ; ){
344          sb.append( i.next().toString() ).append("\n");
345       }
346       return sb.toString() ;
347    }
348 
349    public synchronized
350        void messageArrived(PoolManagerPoolUpMessage poolMessage)
351    {
352        _counterPoolUp++;
353 
354        String poolName = poolMessage.getPoolName();
355        PoolSelectionUnit.SelectionPool pool =
356            _selectionUnit.getPool(poolName, true);
357 
358        PoolV2Mode newMode = poolMessage.getPoolMode();
359        PoolV2Mode oldMode = pool.getPoolMode();
360 
361        if (_logPoolMonitor.isDebugEnabled()) {
362            _logPoolMonitor.debug("PoolUp message from " + poolName
363                                  + " with mode " + newMode
364                                  + " and serialId " + poolMessage.getSerialId());
365        }
366 
367        /* For compatibility with previous versions of dCache, a pool
368         * marked DISABLED, but without any other DISABLED_ flags set
369         * is considered fully disabled.
370         */
371        boolean disabled =
372            newMode.getMode() == PoolV2Mode.DISABLED
373            || newMode.isDisabled(PoolV2Mode.DISABLED_DEAD)
374            || newMode.isDisabled(PoolV2Mode.DISABLED_STRICT);
375 
376        /* By convention, the serial number is set to zero when a pool
377         * is disabled. This is used by the watchdog to identify, that
378         * we have already announced that the pool is down.
379         */
380        long serial = disabled ? 0 : poolMessage.getSerialId();
381 
382        /* Any change in the kind of operations a pool might be able
383         * to perform has to be propagated to a number of other
384         * components.
385         *
386         * Notice that calling setSerialId has a side-effect, which is
387         * why we call it first.
388         */
389        boolean changed =
390            pool.setSerialId(serial)
391            || pool.isActive() == disabled
392            || (newMode.getMode() != oldMode.getMode())
393            || !pool.getHsmInstances().equals(poolMessage.getHsmInstances());
394 
395        pool.setPoolMode(newMode);
396        pool.setHsmInstances(poolMessage.getHsmInstances());
397        pool.setActive(!disabled);
398 
399        /* Notify others in case the pool status has changed. Due to
400         * limitations of the PoolStatusChangedMessage, we will often
401         * send a RESTART notification, when in fact only the pool
402         * mode has changed.
403         */
404        if (changed) {
405            _logPoolMonitor.info("Pool " + poolName + " changed from mode "
406                                 + oldMode + " to " + newMode);
407 
408            if (disabled) {
409                _requestContainer.poolStatusChanged(poolName,
410                                                    PoolStatusChangedMessage.DOWN);
411                sendPoolStatusRelay(poolName, PoolStatusChangedMessage.DOWN,
412                                    poolMessage.getPoolMode(),
413                                    poolMessage.getCode(),
414                                    poolMessage.getMessage());
415            } else {
416                _requestContainer.poolStatusChanged(poolName,
417                                                    PoolStatusChangedMessage.UP);
418                sendPoolStatusRelay(poolName, PoolStatusChangedMessage.RESTART);
419            }
420        }
421    }
422 
423    private void sendPoolStatusRelay( String poolName , int status ){
424       sendPoolStatusRelay( poolName , status , null , 0 , null ) ;
425    }
426    private void sendPoolStatusRelay( String poolName , int status ,
427                                      PoolV2Mode poolMode ,
428                                      int statusCode , String statusMessage ){
429 
430       if( _poolStatusRelayPath == null )return ;
431 
432       try{
433 
434          PoolStatusChangedMessage msg = new PoolStatusChangedMessage( poolName , status ) ;
435          msg.setPoolMode( poolMode ) ;
436          msg.setDetail( statusCode , statusMessage ) ;
437          _log.info("sendPoolStatusRelay : "+msg);
438          sendMessage(
439               new CellMessage( _poolStatusRelayPath , msg )
440                     ) ;
441 
442       }catch(Exception ee ){
443          _log.warn("Failed to send poolStatus changed message : "+ee ) ;
444       }
445    }
446 
447    public PoolMgrGetPoolLinkGroups
448        messageArrived(PoolMgrGetPoolLinkGroups msg)
449    {
450        Collection<PoolLinkGroupInfo> linkGroupInfos = Utils.linkGroupInfos(_selectionUnit, _costModule).values();
451 
452            PoolLinkGroupInfo[] poolLinkGroupInfos = linkGroupInfos.toArray(new PoolLinkGroupInfo[linkGroupInfos.size()]);
453            msg.setPoolLinkGroupInfos(poolLinkGroupInfos);
454        msg.setSucceeded();
455        return msg;
456    }
457 
458    public PoolManagerGetPoolListMessage
459        messageArrived(PoolManagerGetPoolListMessage msg)
460    {
461       String [] pools = _selectionUnit.getActivePools() ;
462       msg.setPoolList(Arrays.asList(pools)) ;
463       msg.setSucceeded();
464       return msg;
465    }
466 
467    public PoolMgrGetPoolByLink
468        messageArrived(PoolMgrGetPoolByLink msg)
469        throws CacheException
470    {
471        String linkName = msg.getLinkName();
472        long   filesize = msg.getFilesize();
473 
474        try {
475            List<PoolCostCheckable> pools =
476                _poolMonitor.queryPoolsByLinkName( linkName , filesize ) ;
477            if ((pools == null) || pools.isEmpty())
478                throw new CacheException(57, "No appropriate pools found for link: "+linkName);
479            msg.setPoolName(pools.get(0).getPoolName());
480            msg.setSucceeded();
481        } catch (InterruptedException e) {
482            throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
483                                     "Pool manager is shutting down");
484        }
485        return msg;
486    }
487 
488    public PoolManagerGetPoolsByNameMessage
489        messageArrived(PoolManagerGetPoolsByNameMessage msg)
490        throws CacheException
491    {
492        try {
493            List<PoolManagerPoolInformation> pools = new ArrayList();
494            for (String name: msg.getPoolNames()) {
495                try {
496                    pools.add(_poolMonitor.getPoolInformation(name));
497                } catch (NoSuchElementException e) {
498                    /* Don't include a pool that doesn't exist.
499                     */
500                }
501            }
502            msg.setPools(pools);
503            msg.setSucceeded();
504        } catch (InterruptedException e) {
505            throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
506                                     "Pool manager is shutting down");
507        }
508        return msg;
509    }
510 
511    public PoolManagerGetPoolsByLinkMessage
512        messageArrived(PoolManagerGetPoolsByLinkMessage msg)
513        throws CacheException
514    {
515        try {
516            msg.setPools(_poolMonitor.getPoolsByLink(msg.getLink()));
517            msg.setSucceeded();
518        } catch (InterruptedException e) {
519            throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
520                                     "Pool manager is shutting down");
521        } catch (NoSuchElementException e) {
522            Collection<PoolManagerPoolInformation> empty =
523                Collections.emptyList();
524            msg.setPools(empty);
525            msg.setSucceeded();
526        }
527        return msg;
528    }
529 
530    public PoolManagerGetPoolsByPoolGroupMessage
531        messageArrived(PoolManagerGetPoolsByPoolGroupMessage msg)
532        throws CacheException
533    {
534        try {
535            msg.setPools(_poolMonitor.getPoolsByPoolGroup(msg.getPoolGroup()));
536            msg.setSucceeded();
537        } catch (InterruptedException e) {
538            throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
539                                     "Pool manager is shutting down");
540        } catch (NoSuchElementException e) {
541            Collection<PoolManagerPoolInformation> empty =
542                Collections.emptyList();
543            msg.setPools(empty);
544            msg.setSucceeded();
545        }
546        return msg;
547    }
548 
549    public PoolMgrQueryPoolsMsg
550        messageArrived(PoolMgrQueryPoolsMsg msg)
551    {
552        DirectionType accessType = msg.getAccessType();
553        msg.setPoolList(PoolPreferenceLevel.fromPoolPreferenceLevelToList(
554           _selectionUnit.match(accessType,
555                                msg.getNetUnitName(),
556                                msg.getProtocolUnitName(),
557                                msg.getStorageInfo(),
558                                null)));
559        msg.setSucceeded();
560        return msg;
561    }
562 
563    private static class XProtocolInfo implements IpProtocolInfo {
564       private String [] _host = new String[1] ;
565 
566       private static final long serialVersionUID = -5817364111427851052L;
567 
568       private XProtocolInfo( String hostName ){
569          _host[0] = hostName ;
570       }
571       public String getProtocol(){ return "DCap" ; }
572       public int    getMinorVersion(){ return 0 ; }
573       public int    getMajorVersion(){ return 0 ; }
574       public String getVersionString(){ return "0.0" ; }
575       public String [] getHosts(){ return _host ; }
576       public int       getPort(){ return 0 ; }
577       public boolean isFileCheckRequired() { return true; }
578    }
579    private static class XStorageInfo extends GenericStorageInfo {
580 
581       private static final long serialVersionUID = -6624549402952279903L;
582 
583       private XStorageInfo( String hsm , String storageClass ){
584               super(hsm,storageClass);
585       }
586       @Override
587    public String getBitfileId(){ return "" ; }
588       @Override
589    public long   getFileSize(){ return 100 ; }
590       @Override
591    public void   setFileSize( long fileSize ){}
592       @Override
593    public boolean isStored(){ return true ; }
594 
595    }
596    public String hh_get_av_pools = "<pnfsId> <hsm> <storageClass> <host>" ;
597    public String ac_get_av_pools_$_4( Args args ) throws Exception {
598       try{
599          PnfsId pnfsId = new PnfsId( args.argv(0) ) ;
600          XStorageInfo storageInfo = new XStorageInfo( args.argv(1) , args.argv(2) ) ;
601          XProtocolInfo protocolInfo = new XProtocolInfo( args.argv(3) ) ;
602 
603          PoolMonitorV5.PnfsFileLocation  _pnfsFileLocation =
604                    _poolMonitor.getPnfsFileLocation( pnfsId ,
605                                                      storageInfo ,
606                                                      protocolInfo, null ) ;
607 
608          List available = _pnfsFileLocation.getFileAvailableMatrix() ;
609 
610          Iterator i = ((List)available.get(0)).iterator() ;
611          StringBuffer sb = new StringBuffer() ;
612          sb.append("Available and allowed\n");
613          while( i.hasNext() ){
614             sb.append("  ").append( i.next().toString() ).append("\n");
615          }
616          sb.append("Allowed (not available)\n");
617          if( ( available = _pnfsFileLocation.getAllowedButNotAvailable() ) != null ){
618             i = available.iterator() ;
619             while( i.hasNext() ){
620                sb.append("  ").append( i.next().toString() ).append("\n");
621             }
622          }
623          return sb.toString() ;
624 
625       }catch( Exception ee ){
626 
627          ee.printStackTrace() ;
628          throw ee ;
629       }
630    }
631    /*
632    public String hh_get_pools = "<hsm> <storageClass> <host>"+
633                                 " [-size=<size>] [-mode=stage|store]" ;
634    public String ac_get_pools_$_3( Args args ) throws Exception {
635       String mode = args.getOpt("mode") ;
636       mode = mode == null ? "stage" : mode ;
637       long size = 0L ;
638       String sizeString = args.getOpt("size") ;
639       if( sizeString != null )size = Long.parseLong(sizeString);
640       try{
641          XStorageInfo storageInfo = new XStorageInfo( args.argv(0) , args.argv(1) ) ;
642          XProtocolInfo protocolInfo = new XProtocolInfo( args.argv(2) ) ;
643 
644          List list = mode.equals("stage") ?
645                      _poolMonitor.getStagePoolList( storageInfo , protocolInfo , size ) :
646                      _poolMonitor.getStorePoolList( storageInfo , protocolInfo , size ) ;
647 
648          Iterator i = list.iterator() ;
649          StringBuffer sb = new StringBuffer() ;
650          while( i.hasNext() ){
651             sb.append( i.next().toString() ).append("\n");
652          }
653          return sb.toString() ;
654 
655       }catch( Exception ee ){
656 
657          ee.printStackTrace() ;
658          throw ee ;
659       }
660    }
661    */
662 
663    private boolean quotasExceeded( StorageInfo info ){
664 
665       String storageClass = info.getStorageClass()+"@"+info.getHsm() ;
666 
667       QuotaMgrCheckQuotaMessage quotas = new QuotaMgrCheckQuotaMessage( storageClass ) ;
668       CellMessage msg = new CellMessage( new CellPath(_quotaManager) , quotas ) ;
669       try{
670           msg = sendAndWait( msg , 20000L ) ;
671           if( msg == null ){
672              _log.warn("quotasExceeded of "+storageClass+" : request timed out");
673              return false ;
674           }
675           Object obj = msg.getMessageObject() ;
676           if( ! (obj instanceof QuotaMgrCheckQuotaMessage ) ){
677              _log.warn("quotasExceeded of "+storageClass+" : unexpected object arrived : "+obj.getClass().getName());
678              return false ;
679           }
680 
681           return ((QuotaMgrCheckQuotaMessage)obj).isHardQuotaExceeded() ;
682 
683       }catch(Exception ee ){
684 
685           _log.warn( "quotasExceeded of "+storageClass+" : Exception : "+ee);
686           _log.warn(ee.toString());
687           return false ;
688       }
689 
690    }
691 
692    private long determineExpectedFileSize(long expectedLength, StorageInfo storageInfo)
693    {
694        if (expectedLength > 0) {
695            return expectedLength;
696        }
697 
698        if (storageInfo.getFileSize() > 0) {
699            return storageInfo.getFileSize();
700        }
701 
702        String s = storageInfo.getKey("alloc-size");
703        if (s != null) {
704            try {
705                return Long.parseLong(s);
706            } catch (NumberFormatException e) {
707                // bad values are ignored
708            }
709        }
710 
711        return 0;
712    }
713 
714    public DelayedReply messageArrived(PoolManagerSelectLinkGroupForWriteMessage message)
715        throws CacheException
716    {
717        if (message.getStorageInfo() == null) {
718            throw new IllegalArgumentException("Storage info is missing");
719        }
720        if (message.getProtocolInfo() == null ){
721            throw new IllegalArgumentException("Protocol info is missing");
722        }
723 
724        return new LinkGroupSelectionTask(message);
725    }
726 
727    /**
728     * Task for processing link group selection messages.
729     */
730    public class LinkGroupSelectionTask
731        extends DelayedReply
732        implements Runnable
733    {
734        private final PoolManagerSelectLinkGroupForWriteMessage _message;
735        private final CDC _cdc;
736 
737        public LinkGroupSelectionTask(PoolManagerSelectLinkGroupForWriteMessage message)
738        {
739            _message = message;
740            _cdc = new CDC();
741            new Thread(this, "LinkGroupSelectionTask").start();
742        }
743 
744        public void run()
745        {
746            long started = System.currentTimeMillis();
747            _cdc.apply();
748            try {
749                _log.info("Select link group handler started");
750 
751                _message.setLinkGroups(selectLinkGroups());
752                _message.setSucceeded();
753 
754                _log.info("Select link group handler finished after {} ms",
755                          (System.currentTimeMillis() - started));
756            } catch (Exception e) {
757                _message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
758                                   e.getMessage());
759            } finally {
760                try {
761                    send(_message);
762                } catch (NoRouteToCellException e) {
763                    _log.error("Failed to send reply: " + e.getMessage());
764                } catch (InterruptedException e) {
765                    _log.warn("Link group selection handler was interrupted");
766                } finally {
767                    CDC.clear();
768                }
769            }
770        }
771 
772        protected List<String> selectLinkGroups()
773        {
774            StorageInfo storageInfo = _message.getStorageInfo();
775            ProtocolInfo protocolInfo = _message.getProtocolInfo();
776            long expectedLength =
777                determineExpectedFileSize(_message.getFileSize(), storageInfo);
778            String protocol =
779                protocolInfo.getProtocol() + "/" + protocolInfo.getMajorVersion();
780            String hostName =
781                (protocolInfo instanceof IpProtocolInfo)
782                ? ((IpProtocolInfo) protocolInfo).getHosts()[0]
783                : null;
784 
785            Collection<String> linkGroups = _message.getLinkGroups();
786            if (linkGroups == null) {
787                linkGroups =
788                    Utils.linkGroupInfos(_selectionUnit, _costModule).keySet();
789            }
790 
791            List<String> outputLinkGroups =
792                new ArrayList<String>(linkGroups.size());
793 
794            for (String linkGroup: linkGroups) {
795                PoolPreferenceLevel [] level =
796                    _selectionUnit.match(DirectionType.WRITE,
797                                         hostName,
798                                         protocol,
799                                         storageInfo,
800                                         linkGroup);
801                if (level.length > 0) {
802                    outputLinkGroups.add(linkGroup);
803                }
804            }
805 
806            return outputLinkGroups;
807        }
808    }
809 
810    ///////////////////////////////////////////////////////////////
811    //
812    // the write io request handler
813    //
814    public DelayedReply messageArrived(CellMessage envelope,
815                                       PoolMgrSelectWritePoolMsg msg)
816    {
817        return new WriteRequestHandler(envelope, msg);
818    }
819 
820    public class WriteRequestHandler extends DelayedReply implements Runnable
821    {
822        private CellMessage _envelope;
823        private PoolMgrSelectWritePoolMsg _request;
824        private PnfsId _pnfsId;
825 
826        public WriteRequestHandler(CellMessage envelope,
827                                   PoolMgrSelectWritePoolMsg msg)
828        {
829            _envelope = envelope;
830            _request = msg;
831            _pnfsId = _request.getPnfsId();
832            new Thread(this, "writeHandler").start();
833        }
834 
835       public void run(){
836 
837           StorageInfo  storageInfo  = _request.getStorageInfo() ;
838           ProtocolInfo protocolInfo = _request.getProtocolInfo() ;
839 
840           _log.info( _pnfsId.toString()+" write handler started" );
841           long started = System.currentTimeMillis();
842 
843           if( storageInfo == null ){
844              requestFailed( 21 , "Storage info not available for write request : "+_pnfsId ) ;
845              return ;
846           }else if( protocolInfo == null ){
847              requestFailed( 22 , "Protocol info not available for write request : "+_pnfsId ) ;
848              return ;
849           }
850           if( _quotasEnabled && quotasExceeded( storageInfo ) ){
851              requestFailed( 55 , "Quotas Exceeded for StorageClass : "+storageInfo.getStorageClass() ) ;
852              return ;
853           }
854 
855           long expectedLength =
856               determineExpectedFileSize(_request.getFileSize(), storageInfo);
857 
858           /* The cost module relies on the expected file size.
859            */
860           _request.setFileSize(expectedLength);
861 
862           try{
863 
864              List<PoolCostCheckable> storeList = _poolMonitor.
865                               getPnfsFileLocation( _pnfsId , storageInfo , protocolInfo, _request.getLinkGroup() ).
866                               getStorePoolList( expectedLength ) ;
867              /*
868              List storeList =
869                  _poolMonitor.getStorePoolList(  storageInfo ,
870                                                  protocolInfo ,
871                                                  expectedLength );
872              */
873              String poolName = storeList.get(0).getPoolName() ;
874 
875              if (_sendCostInfo)
876                    _requestContainer.sendCostMsg(
877                             _pnfsId, storeList.get(0), true
878                                                 );        //VP
879 
880              _log.info(_pnfsId+" write handler selected "+poolName+" after "+
881                  ( System.currentTimeMillis() - started ) );
882              requestSucceeded( poolName ) ;
883 
884           }catch(CacheException ce ){
885              requestFailed( ce.getRc() , ce.getMessage() ) ;
886           }catch(Exception ee ){
887              requestFailed( 17 , ee.getMessage() ) ;
888           }
889       }
890 
891        protected void requestFailed(int errorCode, String errorMessage)
892        {
893            _request.setFailed(errorCode, errorMessage);
894            try {
895                send(_request);
896            } catch (Exception e) {
897                _log.warn("Exception requestFailed : " + e, e);
898            }
899        }
900 
901        protected void requestSucceeded(String poolName)
902        {
903            _request.setPoolName(poolName);
904            _request.setSucceeded();
905            try {
906                send(_request);
907                _costModule.messageArrived(_envelope);
908            } catch (Exception e) {
909                _log.warn("Exception in requestSucceeded : " + e, e);
910            }
911        }
912    }
913 
914    public String ac_free_$_0(Args args) {
915 
916 
917            Map<String, PoolLinkGroupInfo> linkGroupSize = Utils.linkGroupInfos(_selectionUnit, _costModule);
918 
919            StringBuilder sb = new StringBuilder();
920 
921            for(Map.Entry<String, PoolLinkGroupInfo> linkGourp: linkGroupSize.entrySet() ) {
922                    sb.append(linkGourp.getKey()).append(" : ")
923                            .append(linkGourp.getValue().getAvailableSpaceInBytes() ).append("\n");
924            }
925 
926            return sb.toString();
927 
928    }
929}

[all classes][diskCacheV111.poolManager]
EMMA 2.0.5312 (C) Vladimir Roubtsov