EMMA Coverage Report (generated Mon Aug 23 17:21:34 CEST 2010)
[all classes][diskCacheV111.poolManager]

COVERAGE SUMMARY FOR SOURCE FILE [PoolMonitorV5.java]

nameclass, %method, %block, %line, %
PoolMonitorV5.java50%  (1/2)15%  (7/48)3%   (36/1430)6%   (16/259)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class PoolMonitorV5$PnfsFileLocation0%   (0/1)0%   (0/19)0%   (0/758)0%   (0/122)
PoolMonitorV5$PnfsFileLocation (PoolMonitorV5, PnfsId, StorageInfo, ProtocolI... 0%   (0/1)0%   (0/27)0%   (0/9)
PoolMonitorV5$PnfsFileLocation (PoolMonitorV5, PnfsId, StorageInfo, ProtocolI... 0%   (0/1)0%   (0/8)0%   (0/1)
calculateFileAvailableMatrix (): void 0%   (0/1)0%   (0/249)0%   (0/34)
clear (): void 0%   (0/1)0%   (0/7)0%   (0/3)
getAcknowledgedPnfsPools (): List 0%   (0/1)0%   (0/8)0%   (0/3)
getAllowedButNotAvailable (): List 0%   (0/1)0%   (0/2)0%   (0/1)
getAllowedPoolCount (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getAvailablePoolCount (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getCostSortedAvailable (): List 0%   (0/1)0%   (0/17)0%   (0/5)
getCurrentParameterSet (): PoolManagerParameter 0%   (0/1)0%   (0/6)0%   (0/1)
getFetchPoolMatrix (PoolSelectionUnit$DirectionType, StorageInfo, ProtocolInf... 0%   (0/1)0%   (0/203)0%   (0/32)
getFileAvailableMatrix (): List 0%   (0/1)0%   (0/8)0%   (0/3)
getListOfParameter (): List 0%   (0/1)0%   (0/3)0%   (0/1)
getStagePoolMatrix (StorageInfo, ProtocolInfo, long): List 0%   (0/1)0%   (0/7)0%   (0/1)
getStorePoolList (StorageInfo, ProtocolInfo, long): List 0%   (0/1)0%   (0/169)0%   (0/19)
getStorePoolList (long): List 0%   (0/1)0%   (0/8)0%   (0/1)
say (String): void 0%   (0/1)0%   (0/16)0%   (0/2)
sortByCost (List, boolean): void 0%   (0/1)0%   (0/7)0%   (0/2)
sortByCost (List, boolean, PoolManagerParameter): void 0%   (0/1)0%   (0/7)0%   (0/2)
     
class PoolMonitorV5100% (1/1)24%  (7/29)5%   (36/672)12%  (16/137)
access$100 (PoolMonitorV5): PnfsHandler 0%   (0/1)0%   (0/3)0%   (0/1)
access$200 (PoolMonitorV5, Iterator, PnfsId, long, boolean): List 0%   (0/1)0%   (0/7)0%   (0/1)
access$300 (PoolMonitorV5): PoolSelectionUnit 0%   (0/1)0%   (0/3)0%   (0/1)
access$400 (PoolMonitorV5): PartitionManager 0%   (0/1)0%   (0/3)0%   (0/1)
access$500 (PoolMonitorV5, Iterator, long): List 0%   (0/1)0%   (0/5)0%   (0/1)
access$600 (): Logger 0%   (0/1)0%   (0/2)0%   (0/1)
access$700 (PoolMonitorV5): double 0%   (0/1)0%   (0/3)0%   (0/1)
calculateCost (PoolCostCheckable, boolean, PoolManagerParameter): double 0%   (0/1)0%   (0/23)0%   (0/3)
getCostComparator (boolean, PoolManagerParameter): Comparator 0%   (0/1)0%   (0/8)0%   (0/1)
getPnfsFileLocation (PnfsId, StorageInfo, ProtocolInfo, String): PoolMonitorV... 0%   (0/1)0%   (0/10)0%   (0/1)
getPoolInformation (Iterator): Collection 0%   (0/1)0%   (0/18)0%   (0/4)
getPoolInformation (PoolSelectionUnit$SelectionPool): PoolManagerPoolInformation 0%   (0/1)0%   (0/42)0%   (0/10)
getPoolInformation (String): PoolManagerPoolInformation 0%   (0/1)0%   (0/23)0%   (0/4)
getPoolTimeout (): long 0%   (0/1)0%   (0/3)0%   (0/1)
getPoolsByLink (String): Collection 0%   (0/1)0%   (0/10)0%   (0/2)
getPoolsByPoolGroup (String): Collection 0%   (0/1)0%   (0/10)0%   (0/2)
getPoolsPercentilePerformanceCost (double): double 0%   (0/1)0%   (0/5)0%   (0/1)
messageToCostModule (CellMessage): void 0%   (0/1)0%   (0/5)0%   (0/2)
queryPoolsByLinkName (String, long): List 0%   (0/1)0%   (0/42)0%   (0/8)
queryPoolsForCost (Iterator, long): List 0%   (0/1)0%   (0/165)0%   (0/29)
queryPoolsForPnfsId (Iterator, PnfsId, long, boolean): List 0%   (0/1)0%   (0/234)0%   (0/49)
ssortByCost (List, boolean, PoolManagerParameter): void 0%   (0/1)0%   (0/12)0%   (0/3)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
PoolMonitorV5 (): void 100% (1/1)100% (12/12)100% (5/5)
setCostModule (CostModule): void 100% (1/1)100% (4/4)100% (2/2)
setPartitionManager (PartitionManager): void 100% (1/1)100% (4/4)100% (2/2)
setPnfsHandler (PnfsHandler): void 100% (1/1)100% (4/4)100% (2/2)
setPoolSelectionUnit (PoolSelectionUnit): void 100% (1/1)100% (4/4)100% (2/2)
setPoolTimeout (long): void 100% (1/1)100% (4/4)100% (2/2)

1//  $Id: PoolMonitorV5.java,v 1.32 2007-08-01 20:00:45 tigran Exp $
2 
3package diskCacheV111.poolManager ;
4 
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.Comparator;
9import java.util.HashMap;
10import java.util.HashSet;
11import java.util.Iterator;
12import java.util.List;
13import java.util.Map;
14import java.util.Set;
15import java.util.NoSuchElementException;
16 
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
19import org.dcache.cells.AbstractCellComponent;
20 
21import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType;
22import diskCacheV111.util.CacheException;
23import diskCacheV111.util.PnfsHandler;
24import diskCacheV111.util.PnfsId;
25import diskCacheV111.util.SpreadAndWait;
26import diskCacheV111.vehicles.IpProtocolInfo;
27import diskCacheV111.vehicles.PoolCheckCostMessage;
28import diskCacheV111.vehicles.PoolCheckFileMessage;
29import diskCacheV111.vehicles.PoolCheckable;
30import diskCacheV111.vehicles.PoolCostCheckable;
31import diskCacheV111.vehicles.PoolManagerPoolInformation;
32import diskCacheV111.vehicles.ProtocolInfo;
33import diskCacheV111.vehicles.StorageInfo;
34import dmg.cells.nucleus.CellMessage;
35import dmg.cells.nucleus.CellPath;
36 
37public class PoolMonitorV5
38    extends AbstractCellComponent
39{
40    private final static Logger _log = LoggerFactory.getLogger(PoolMonitorV5.class);
41 
42   private long              _poolTimeout   = 15 * 1000;
43   private PoolSelectionUnit _selectionUnit ;
44   private PnfsHandler       _pnfsHandler   ;
45   private CostModule        _costModule    ;
46   private double            _maxWriteCost          = 1000000.0;
47   private PartitionManager  _partitionManager ;
48 
49    public PoolMonitorV5()
50    {
51    }
52 
53    public void setPoolSelectionUnit(PoolSelectionUnit selectionUnit)
54    {
55        _selectionUnit = selectionUnit;
56    }
57 
58    public void setPnfsHandler(PnfsHandler pnfsHandler)
59    {
60        _pnfsHandler = pnfsHandler;
61    }
62 
63    public void setCostModule(CostModule costModule)
64    {
65        _costModule = costModule;
66    }
67 
68    public void setPartitionManager(PartitionManager partitionManager)
69    {
70        _partitionManager = partitionManager;
71    }
72 
73   public void messageToCostModule( CellMessage cellMessage ){
74      _costModule.messageArrived(cellMessage);
75   }
76   public void setPoolTimeout( long poolTimeout ){
77      _poolTimeout = poolTimeout ;
78   }
79   /*
80   public void setSpaceCost( double spaceCost ){
81      _spaceCostFactor = spaceCost ;
82   }
83   public void setPerformanceCost( double performanceCost ){
84      _performanceCostFactor = performanceCost ;
85   }*/
86   public long getPoolTimeout(){ return _poolTimeout ;}
87    // output[0] -> Allowed and Available
88    // output[1] -> available but not allowed (sorted, cpu)
89    // output[2] -> allowed but not available (sorted, cpu + space)
90    // output[3] -> pools from pnfs
91    // output[4] -> List of List (all allowed pools)
92   public PnfsFileLocation getPnfsFileLocation(
93                               PnfsId pnfsId ,
94                               StorageInfo storageInfo ,
95                               ProtocolInfo protocolInfo, String linkGroup){
96 
97      return new PnfsFileLocation( pnfsId, storageInfo ,protocolInfo , linkGroup) ;
98 
99   }
100   public class PnfsFileLocation {
101 
102      private List<PoolManagerParameter> _listOfPartitions;
103      private List<List<PoolCostCheckable>> _allowedAndAvailableMatrix;
104      private List<PoolCostCheckable> _acknowledgedPnfsPools;
105      private int  _allowedPoolCount          = 0 ;
106      private int  _availablePoolCount        = 0 ;
107      private boolean  _calculationDone       = false ;
108 
109      private final PnfsId       _pnfsId       ;
110      private final StorageInfo  _storageInfo  ;
111      private final ProtocolInfo _protocolInfo ;
112      private final String _linkGroup          ;
113 
114      //private PoolManagerParameter _recentParameter = _partitionManager.getParameterCopyOf()  ;
115 
116      private PnfsFileLocation( PnfsId pnfsId ,
117                                StorageInfo storageInfo ,
118                                ProtocolInfo protocolInfo ,
119                                String linkGroup){
120 
121         _pnfsId       = pnfsId ;
122         _storageInfo  = storageInfo ;
123         _protocolInfo = protocolInfo ;
124         _linkGroup    = linkGroup;
125      }
126 
127       public List<PoolManagerParameter> getListOfParameter()
128       {
129           return _listOfPartitions;
130       }
131 
132      public void clear(){
133          _allowedAndAvailableMatrix = null ;
134          _calculationDone           = false ;
135      }
136 
137       public PoolManagerParameter getCurrentParameterSet()
138       {
139           return _listOfPartitions.get(0);
140       }
141 
142       public List getAllowedButNotAvailable()
143       {
144           return null;
145       }
146 
147       public List<PoolCostCheckable> getAcknowledgedPnfsPools()
148           throws CacheException, InterruptedException
149       {
150           if (_acknowledgedPnfsPools == null)
151               calculateFileAvailableMatrix();
152           return _acknowledgedPnfsPools;
153       }
154 
155       public int getAllowedPoolCount()
156       {
157           return _allowedPoolCount;
158       }
159 
160       public int getAvailablePoolCount()
161       {
162           return _availablePoolCount;
163       }
164 
165       public List<List<PoolCostCheckable>> getFileAvailableMatrix()
166           throws CacheException, InterruptedException
167       {
168           if (_allowedAndAvailableMatrix == null)
169               calculateFileAvailableMatrix();
170           return _allowedAndAvailableMatrix;
171       }
172      //
173      //   getFileAvailableList
174      //  -------------------------
175      //
176      //  expected  = getPoolsFromPnfs() ;
177      //  allowed[] = getAllowedFromConfiguration() ;
178      //
179      //   +----------------------------------------------------+
180      //   |  for i in  0,1,2,3,...                             |
181      //   |     result = intersection( expected , allowed[i] ) |
182      //   |     found  = CheckFileInPool( result)              |
183      //   |     if( found > 0 )break                           |
184      //   |     if( ! allowFallbackOnCost )break               |
185      //   |     if( minCost( found ) < MAX_COST )break         |
186      //   +----------------------------------------------------+
187      //   |                  found == 0                        |
188      //   |                      |                             |
189      //   |        yes           |             NO              |
190      //   |----------------------|-----------------------------|
191      //   | output[0] = empty    | [0] = SortCpuCost(found)    |
192      //   | output[1] = null     | [1] = null                  |
193      //   | output[2] = null     | [2] = null                  |
194      //   | output[3] = expected | [3] = expected              |
195      //   | output[4] = allowed  | [4] = allowed               |
196      //   +----------------------------------------------------+
197      //
198      //   preparePool2Pool
199      //  -------------------------
200      //
201      //  output[1] = SortCpuCost( CheckFileInPool( expected ) )
202      //
203      //   +----------------------------------------------------+
204      //   |                   output[0] > 0                    |
205      //   |                                                    |
206      //   |        yes              |             NO           |
207      //   |-------------------------|--------------------------|
208      //   | veto = Hash( output[0] )|                          |
209      //   |-------------------------|                          |
210      //   |for i in  0,1,2,3,.      | for i in  0,1,2,3,.      |
211      //   |  tmp = allowed[i]-veto  |   if(allowed[i]==0)cont  |
212      //   |  if( tmp == 0 )continue |                          |
213      //   |  out[2] =               |   out[2] =               |
214      //   |   SortCost(getCost(tmp))|     SortCost(getCost(    |
215      //   |                         |        allowed[i]))      |
216      //   |   break                 |   break                  |
217      //   +----------------------------------------------------+
218      //   |if(out[2] == 0)          |if(out[2] == 0)           |
219      //   |    out[2] = out[0]      |    out[2] = empty        |
220      //   +----------------------------------------------------+
221      //
222       /*
223        *   Input : storage info , pnfsid
224        *   Output :
225        *             _acknowledgedPnfsPools
226        *             _allowedAndAvailableMatrix
227        *             _allowedAndAvailable
228        */
229       private void calculateFileAvailableMatrix()
230           throws CacheException, InterruptedException
231       {
232 
233         if( _storageInfo == null )
234            throw new
235            CacheException(189,"Storage Info not available");
236 
237         String hostName     = _protocolInfo instanceof IpProtocolInfo  ?((IpProtocolInfo)_protocolInfo).getHosts()[0] : null ;
238         String protocolString = _protocolInfo.getProtocol() + "/" + _protocolInfo.getMajorVersion() ;
239         //
240         // will ask the PnfsManager for a hint
241         // about the pool locations of this
242         // pnfsId. Returns an enumeration of
243         // the possible pools.
244         //
245         List<String> expectedFromPnfs = _pnfsHandler.getCacheLocations( _pnfsId ) ;
246         say( "calculateFileAvailableMatrix _expectedFromPnfs : "+expectedFromPnfs ) ;
247         //
248         // check if pools are up and file is really there.
249         // (returns unsorted list of costs)
250         //
251         _acknowledgedPnfsPools =
252             queryPoolsForPnfsId(expectedFromPnfs.iterator(), _pnfsId, 0,
253                                 _protocolInfo.isFileCheckRequired());
254         say( "calculateFileAvailableMatrix _acknowledgedPnfsPools : "+_acknowledgedPnfsPools ) ;
255         Map<String, PoolCostCheckable> availableHash =
256             new HashMap<String, PoolCostCheckable>() ;
257         for( PoolCostCheckable cost: _acknowledgedPnfsPools ){
258            availableHash.put( cost.getPoolName() , cost ) ;
259         }
260         //
261         //  get the prioritized list of allowed pools for this
262         //  request. (We are only allowed to use the level-1
263         //  pools.
264         //
265         PoolPreferenceLevel [] level =
266             _selectionUnit.match( DirectionType.READ ,
267                                   hostName ,
268                                   protocolString ,
269                                   _storageInfo,
270                                   _linkGroup ) ;
271 
272         _listOfPartitions          = new ArrayList<PoolManagerParameter>();
273         _allowedAndAvailableMatrix = new ArrayList<List<PoolCostCheckable>>();
274         _allowedPoolCount          = 0 ;
275         _availablePoolCount        = 0 ;
276 
277         for( int prio = 0 ; prio < level.length ; prio++ ){
278 
279            List<String> poolList = level[prio].getPoolList() ;
280            //
281            //
282            PoolManagerParameter parameter = _partitionManager.getParameterCopyOf(level[prio].getTag()) ;
283            _listOfPartitions.add(  parameter ) ;
284            //
285            // get the allowed pools for this level and
286            // and add them to the result list only if
287            // they are really available.
288            //
289            say( "calculateFileAvailableMatrix : db matrix[*,"+prio+"] "+poolList);
290 
291            List<PoolCostCheckable> result =
292                new ArrayList<PoolCostCheckable>(poolList.size());
293            for (String poolName : poolList) {
294                PoolCostCheckable cost;
295                if ((cost = availableHash.get(poolName)) != null) {
296                    result.add(cost);
297                    _availablePoolCount++;
298                }
299                _allowedPoolCount++;
300            }
301 
302            sortByCost(result, false, parameter);
303 
304            say("calculateFileAvailableMatrix : av matrix[*," + prio + "] "
305                + result);
306 
307            _allowedAndAvailableMatrix.add(result);
308         }
309         //
310         // just in case, let us define a default parameter set
311         //
312         if( _listOfPartitions.size() == 0 )_listOfPartitions.add( _partitionManager.getParameterCopyOf() ) ;
313         //
314         _calculationDone = true ;
315         return  ;
316      }
317 
318       public List<PoolCostCheckable> getCostSortedAvailable()
319           throws CacheException, InterruptedException
320       {
321           //
322           // here we don't now exactly which parameter set to use.
323           //
324           if (!_calculationDone)
325               calculateFileAvailableMatrix();
326           List<PoolCostCheckable> list =
327               new ArrayList<PoolCostCheckable>(getAcknowledgedPnfsPools());
328           sortByCost(list, false);
329           return list;
330       }
331 
332       public List<List<PoolCostCheckable>>
333           getStagePoolMatrix(StorageInfo  storageInfo,
334                              ProtocolInfo protocolInfo,
335                              long         filesize)
336           throws CacheException, InterruptedException
337       {
338           return getFetchPoolMatrix(DirectionType.CACHE,
339                                     storageInfo,
340                                     protocolInfo,
341                                     filesize);
342       }
343 
344       public List<List<PoolCostCheckable>>
345           getFetchPoolMatrix(DirectionType       mode ,        /* cache, p2p */
346                              StorageInfo  storageInfo ,
347                              ProtocolInfo protocolInfo ,
348                              long         filesize  )
349           throws CacheException, InterruptedException
350       {
351 
352         String hostName     =
353                    protocolInfo instanceof IpProtocolInfo ?
354                    ((IpProtocolInfo)protocolInfo).getHosts()[0] :
355                    null ;
356 
357 
358         PoolPreferenceLevel [] level =
359             _selectionUnit.match( mode ,
360                                   hostName ,
361                                   protocolInfo.getProtocol()+"/"+protocolInfo.getMajorVersion() ,
362                                   storageInfo,
363                                   _linkGroup) ;
364         //
365         //
366         if( level.length == 0 )return new ArrayList<List<PoolCostCheckable>>() ;
367 
368         //
369         // Copy the matrix into a linear HashMap(keys).
370         // Exclude pools which contain the file.
371         //
372         List<PoolCostCheckable> acknowledged =
373             getAcknowledgedPnfsPools();
374         Map<String, PoolCostCheckable> poolMap =
375             new HashMap<String,PoolCostCheckable>();
376         Set<String> poolAvailableSet =
377             new HashSet<String>();
378         for (PoolCheckable pool : acknowledged)
379             poolAvailableSet.add(pool.getPoolName());
380         for (int prio = 0; prio < level.length; prio++) {
381            for (String poolName : level[prio].getPoolList()) {
382               //
383               // skip if pool already contains the file.
384               //
385               if (poolAvailableSet.contains(poolName))
386                   continue;
387 
388               poolMap.put(poolName, null);
389            }
390         }
391         //
392         // Add the costs to the pool list.
393         //
394         for (PoolCostCheckable cost :
395                  queryPoolsForCost(poolMap.keySet().iterator(), filesize)) {
396             poolMap.put(cost.getPoolName(), cost);
397         }
398         //
399         // Build a new matrix containing the Costs.
400         //
401         _listOfPartitions = new ArrayList<PoolManagerParameter>();
402         List<List<PoolCostCheckable>> costMatrix =
403             new ArrayList<List<PoolCostCheckable>>();
404         for (int prio = 0; prio < level.length; prio++) {
405             //
406             // skip empty level
407             //
408             PoolManagerParameter parameter =
409                 _partitionManager.getParameterCopyOf(level[prio].getTag());
410            _listOfPartitions.add(parameter);
411 
412             List<String> poolList = level[prio].getPoolList() ;
413             if( poolList.size() == 0 )continue ;
414 
415             List<PoolCostCheckable> row = new ArrayList<PoolCostCheckable>();
416             for (String pool : poolList) {
417                PoolCostCheckable cost = poolMap.get(pool);
418                if (cost != null)
419                    row.add(cost);
420             }
421             //
422             // skip if non of the pools is available
423             //
424             if( row.size() == 0 )continue ;
425             //
426             // sort according to (cpu & space) cost
427             //
428             sortByCost( row , true , parameter ) ;
429             //
430             // and add it to the matrix
431             //
432             costMatrix.add( row ) ;
433         }
434 
435         return costMatrix ;
436      }
437      private void say(String message ){
438          _log.debug("PFL ["+_pnfsId+"] : "+message);
439      }
440 
441       public List<PoolCostCheckable> getStorePoolList(long filesize)
442           throws CacheException, InterruptedException
443       {
444           return getStorePoolList(_storageInfo, _protocolInfo, filesize);
445       }
446 
447       private List<PoolCostCheckable>
448           getStorePoolList(StorageInfo  storageInfo,
449                            ProtocolInfo protocolInfo,
450                            long         filesize)
451           throws CacheException, InterruptedException
452       {
453         String  hostName    =
454                    protocolInfo instanceof IpProtocolInfo ?
455                    ((IpProtocolInfo)protocolInfo).getHosts()[0] :
456                    null ;
457         int  maxDepth      = 9999 ;
458         PoolPreferenceLevel [] level =
459             _selectionUnit.match( DirectionType.WRITE ,
460                                   hostName ,
461                                   protocolInfo.getProtocol()+"/"+protocolInfo.getMajorVersion() ,
462                                   storageInfo,
463                                   _linkGroup ) ;
464         //
465         // this is the final knock out.
466         //
467         if( level.length == 0 )
468            throw new
469            CacheException( 19 ,
470                             "No write pools configured for <"+ storageInfo +
471                                "> in the linkGroup " +
472                                ( _linkGroup == null ? "[none]" : _linkGroup) ) ;
473 
474         List<PoolCostCheckable> costs = null ;
475 
476         PoolManagerParameter parameter = null ;
477 
478         for( int prio = 0 ; prio < Math.min( maxDepth , level.length ) ; prio++ ){
479 
480            costs     = queryPoolsForCost( level[prio].getPoolList().iterator() , filesize ) ;
481 
482            parameter = _partitionManager.getParameterCopyOf(level[prio].getTag()) ;
483 
484            if( costs.size() != 0 )break ;
485         }
486 
487         if( costs == null || costs.size() == 0 )
488            throw new
489            CacheException( 20 ,
490                            "No write pool available for <"+ storageInfo +
491                                "> in the linkGroup " +
492                                ( _linkGroup == null ? "[none]" : _linkGroup));
493 
494         sortByCost( costs , true , parameter ) ;
495 
496         PoolCostCheckable check = costs.get(0) ;
497 
498         double lowestCost = calculateCost( check , true , parameter ) ;
499 
500         /* Notice that
501          *
502          *    !(lowestCost  <= _maxWriteCost)  != (lowerCost > _maxWriteCost)
503          *
504          * when using floating point calculations!
505          */
506         if( !(lowestCost  <= _maxWriteCost) )
507             throw new
508             CacheException( 21 , "Best pool <"+check.getPoolName()+
509                                  "> too high : "+lowestCost ) ;
510 
511         return costs ;
512      }
513 
514       public void sortByCost(List<PoolCostCheckable> list, boolean cpuAndSize)
515       {
516           sortByCost(list, cpuAndSize, getCurrentParameterSet());
517       }
518 
519       private void sortByCost(List<PoolCostCheckable> list, boolean cpuAndSize,
520                               PoolManagerParameter parameter)
521       {
522           ssortByCost(list, cpuAndSize, parameter);
523       }
524   }
525 
526    public void ssortByCost(List<PoolCostCheckable> list, boolean cpuAndSize,
527                            PoolManagerParameter parameter)
528    {
529        Collections.shuffle(list);
530        Collections.sort(list, new CostComparator(cpuAndSize, parameter));
531    }
532 
533    public Comparator<PoolCostCheckable>
534        getCostComparator(boolean both, PoolManagerParameter parameter)
535    {
536        return new CostComparator(both, parameter);
537    }
538 
539   public class CostComparator implements Comparator<PoolCostCheckable> {
540 
541       private final boolean              _useBoth;
542       private final PoolManagerParameter _para;
543       private CostComparator( boolean useBoth , PoolManagerParameter para ){
544         _useBoth = useBoth ;
545         _para    = para ;
546       }
547       public int compare(PoolCostCheckable check1, PoolCostCheckable check2)
548       {
549           return Double.compare(calculateCost(check1, _useBoth, _para),
550                                 calculateCost(check2, _useBoth, _para));
551       }
552    }
553    public double calculateCost( PoolCostCheckable checkable , boolean useBoth , PoolManagerParameter para ){
554       if( useBoth ){
555          return Math.abs(checkable.getSpaceCost())       * para._spaceCostFactor +
556                 Math.abs(checkable.getPerformanceCost()) * para._performanceCostFactor ;
557       }else{
558          return Math.abs(checkable.getPerformanceCost()) * para._performanceCostFactor ;
559       }
560    }
561    /*
562    public double getMinPerformanceCost( List list ){
563       double cost = 1000000.0 ;
564       for( int i = 0 ; i < list.size() ; i++ ){
565          double x = ((PoolCostCheckable)(list.get(i))).getPerformanceCost() ;
566          cost = Math.min( cost , x ) ;
567       }
568       return cost ;
569    }
570    */
571    //------------------------------------------------------------------------------
572    //
573    //  'queryPoolsForPnfsId' sends PoolCheckFileMessages to all pools
574    //  specified in the pool iterator. It waits until all replies
575    //  have arrived, the global timeout has expired or the thread
576    //  was interrupted.
577    //
578 
579    private List<PoolCostCheckable> queryPoolsForPnfsId(Iterator<String> pools,
580                                                        PnfsId pnfsId,
581                                                        long filesize,
582                                                        boolean checkFileExistence)
583        throws InterruptedException
584    {
585        List<PoolCostCheckable> list = new ArrayList<PoolCostCheckable>();
586 
587        if (checkFileExistence) {
588 
589            SpreadAndWait control = new SpreadAndWait(getCellEndpoint(),
590                    _poolTimeout);
591 
592            while (pools.hasNext()) {
593 
594                String poolName = pools.next();
595                //
596                // deselection inactive and disabled pools
597                //
598                PoolSelectionUnit.SelectionPool pool = _selectionUnit
599                        .getPool(poolName);
600                if ((pool == null) || !pool.canRead() || !pool.isActive())
601                    continue;
602 
603                _log.info("queryPoolsForPnfsId : PoolCheckFileRequest to : "
604                          + poolName);
605                //
606                // send query
607                //
608                CellMessage cellMessage = new CellMessage(
609                        new CellPath(poolName), new PoolCheckFileMessage(
610                                poolName, pnfsId));
611 
612                try {
613                    control.send(cellMessage);
614                } catch (Exception exc) {
615                    //
616                    // here we don't care about exceptions
617                    //
618                    _log.warn("Exception sending PoolCheckFileRequest to "
619                            + poolName + " : " + exc);
620                }
621            }
622 
623            //
624            // scan the replies
625            //
626            CellMessage answer = null;
627 
628            while ((answer = control.next()) != null) {
629 
630                Object message = answer.getMessageObject();
631 
632                if (!(message instanceof PoolCheckFileMessage)) {
633                    _log.warn("queryPoolsForPnfsId : Unexpected message from ("
634                         + answer.getSourcePath()
635                         + ") "
636                         + message.getClass());
637                    continue;
638                }
639 
640                PoolCheckFileMessage poolMessage =
641                    (PoolCheckFileMessage) message;
642                _log.info("queryPoolsForPnfsId : reply : " + poolMessage);
643 
644                boolean have = poolMessage.getHave();
645                String poolName = poolMessage.getPoolName();
646                if (have) {
647 
648                    PoolCostCheckable cost =
649                        _costModule.getPoolCost(poolName, filesize);
650                    if (cost != null) {
651                        PoolCheckAdapter check = new PoolCheckAdapter(cost);
652                        check.setHave(have);
653                        check.setPnfsId(pnfsId);
654                        list.add(check);
655                        _log.info("queryPoolsForPnfsId : returning : " + check);
656                    }
657                } else if (!poolMessage.getWaiting() && poolMessage.getReturnCode() == 0) {
658                    _log.warn("queryPoolsForPnfsId : clearingCacheLocation for pnfsId "
659                                    + pnfsId + " at pool " + poolName);
660                    _pnfsHandler.clearCacheLocation(pnfsId, poolName);
661                }
662            }
663 
664        } else {
665 
666            while ( pools.hasNext() ) {
667 
668                String poolName = pools.next();
669                PoolCostCheckable cost =
670                    _costModule.getPoolCost(poolName, filesize);
671                if (cost != null) {
672                    PoolCheckAdapter check = new PoolCheckAdapter(cost);
673                    check.setHave(true);
674                    check.setPnfsId(pnfsId);
675                    list.add(check);
676                }
677            }
678 
679        }
680 
681        _log.info("queryPoolsForPnfsId : number of valid replies : "
682                + list.size());
683        return list;
684 
685    }
686    public List<PoolCostCheckable>
687        queryPoolsByLinkName(String linkName, long filesize)
688        throws InterruptedException
689    {
690       List<String> pools = new ArrayList<String>() ;
691 
692       PoolSelectionUnit.SelectionLink link = _selectionUnit.getLinkByName( linkName ) ;
693       PoolManagerParameter       parameter = _partitionManager.getParameterCopyOf( link.getTag()  ) ;
694 
695       for( Iterator<PoolSelectionUnit.SelectionPool> i = link.pools() ; i.hasNext() ; ){
696          pools.add( i.next().getName() ) ;
697       }
698 
699       List<PoolCostCheckable> list =
700           queryPoolsForCost( pools.iterator() , filesize ) ;
701 
702       ssortByCost( list , true , parameter ) ;
703 
704       return list ;
705    }
706    private boolean _dontAskForCost = true ;
707    private List<PoolCostCheckable> queryPoolsForCost(Iterator<String> pools,
708                                                      long filesize)
709        throws InterruptedException
710    {
711        List<PoolCostCheckable> list = new ArrayList<PoolCostCheckable>();
712        SpreadAndWait control =
713            new SpreadAndWait(getCellEndpoint(), _poolTimeout);
714 
715        while( pools.hasNext() ){
716 
717            String poolName = pools.next();
718            PoolCostCheckable costCheck = _costModule.getPoolCost( poolName , filesize ) ;
719            if( costCheck != null ){
720               list.add( costCheck ) ;
721               _log.info( "queryPoolsForCost : costModule : "+poolName+" ("+filesize+") "+costCheck);
722            }else{
723               //
724               // send query
725               //
726               if( _dontAskForCost )continue ;
727               CellMessage  cellMessage =
728                      new CellMessage(  new CellPath(poolName),
729                                        new PoolCheckCostMessage(poolName,filesize)
730                                     );
731 
732               _log.info( "queryPoolsForCost : "+poolName+" query sent");
733               try{
734                  control.send( cellMessage ) ;
735               }catch(Exception exc){
736                  //
737                  // here we don't care about exceptions
738                  //
739                   _log.warn("queryPoolsForCost : Exception sending PoolCheckFileRequest to "+poolName+" : "+exc);
740               }
741            }
742 
743        }
744 
745        if( _dontAskForCost )return list ;
746 
747        //
748        // scan the replies
749        //
750        CellMessage answer = null ;
751 
752        while( ( answer = control.next() ) != null ){
753 
754           Object message = answer.getMessageObject();
755 
756           if( ! ( message instanceof PoolCostCheckable )){
757              _log.warn("queryPoolsForCost : Unexpected message from ("+
758                   answer.getSourcePath()+") "+message.getClass());
759              continue ;
760           }
761           PoolCostCheckable poolMessage = (PoolCostCheckable)message;
762           _log.info( "queryPoolsForCost : reply : "+poolMessage ) ;
763           list.add( poolMessage ) ;
764        }
765        _log.info( "queryPoolsForCost : number of valid replies : "+list.size() );
766        return list ;
767    }
768 
769    private PoolManagerPoolInformation getPoolInformation(PoolSelectionUnit.SelectionPool pool)
770        throws InterruptedException
771    {
772        String name = pool.getName();
773        PoolManagerPoolInformation info = new PoolManagerPoolInformation(name);
774        PoolCostCheckable cost = _costModule.getPoolCost(name, 0);
775        if (!pool.isActive() || cost == null) {
776            info.setSpaceCost(Double.POSITIVE_INFINITY);
777            info.setCpuCost(Double.POSITIVE_INFINITY);
778        } else {
779            info.setSpaceCost(cost.getSpaceCost());
780            info.setCpuCost(cost.getPerformanceCost());
781        }
782        info.setPoolCostInfo(_costModule.getPoolCostInfo(name));
783        return info;
784    }
785 
786    private Collection<PoolManagerPoolInformation>
787        getPoolInformation(Iterator<PoolSelectionUnit.SelectionPool> pools)
788        throws InterruptedException
789    {
790        List<PoolManagerPoolInformation> result = new ArrayList();
791        while (pools.hasNext()) {
792            result.add(getPoolInformation(pools.next()));
793        }
794        return result;
795    }
796 
797    public PoolManagerPoolInformation getPoolInformation(String name)
798        throws InterruptedException, NoSuchElementException
799    {
800        PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(name);
801        if (pool == null) {
802            throw new NoSuchElementException("No such pool: " + name);
803        }
804        return getPoolInformation(pool);
805    }
806 
807    public Collection<PoolManagerPoolInformation>
808        getPoolsByLink(String linkName)
809        throws InterruptedException, NoSuchElementException
810    {
811        PoolSelectionUnit.SelectionLink link =
812            _selectionUnit.getLinkByName(linkName);
813        return getPoolInformation(link.pools());
814    }
815 
816    public Collection<PoolManagerPoolInformation>
817        getPoolsByPoolGroup(String poolGroup)
818        throws InterruptedException, NoSuchElementException
819    {
820        Collection<PoolSelectionUnit.SelectionPool> pools =
821            _selectionUnit.getPoolsByPoolGroup(poolGroup);
822        return getPoolInformation(pools.iterator());
823    }
824 
825    /**
826     * Fetch the percentile performance cost; that is, the cost
827     * of the <code>n</code>th pool, in increasing order of performance cost,
828     * where <code>n</code> is <code>(int)floor( fraction * numberOfPools)</code>
829     * @param fraction the percentile fraction.  The value must be between 0 and 1.
830     * @return the nth percentile performance cost, or 0 if there are no pools.
831     */
832    public double getPoolsPercentilePerformanceCost( double fraction) {
833        return _costModule.getPoolsPercentilePerformanceCost( fraction);
834    }
835 
836}

[all classes][diskCacheV111.poolManager]
EMMA 2.0.5312 (C) Vladimir Roubtsov