1 | // $Id: PoolMonitorV5.java,v 1.32 2007-08-01 20:00:45 tigran Exp $ |
2 | |
3 | package diskCacheV111.poolManager ; |
4 | |
5 | import java.util.ArrayList; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.Comparator; |
9 | import java.util.HashMap; |
10 | import java.util.HashSet; |
11 | import java.util.Iterator; |
12 | import java.util.List; |
13 | import java.util.Map; |
14 | import java.util.Set; |
15 | import java.util.NoSuchElementException; |
16 | |
17 | import org.slf4j.Logger; |
18 | import org.slf4j.LoggerFactory; |
19 | import org.dcache.cells.AbstractCellComponent; |
20 | |
21 | import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType; |
22 | import diskCacheV111.util.CacheException; |
23 | import diskCacheV111.util.PnfsHandler; |
24 | import diskCacheV111.util.PnfsId; |
25 | import diskCacheV111.util.SpreadAndWait; |
26 | import diskCacheV111.vehicles.IpProtocolInfo; |
27 | import diskCacheV111.vehicles.PoolCheckCostMessage; |
28 | import diskCacheV111.vehicles.PoolCheckFileMessage; |
29 | import diskCacheV111.vehicles.PoolCheckable; |
30 | import diskCacheV111.vehicles.PoolCostCheckable; |
31 | import diskCacheV111.vehicles.PoolManagerPoolInformation; |
32 | import diskCacheV111.vehicles.ProtocolInfo; |
33 | import diskCacheV111.vehicles.StorageInfo; |
34 | import dmg.cells.nucleus.CellMessage; |
35 | import dmg.cells.nucleus.CellPath; |
36 | |
37 | public class PoolMonitorV5 |
38 | extends AbstractCellComponent |
39 | { |
40 | private final static Logger _log = LoggerFactory.getLogger(PoolMonitorV5.class); |
41 | |
42 | private long _poolTimeout = 15 * 1000; |
43 | private PoolSelectionUnit _selectionUnit ; |
44 | private PnfsHandler _pnfsHandler ; |
45 | private CostModule _costModule ; |
46 | private double _maxWriteCost = 1000000.0; |
47 | private PartitionManager _partitionManager ; |
48 | |
49 | public PoolMonitorV5() |
50 | { |
51 | } |
52 | |
53 | public void setPoolSelectionUnit(PoolSelectionUnit selectionUnit) |
54 | { |
55 | _selectionUnit = selectionUnit; |
56 | } |
57 | |
58 | public void setPnfsHandler(PnfsHandler pnfsHandler) |
59 | { |
60 | _pnfsHandler = pnfsHandler; |
61 | } |
62 | |
63 | public void setCostModule(CostModule costModule) |
64 | { |
65 | _costModule = costModule; |
66 | } |
67 | |
68 | public void setPartitionManager(PartitionManager partitionManager) |
69 | { |
70 | _partitionManager = partitionManager; |
71 | } |
72 | |
73 | public void messageToCostModule( CellMessage cellMessage ){ |
74 | _costModule.messageArrived(cellMessage); |
75 | } |
76 | public void setPoolTimeout( long poolTimeout ){ |
77 | _poolTimeout = poolTimeout ; |
78 | } |
79 | /* |
80 | public void setSpaceCost( double spaceCost ){ |
81 | _spaceCostFactor = spaceCost ; |
82 | } |
83 | public void setPerformanceCost( double performanceCost ){ |
84 | _performanceCostFactor = performanceCost ; |
85 | }*/ |
86 | public long getPoolTimeout(){ return _poolTimeout ;} |
87 | // output[0] -> Allowed and Available |
88 | // output[1] -> available but not allowed (sorted, cpu) |
89 | // output[2] -> allowed but not available (sorted, cpu + space) |
90 | // output[3] -> pools from pnfs |
91 | // output[4] -> List of List (all allowed pools) |
92 | public PnfsFileLocation getPnfsFileLocation( |
93 | PnfsId pnfsId , |
94 | StorageInfo storageInfo , |
95 | ProtocolInfo protocolInfo, String linkGroup){ |
96 | |
97 | return new PnfsFileLocation( pnfsId, storageInfo ,protocolInfo , linkGroup) ; |
98 | |
99 | } |
100 | public class PnfsFileLocation { |
101 | |
102 | private List<PoolManagerParameter> _listOfPartitions; |
103 | private List<List<PoolCostCheckable>> _allowedAndAvailableMatrix; |
104 | private List<PoolCostCheckable> _acknowledgedPnfsPools; |
105 | private int _allowedPoolCount = 0 ; |
106 | private int _availablePoolCount = 0 ; |
107 | private boolean _calculationDone = false ; |
108 | |
109 | private final PnfsId _pnfsId ; |
110 | private final StorageInfo _storageInfo ; |
111 | private final ProtocolInfo _protocolInfo ; |
112 | private final String _linkGroup ; |
113 | |
114 | //private PoolManagerParameter _recentParameter = _partitionManager.getParameterCopyOf() ; |
115 | |
116 | private PnfsFileLocation( PnfsId pnfsId , |
117 | StorageInfo storageInfo , |
118 | ProtocolInfo protocolInfo , |
119 | String linkGroup){ |
120 | |
121 | _pnfsId = pnfsId ; |
122 | _storageInfo = storageInfo ; |
123 | _protocolInfo = protocolInfo ; |
124 | _linkGroup = linkGroup; |
125 | } |
126 | |
127 | public List<PoolManagerParameter> getListOfParameter() |
128 | { |
129 | return _listOfPartitions; |
130 | } |
131 | |
132 | public void clear(){ |
133 | _allowedAndAvailableMatrix = null ; |
134 | _calculationDone = false ; |
135 | } |
136 | |
137 | public PoolManagerParameter getCurrentParameterSet() |
138 | { |
139 | return _listOfPartitions.get(0); |
140 | } |
141 | |
142 | public List getAllowedButNotAvailable() |
143 | { |
144 | return null; |
145 | } |
146 | |
147 | public List<PoolCostCheckable> getAcknowledgedPnfsPools() |
148 | throws CacheException, InterruptedException |
149 | { |
150 | if (_acknowledgedPnfsPools == null) |
151 | calculateFileAvailableMatrix(); |
152 | return _acknowledgedPnfsPools; |
153 | } |
154 | |
155 | public int getAllowedPoolCount() |
156 | { |
157 | return _allowedPoolCount; |
158 | } |
159 | |
160 | public int getAvailablePoolCount() |
161 | { |
162 | return _availablePoolCount; |
163 | } |
164 | |
165 | public List<List<PoolCostCheckable>> getFileAvailableMatrix() |
166 | throws CacheException, InterruptedException |
167 | { |
168 | if (_allowedAndAvailableMatrix == null) |
169 | calculateFileAvailableMatrix(); |
170 | return _allowedAndAvailableMatrix; |
171 | } |
172 | // |
173 | // getFileAvailableList |
174 | // ------------------------- |
175 | // |
176 | // expected = getPoolsFromPnfs() ; |
177 | // allowed[] = getAllowedFromConfiguration() ; |
178 | // |
179 | // +----------------------------------------------------+ |
180 | // | for i in 0,1,2,3,... | |
181 | // | result = intersection( expected , allowed[i] ) | |
182 | // | found = CheckFileInPool( result) | |
183 | // | if( found > 0 )break | |
184 | // | if( ! allowFallbackOnCost )break | |
185 | // | if( minCost( found ) < MAX_COST )break | |
186 | // +----------------------------------------------------+ |
187 | // | found == 0 | |
188 | // | | | |
189 | // | yes | NO | |
190 | // |----------------------|-----------------------------| |
191 | // | output[0] = empty | [0] = SortCpuCost(found) | |
192 | // | output[1] = null | [1] = null | |
193 | // | output[2] = null | [2] = null | |
194 | // | output[3] = expected | [3] = expected | |
195 | // | output[4] = allowed | [4] = allowed | |
196 | // +----------------------------------------------------+ |
197 | // |
198 | // preparePool2Pool |
199 | // ------------------------- |
200 | // |
201 | // output[1] = SortCpuCost( CheckFileInPool( expected ) ) |
202 | // |
203 | // +----------------------------------------------------+ |
204 | // | output[0] > 0 | |
205 | // | | |
206 | // | yes | NO | |
207 | // |-------------------------|--------------------------| |
208 | // | veto = Hash( output[0] )| | |
209 | // |-------------------------| | |
210 | // |for i in 0,1,2,3,. | for i in 0,1,2,3,. | |
211 | // | tmp = allowed[i]-veto | if(allowed[i]==0)cont | |
212 | // | if( tmp == 0 )continue | | |
213 | // | out[2] = | out[2] = | |
214 | // | SortCost(getCost(tmp))| SortCost(getCost( | |
215 | // | | allowed[i])) | |
216 | // | break | break | |
217 | // +----------------------------------------------------+ |
218 | // |if(out[2] == 0) |if(out[2] == 0) | |
219 | // | out[2] = out[0] | out[2] = empty | |
220 | // +----------------------------------------------------+ |
221 | // |
222 | /* |
223 | * Input : storage info , pnfsid |
224 | * Output : |
225 | * _acknowledgedPnfsPools |
226 | * _allowedAndAvailableMatrix |
227 | * _allowedAndAvailable |
228 | */ |
229 | private void calculateFileAvailableMatrix() |
230 | throws CacheException, InterruptedException |
231 | { |
232 | |
233 | if( _storageInfo == null ) |
234 | throw new |
235 | CacheException(189,"Storage Info not available"); |
236 | |
237 | String hostName = _protocolInfo instanceof IpProtocolInfo ?((IpProtocolInfo)_protocolInfo).getHosts()[0] : null ; |
238 | String protocolString = _protocolInfo.getProtocol() + "/" + _protocolInfo.getMajorVersion() ; |
239 | // |
240 | // will ask the PnfsManager for a hint |
241 | // about the pool locations of this |
242 | // pnfsId. Returns an enumeration of |
243 | // the possible pools. |
244 | // |
245 | List<String> expectedFromPnfs = _pnfsHandler.getCacheLocations( _pnfsId ) ; |
246 | say( "calculateFileAvailableMatrix _expectedFromPnfs : "+expectedFromPnfs ) ; |
247 | // |
248 | // check if pools are up and file is really there. |
249 | // (returns unsorted list of costs) |
250 | // |
251 | _acknowledgedPnfsPools = |
252 | queryPoolsForPnfsId(expectedFromPnfs.iterator(), _pnfsId, 0, |
253 | _protocolInfo.isFileCheckRequired()); |
254 | say( "calculateFileAvailableMatrix _acknowledgedPnfsPools : "+_acknowledgedPnfsPools ) ; |
255 | Map<String, PoolCostCheckable> availableHash = |
256 | new HashMap<String, PoolCostCheckable>() ; |
257 | for( PoolCostCheckable cost: _acknowledgedPnfsPools ){ |
258 | availableHash.put( cost.getPoolName() , cost ) ; |
259 | } |
260 | // |
261 | // get the prioritized list of allowed pools for this |
262 | // request. (We are only allowed to use the level-1 |
263 | // pools. |
264 | // |
265 | PoolPreferenceLevel [] level = |
266 | _selectionUnit.match( DirectionType.READ , |
267 | hostName , |
268 | protocolString , |
269 | _storageInfo, |
270 | _linkGroup ) ; |
271 | |
272 | _listOfPartitions = new ArrayList<PoolManagerParameter>(); |
273 | _allowedAndAvailableMatrix = new ArrayList<List<PoolCostCheckable>>(); |
274 | _allowedPoolCount = 0 ; |
275 | _availablePoolCount = 0 ; |
276 | |
277 | for( int prio = 0 ; prio < level.length ; prio++ ){ |
278 | |
279 | List<String> poolList = level[prio].getPoolList() ; |
280 | // |
281 | // |
282 | PoolManagerParameter parameter = _partitionManager.getParameterCopyOf(level[prio].getTag()) ; |
283 | _listOfPartitions.add( parameter ) ; |
284 | // |
285 | // get the allowed pools for this level and |
286 | // and add them to the result list only if |
287 | // they are really available. |
288 | // |
289 | say( "calculateFileAvailableMatrix : db matrix[*,"+prio+"] "+poolList); |
290 | |
291 | List<PoolCostCheckable> result = |
292 | new ArrayList<PoolCostCheckable>(poolList.size()); |
293 | for (String poolName : poolList) { |
294 | PoolCostCheckable cost; |
295 | if ((cost = availableHash.get(poolName)) != null) { |
296 | result.add(cost); |
297 | _availablePoolCount++; |
298 | } |
299 | _allowedPoolCount++; |
300 | } |
301 | |
302 | sortByCost(result, false, parameter); |
303 | |
304 | say("calculateFileAvailableMatrix : av matrix[*," + prio + "] " |
305 | + result); |
306 | |
307 | _allowedAndAvailableMatrix.add(result); |
308 | } |
309 | // |
310 | // just in case, let us define a default parameter set |
311 | // |
312 | if( _listOfPartitions.size() == 0 )_listOfPartitions.add( _partitionManager.getParameterCopyOf() ) ; |
313 | // |
314 | _calculationDone = true ; |
315 | return ; |
316 | } |
317 | |
318 | public List<PoolCostCheckable> getCostSortedAvailable() |
319 | throws CacheException, InterruptedException |
320 | { |
321 | // |
322 | // here we don't now exactly which parameter set to use. |
323 | // |
324 | if (!_calculationDone) |
325 | calculateFileAvailableMatrix(); |
326 | List<PoolCostCheckable> list = |
327 | new ArrayList<PoolCostCheckable>(getAcknowledgedPnfsPools()); |
328 | sortByCost(list, false); |
329 | return list; |
330 | } |
331 | |
332 | public List<List<PoolCostCheckable>> |
333 | getStagePoolMatrix(StorageInfo storageInfo, |
334 | ProtocolInfo protocolInfo, |
335 | long filesize) |
336 | throws CacheException, InterruptedException |
337 | { |
338 | return getFetchPoolMatrix(DirectionType.CACHE, |
339 | storageInfo, |
340 | protocolInfo, |
341 | filesize); |
342 | } |
343 | |
344 | public List<List<PoolCostCheckable>> |
345 | getFetchPoolMatrix(DirectionType mode , /* cache, p2p */ |
346 | StorageInfo storageInfo , |
347 | ProtocolInfo protocolInfo , |
348 | long filesize ) |
349 | throws CacheException, InterruptedException |
350 | { |
351 | |
352 | String hostName = |
353 | protocolInfo instanceof IpProtocolInfo ? |
354 | ((IpProtocolInfo)protocolInfo).getHosts()[0] : |
355 | null ; |
356 | |
357 | |
358 | PoolPreferenceLevel [] level = |
359 | _selectionUnit.match( mode , |
360 | hostName , |
361 | protocolInfo.getProtocol()+"/"+protocolInfo.getMajorVersion() , |
362 | storageInfo, |
363 | _linkGroup) ; |
364 | // |
365 | // |
366 | if( level.length == 0 )return new ArrayList<List<PoolCostCheckable>>() ; |
367 | |
368 | // |
369 | // Copy the matrix into a linear HashMap(keys). |
370 | // Exclude pools which contain the file. |
371 | // |
372 | List<PoolCostCheckable> acknowledged = |
373 | getAcknowledgedPnfsPools(); |
374 | Map<String, PoolCostCheckable> poolMap = |
375 | new HashMap<String,PoolCostCheckable>(); |
376 | Set<String> poolAvailableSet = |
377 | new HashSet<String>(); |
378 | for (PoolCheckable pool : acknowledged) |
379 | poolAvailableSet.add(pool.getPoolName()); |
380 | for (int prio = 0; prio < level.length; prio++) { |
381 | for (String poolName : level[prio].getPoolList()) { |
382 | // |
383 | // skip if pool already contains the file. |
384 | // |
385 | if (poolAvailableSet.contains(poolName)) |
386 | continue; |
387 | |
388 | poolMap.put(poolName, null); |
389 | } |
390 | } |
391 | // |
392 | // Add the costs to the pool list. |
393 | // |
394 | for (PoolCostCheckable cost : |
395 | queryPoolsForCost(poolMap.keySet().iterator(), filesize)) { |
396 | poolMap.put(cost.getPoolName(), cost); |
397 | } |
398 | // |
399 | // Build a new matrix containing the Costs. |
400 | // |
401 | _listOfPartitions = new ArrayList<PoolManagerParameter>(); |
402 | List<List<PoolCostCheckable>> costMatrix = |
403 | new ArrayList<List<PoolCostCheckable>>(); |
404 | for (int prio = 0; prio < level.length; prio++) { |
405 | // |
406 | // skip empty level |
407 | // |
408 | PoolManagerParameter parameter = |
409 | _partitionManager.getParameterCopyOf(level[prio].getTag()); |
410 | _listOfPartitions.add(parameter); |
411 | |
412 | List<String> poolList = level[prio].getPoolList() ; |
413 | if( poolList.size() == 0 )continue ; |
414 | |
415 | List<PoolCostCheckable> row = new ArrayList<PoolCostCheckable>(); |
416 | for (String pool : poolList) { |
417 | PoolCostCheckable cost = poolMap.get(pool); |
418 | if (cost != null) |
419 | row.add(cost); |
420 | } |
421 | // |
422 | // skip if non of the pools is available |
423 | // |
424 | if( row.size() == 0 )continue ; |
425 | // |
426 | // sort according to (cpu & space) cost |
427 | // |
428 | sortByCost( row , true , parameter ) ; |
429 | // |
430 | // and add it to the matrix |
431 | // |
432 | costMatrix.add( row ) ; |
433 | } |
434 | |
435 | return costMatrix ; |
436 | } |
437 | private void say(String message ){ |
438 | _log.debug("PFL ["+_pnfsId+"] : "+message); |
439 | } |
440 | |
441 | public List<PoolCostCheckable> getStorePoolList(long filesize) |
442 | throws CacheException, InterruptedException |
443 | { |
444 | return getStorePoolList(_storageInfo, _protocolInfo, filesize); |
445 | } |
446 | |
447 | private List<PoolCostCheckable> |
448 | getStorePoolList(StorageInfo storageInfo, |
449 | ProtocolInfo protocolInfo, |
450 | long filesize) |
451 | throws CacheException, InterruptedException |
452 | { |
453 | String hostName = |
454 | protocolInfo instanceof IpProtocolInfo ? |
455 | ((IpProtocolInfo)protocolInfo).getHosts()[0] : |
456 | null ; |
457 | int maxDepth = 9999 ; |
458 | PoolPreferenceLevel [] level = |
459 | _selectionUnit.match( DirectionType.WRITE , |
460 | hostName , |
461 | protocolInfo.getProtocol()+"/"+protocolInfo.getMajorVersion() , |
462 | storageInfo, |
463 | _linkGroup ) ; |
464 | // |
465 | // this is the final knock out. |
466 | // |
467 | if( level.length == 0 ) |
468 | throw new |
469 | CacheException( 19 , |
470 | "No write pools configured for <"+ storageInfo + |
471 | "> in the linkGroup " + |
472 | ( _linkGroup == null ? "[none]" : _linkGroup) ) ; |
473 | |
474 | List<PoolCostCheckable> costs = null ; |
475 | |
476 | PoolManagerParameter parameter = null ; |
477 | |
478 | for( int prio = 0 ; prio < Math.min( maxDepth , level.length ) ; prio++ ){ |
479 | |
480 | costs = queryPoolsForCost( level[prio].getPoolList().iterator() , filesize ) ; |
481 | |
482 | parameter = _partitionManager.getParameterCopyOf(level[prio].getTag()) ; |
483 | |
484 | if( costs.size() != 0 )break ; |
485 | } |
486 | |
487 | if( costs == null || costs.size() == 0 ) |
488 | throw new |
489 | CacheException( 20 , |
490 | "No write pool available for <"+ storageInfo + |
491 | "> in the linkGroup " + |
492 | ( _linkGroup == null ? "[none]" : _linkGroup)); |
493 | |
494 | sortByCost( costs , true , parameter ) ; |
495 | |
496 | PoolCostCheckable check = costs.get(0) ; |
497 | |
498 | double lowestCost = calculateCost( check , true , parameter ) ; |
499 | |
500 | /* Notice that |
501 | * |
502 | * !(lowestCost <= _maxWriteCost) != (lowerCost > _maxWriteCost) |
503 | * |
504 | * when using floating point calculations! |
505 | */ |
506 | if( !(lowestCost <= _maxWriteCost) ) |
507 | throw new |
508 | CacheException( 21 , "Best pool <"+check.getPoolName()+ |
509 | "> too high : "+lowestCost ) ; |
510 | |
511 | return costs ; |
512 | } |
513 | |
514 | public void sortByCost(List<PoolCostCheckable> list, boolean cpuAndSize) |
515 | { |
516 | sortByCost(list, cpuAndSize, getCurrentParameterSet()); |
517 | } |
518 | |
519 | private void sortByCost(List<PoolCostCheckable> list, boolean cpuAndSize, |
520 | PoolManagerParameter parameter) |
521 | { |
522 | ssortByCost(list, cpuAndSize, parameter); |
523 | } |
524 | } |
525 | |
526 | public void ssortByCost(List<PoolCostCheckable> list, boolean cpuAndSize, |
527 | PoolManagerParameter parameter) |
528 | { |
529 | Collections.shuffle(list); |
530 | Collections.sort(list, new CostComparator(cpuAndSize, parameter)); |
531 | } |
532 | |
533 | public Comparator<PoolCostCheckable> |
534 | getCostComparator(boolean both, PoolManagerParameter parameter) |
535 | { |
536 | return new CostComparator(both, parameter); |
537 | } |
538 | |
539 | public class CostComparator implements Comparator<PoolCostCheckable> { |
540 | |
541 | private final boolean _useBoth; |
542 | private final PoolManagerParameter _para; |
543 | private CostComparator( boolean useBoth , PoolManagerParameter para ){ |
544 | _useBoth = useBoth ; |
545 | _para = para ; |
546 | } |
547 | public int compare(PoolCostCheckable check1, PoolCostCheckable check2) |
548 | { |
549 | return Double.compare(calculateCost(check1, _useBoth, _para), |
550 | calculateCost(check2, _useBoth, _para)); |
551 | } |
552 | } |
553 | public double calculateCost( PoolCostCheckable checkable , boolean useBoth , PoolManagerParameter para ){ |
554 | if( useBoth ){ |
555 | return Math.abs(checkable.getSpaceCost()) * para._spaceCostFactor + |
556 | Math.abs(checkable.getPerformanceCost()) * para._performanceCostFactor ; |
557 | }else{ |
558 | return Math.abs(checkable.getPerformanceCost()) * para._performanceCostFactor ; |
559 | } |
560 | } |
561 | /* |
562 | public double getMinPerformanceCost( List list ){ |
563 | double cost = 1000000.0 ; |
564 | for( int i = 0 ; i < list.size() ; i++ ){ |
565 | double x = ((PoolCostCheckable)(list.get(i))).getPerformanceCost() ; |
566 | cost = Math.min( cost , x ) ; |
567 | } |
568 | return cost ; |
569 | } |
570 | */ |
571 | //------------------------------------------------------------------------------ |
572 | // |
573 | // 'queryPoolsForPnfsId' sends PoolCheckFileMessages to all pools |
574 | // specified in the pool iterator. It waits until all replies |
575 | // have arrived, the global timeout has expired or the thread |
576 | // was interrupted. |
577 | // |
578 | |
579 | private List<PoolCostCheckable> queryPoolsForPnfsId(Iterator<String> pools, |
580 | PnfsId pnfsId, |
581 | long filesize, |
582 | boolean checkFileExistence) |
583 | throws InterruptedException |
584 | { |
585 | List<PoolCostCheckable> list = new ArrayList<PoolCostCheckable>(); |
586 | |
587 | if (checkFileExistence) { |
588 | |
589 | SpreadAndWait control = new SpreadAndWait(getCellEndpoint(), |
590 | _poolTimeout); |
591 | |
592 | while (pools.hasNext()) { |
593 | |
594 | String poolName = pools.next(); |
595 | // |
596 | // deselection inactive and disabled pools |
597 | // |
598 | PoolSelectionUnit.SelectionPool pool = _selectionUnit |
599 | .getPool(poolName); |
600 | if ((pool == null) || !pool.canRead() || !pool.isActive()) |
601 | continue; |
602 | |
603 | _log.info("queryPoolsForPnfsId : PoolCheckFileRequest to : " |
604 | + poolName); |
605 | // |
606 | // send query |
607 | // |
608 | CellMessage cellMessage = new CellMessage( |
609 | new CellPath(poolName), new PoolCheckFileMessage( |
610 | poolName, pnfsId)); |
611 | |
612 | try { |
613 | control.send(cellMessage); |
614 | } catch (Exception exc) { |
615 | // |
616 | // here we don't care about exceptions |
617 | // |
618 | _log.warn("Exception sending PoolCheckFileRequest to " |
619 | + poolName + " : " + exc); |
620 | } |
621 | } |
622 | |
623 | // |
624 | // scan the replies |
625 | // |
626 | CellMessage answer = null; |
627 | |
628 | while ((answer = control.next()) != null) { |
629 | |
630 | Object message = answer.getMessageObject(); |
631 | |
632 | if (!(message instanceof PoolCheckFileMessage)) { |
633 | _log.warn("queryPoolsForPnfsId : Unexpected message from (" |
634 | + answer.getSourcePath() |
635 | + ") " |
636 | + message.getClass()); |
637 | continue; |
638 | } |
639 | |
640 | PoolCheckFileMessage poolMessage = |
641 | (PoolCheckFileMessage) message; |
642 | _log.info("queryPoolsForPnfsId : reply : " + poolMessage); |
643 | |
644 | boolean have = poolMessage.getHave(); |
645 | String poolName = poolMessage.getPoolName(); |
646 | if (have) { |
647 | |
648 | PoolCostCheckable cost = |
649 | _costModule.getPoolCost(poolName, filesize); |
650 | if (cost != null) { |
651 | PoolCheckAdapter check = new PoolCheckAdapter(cost); |
652 | check.setHave(have); |
653 | check.setPnfsId(pnfsId); |
654 | list.add(check); |
655 | _log.info("queryPoolsForPnfsId : returning : " + check); |
656 | } |
657 | } else if (!poolMessage.getWaiting() && poolMessage.getReturnCode() == 0) { |
658 | _log.warn("queryPoolsForPnfsId : clearingCacheLocation for pnfsId " |
659 | + pnfsId + " at pool " + poolName); |
660 | _pnfsHandler.clearCacheLocation(pnfsId, poolName); |
661 | } |
662 | } |
663 | |
664 | } else { |
665 | |
666 | while ( pools.hasNext() ) { |
667 | |
668 | String poolName = pools.next(); |
669 | PoolCostCheckable cost = |
670 | _costModule.getPoolCost(poolName, filesize); |
671 | if (cost != null) { |
672 | PoolCheckAdapter check = new PoolCheckAdapter(cost); |
673 | check.setHave(true); |
674 | check.setPnfsId(pnfsId); |
675 | list.add(check); |
676 | } |
677 | } |
678 | |
679 | } |
680 | |
681 | _log.info("queryPoolsForPnfsId : number of valid replies : " |
682 | + list.size()); |
683 | return list; |
684 | |
685 | } |
686 | public List<PoolCostCheckable> |
687 | queryPoolsByLinkName(String linkName, long filesize) |
688 | throws InterruptedException |
689 | { |
690 | List<String> pools = new ArrayList<String>() ; |
691 | |
692 | PoolSelectionUnit.SelectionLink link = _selectionUnit.getLinkByName( linkName ) ; |
693 | PoolManagerParameter parameter = _partitionManager.getParameterCopyOf( link.getTag() ) ; |
694 | |
695 | for( Iterator<PoolSelectionUnit.SelectionPool> i = link.pools() ; i.hasNext() ; ){ |
696 | pools.add( i.next().getName() ) ; |
697 | } |
698 | |
699 | List<PoolCostCheckable> list = |
700 | queryPoolsForCost( pools.iterator() , filesize ) ; |
701 | |
702 | ssortByCost( list , true , parameter ) ; |
703 | |
704 | return list ; |
705 | } |
706 | private boolean _dontAskForCost = true ; |
707 | private List<PoolCostCheckable> queryPoolsForCost(Iterator<String> pools, |
708 | long filesize) |
709 | throws InterruptedException |
710 | { |
711 | List<PoolCostCheckable> list = new ArrayList<PoolCostCheckable>(); |
712 | SpreadAndWait control = |
713 | new SpreadAndWait(getCellEndpoint(), _poolTimeout); |
714 | |
715 | while( pools.hasNext() ){ |
716 | |
717 | String poolName = pools.next(); |
718 | PoolCostCheckable costCheck = _costModule.getPoolCost( poolName , filesize ) ; |
719 | if( costCheck != null ){ |
720 | list.add( costCheck ) ; |
721 | _log.info( "queryPoolsForCost : costModule : "+poolName+" ("+filesize+") "+costCheck); |
722 | }else{ |
723 | // |
724 | // send query |
725 | // |
726 | if( _dontAskForCost )continue ; |
727 | CellMessage cellMessage = |
728 | new CellMessage( new CellPath(poolName), |
729 | new PoolCheckCostMessage(poolName,filesize) |
730 | ); |
731 | |
732 | _log.info( "queryPoolsForCost : "+poolName+" query sent"); |
733 | try{ |
734 | control.send( cellMessage ) ; |
735 | }catch(Exception exc){ |
736 | // |
737 | // here we don't care about exceptions |
738 | // |
739 | _log.warn("queryPoolsForCost : Exception sending PoolCheckFileRequest to "+poolName+" : "+exc); |
740 | } |
741 | } |
742 | |
743 | } |
744 | |
745 | if( _dontAskForCost )return list ; |
746 | |
747 | // |
748 | // scan the replies |
749 | // |
750 | CellMessage answer = null ; |
751 | |
752 | while( ( answer = control.next() ) != null ){ |
753 | |
754 | Object message = answer.getMessageObject(); |
755 | |
756 | if( ! ( message instanceof PoolCostCheckable )){ |
757 | _log.warn("queryPoolsForCost : Unexpected message from ("+ |
758 | answer.getSourcePath()+") "+message.getClass()); |
759 | continue ; |
760 | } |
761 | PoolCostCheckable poolMessage = (PoolCostCheckable)message; |
762 | _log.info( "queryPoolsForCost : reply : "+poolMessage ) ; |
763 | list.add( poolMessage ) ; |
764 | } |
765 | _log.info( "queryPoolsForCost : number of valid replies : "+list.size() ); |
766 | return list ; |
767 | } |
768 | |
769 | private PoolManagerPoolInformation getPoolInformation(PoolSelectionUnit.SelectionPool pool) |
770 | throws InterruptedException |
771 | { |
772 | String name = pool.getName(); |
773 | PoolManagerPoolInformation info = new PoolManagerPoolInformation(name); |
774 | PoolCostCheckable cost = _costModule.getPoolCost(name, 0); |
775 | if (!pool.isActive() || cost == null) { |
776 | info.setSpaceCost(Double.POSITIVE_INFINITY); |
777 | info.setCpuCost(Double.POSITIVE_INFINITY); |
778 | } else { |
779 | info.setSpaceCost(cost.getSpaceCost()); |
780 | info.setCpuCost(cost.getPerformanceCost()); |
781 | } |
782 | info.setPoolCostInfo(_costModule.getPoolCostInfo(name)); |
783 | return info; |
784 | } |
785 | |
786 | private Collection<PoolManagerPoolInformation> |
787 | getPoolInformation(Iterator<PoolSelectionUnit.SelectionPool> pools) |
788 | throws InterruptedException |
789 | { |
790 | List<PoolManagerPoolInformation> result = new ArrayList(); |
791 | while (pools.hasNext()) { |
792 | result.add(getPoolInformation(pools.next())); |
793 | } |
794 | return result; |
795 | } |
796 | |
797 | public PoolManagerPoolInformation getPoolInformation(String name) |
798 | throws InterruptedException, NoSuchElementException |
799 | { |
800 | PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(name); |
801 | if (pool == null) { |
802 | throw new NoSuchElementException("No such pool: " + name); |
803 | } |
804 | return getPoolInformation(pool); |
805 | } |
806 | |
807 | public Collection<PoolManagerPoolInformation> |
808 | getPoolsByLink(String linkName) |
809 | throws InterruptedException, NoSuchElementException |
810 | { |
811 | PoolSelectionUnit.SelectionLink link = |
812 | _selectionUnit.getLinkByName(linkName); |
813 | return getPoolInformation(link.pools()); |
814 | } |
815 | |
816 | public Collection<PoolManagerPoolInformation> |
817 | getPoolsByPoolGroup(String poolGroup) |
818 | throws InterruptedException, NoSuchElementException |
819 | { |
820 | Collection<PoolSelectionUnit.SelectionPool> pools = |
821 | _selectionUnit.getPoolsByPoolGroup(poolGroup); |
822 | return getPoolInformation(pools.iterator()); |
823 | } |
824 | |
825 | /** |
826 | * Fetch the percentile performance cost; that is, the cost |
827 | * of the <code>n</code>th pool, in increasing order of performance cost, |
828 | * where <code>n</code> is <code>(int)floor( fraction * numberOfPools)</code> |
829 | * @param fraction the percentile fraction. The value must be between 0 and 1. |
830 | * @return the nth percentile performance cost, or 0 if there are no pools. |
831 | */ |
832 | public double getPoolsPercentilePerformanceCost( double fraction) { |
833 | return _costModule.getPoolsPercentilePerformanceCost( fraction); |
834 | } |
835 | |
836 | } |