1 | package diskCacheV111.poolManager ; |
2 | |
3 | import java.io.PrintWriter; |
4 | import java.util.Arrays; |
5 | import java.util.ArrayList; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.Date; |
9 | import java.util.HashMap; |
10 | import java.util.Iterator; |
11 | import java.util.List; |
12 | import java.util.Map; |
13 | import java.util.NoSuchElementException; |
14 | import java.util.StringTokenizer; |
15 | |
16 | import org.slf4j.Logger; |
17 | import org.slf4j.LoggerFactory; |
18 | import org.dcache.poolmanager.Utils; |
19 | |
20 | import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType; |
21 | import diskCacheV111.pools.PoolV2Mode; |
22 | import diskCacheV111.util.CacheException; |
23 | import diskCacheV111.util.PnfsId; |
24 | import diskCacheV111.util.Version; |
25 | import diskCacheV111.vehicles.GenericStorageInfo; |
26 | import diskCacheV111.vehicles.IpProtocolInfo; |
27 | import diskCacheV111.vehicles.PoolCostCheckable; |
28 | import diskCacheV111.vehicles.PoolLinkGroupInfo; |
29 | import diskCacheV111.vehicles.PoolManagerGetPoolListMessage; |
30 | import diskCacheV111.vehicles.PoolManagerPoolModeMessage; |
31 | import diskCacheV111.vehicles.PoolManagerPoolUpMessage; |
32 | import diskCacheV111.vehicles.PoolManagerGetPoolsByNameMessage; |
33 | import diskCacheV111.vehicles.PoolManagerGetPoolsByLinkMessage; |
34 | import diskCacheV111.vehicles.PoolManagerGetPoolsByPoolGroupMessage; |
35 | import diskCacheV111.vehicles.PoolMgrGetPoolByLink; |
36 | import diskCacheV111.vehicles.PoolMgrGetPoolLinkGroups; |
37 | import diskCacheV111.vehicles.PoolMgrQueryPoolsMsg; |
38 | import diskCacheV111.vehicles.PoolMgrSelectWritePoolMsg; |
39 | import diskCacheV111.vehicles.PoolStatusChangedMessage; |
40 | import diskCacheV111.vehicles.ProtocolInfo; |
41 | import diskCacheV111.vehicles.QuotaMgrCheckQuotaMessage; |
42 | import diskCacheV111.vehicles.StorageInfo; |
43 | import diskCacheV111.vehicles.PoolManagerPoolInformation; |
44 | import dmg.cells.nucleus.CellInfo; |
45 | import dmg.cells.nucleus.CellMessage; |
46 | import dmg.cells.nucleus.CellPath; |
47 | import dmg.cells.nucleus.CellVersion; |
48 | import dmg.cells.nucleus.DelayedReply; |
49 | import dmg.cells.nucleus.CDC; |
50 | import dmg.cells.nucleus.NoRouteToCellException; |
51 | import dmg.util.Args; |
52 | import dmg.util.CommandException; |
53 | |
54 | import org.dcache.cells.AbstractCellComponent; |
55 | import org.dcache.cells.CellCommandListener; |
56 | import org.dcache.cells.CellMessageReceiver; |
57 | import org.dcache.vehicles.PoolManagerSelectLinkGroupForWriteMessage; |
58 | |
59 | public class PoolManagerV5 |
60 | extends AbstractCellComponent |
61 | implements CellCommandListener, |
62 | CellMessageReceiver |
63 | { |
// Thread counts reported by getInfo(); not modified in the code visible here.
private int _writeThreads = 0 ;
private int _readThreads = 0 ;

// Message counters reported by getInfo(). Only _counterPoolUp is
// incremented in the code visible here (once per pool-up message).
private int _counterPoolUp = 0 ;
private int _counterSelectWritePool= 0 ;
private int _counterSelectReadPool = 0 ;

// NOTE(review): neither the handler map nor its lock is referenced by the
// code visible here - confirm whether they are still needed.
private Map _readHandlerList = new HashMap() ;
private final Object _readHandlerLock = new Object() ;

// Collaborators injected through the corresponding setters.
private PoolSelectionUnit _selectionUnit ;
private PoolMonitorV5 _poolMonitor ;

// Timeouts in milliseconds. The pnfs, read, write and fetch values are
// changed by the "set timeout ..." admin commands; the fetch default
// corresponds to five days.
private long _pnfsTimeout = 15 * 1000;
private long _readPoolTimeout = 15 * 1000;
private long _poolFetchTimeout = 5 * 24 * 3600 * 1000;
private long _writePoolTimeout = 15 * 1000;
private long _poolTimeout = 15 * 1000;

private CostModule _costModule ;
// Destination of pool status notifications; null disables relaying.
private CellPath _poolStatusRelayPath = null ;

private RequestContainerV5 _requestContainer ;
// Created by init().
private WatchdogThread _watchdog = null ;

private boolean _sendCostInfo = false ; //VP
// Quota checks are enabled whenever the quota manager name is not "none".
private boolean _quotasEnabled = false ;
private String _quotaManager = "none";


private final static Logger _log = LoggerFactory.getLogger(PoolManagerV5.class);
// Separate logger used for pool up/down state changes.
private final static Logger _logPoolMonitor = LoggerFactory.getLogger("logger.org.dcache.poolmonitor." + PoolManagerV5.class.getName());
96 | |
97 | |
/**
 * Creates an unconfigured instance. Collaborators are supplied through
 * the setters; the watchdog is created by {@link #init()}.
 */
public PoolManagerV5()
{
}
101 | |
/**
 * Injects the pool selection unit used for pool lookups and link matching.
 *
 * @param selectionUnit the selection unit to use
 */
public void setPoolSelectionUnit(PoolSelectionUnit selectionUnit)
{
    _selectionUnit = selectionUnit;
}
106 | |
/**
 * Injects the cost module. It is used when collecting link group
 * information and is notified after a write pool has been selected.
 *
 * @param costModule the cost module to use
 */
public void setCostModule(CostModule costModule)
{
    _costModule = costModule;
}
111 | |
/**
 * Injects the pool monitor used for pool queries and the pool timeout.
 *
 * @param poolMonitor the pool monitor to use
 */
public void setPoolMonitor(PoolMonitorV5 poolMonitor)
{
    _poolMonitor = poolMonitor;
}
116 | |
/**
 * Injects the request container, which is told about pool status changes
 * and is used to send cost messages.
 *
 * @param requestContainer the request container to use
 */
public void setRequestContainer(RequestContainerV5 requestContainer)
{
    _requestContainer = requestContainer;
}
121 | |
122 | public void setPoolStatusRelayPath(CellPath poolStatusRelayPath) |
123 | { |
124 | _poolStatusRelayPath = |
125 | (poolStatusRelayPath.hops() == 0) |
126 | ? null |
127 | : poolStatusRelayPath; |
128 | } |
129 | |
130 | public void setQuotaManager(String quotaManager) |
131 | { |
132 | _quotaManager = quotaManager; |
133 | _quotasEnabled = !_quotaManager.equals("none"); |
134 | } |
135 | |
/**
 * Controls whether a cost message is sent through the request container
 * after a write pool has been selected.
 *
 * @param sendCostInfo true to send cost messages
 */
public void setSendCostInfo(boolean sendCostInfo)
{
    _sendCostInfo = sendCostInfo;
}
140 | |
141 | public void init() |
142 | { |
143 | String watchdogParam = getArgs().getOpt("watchdog"); |
144 | if (watchdogParam != null && watchdogParam.length() > 0) { |
145 | _watchdog = new WatchdogThread(watchdogParam); |
146 | } else { |
147 | _watchdog = new WatchdogThread(); |
148 | } |
149 | _log.info("Watchdog : " + _watchdog); |
150 | } |
151 | |
152 | @Override |
153 | public CellInfo getCellInfo(CellInfo info) |
154 | { |
155 | PoolManagerCellInfo pminfo = new PoolManagerCellInfo(info); |
156 | pminfo.setCellVersion(new CellVersion(Version.getVersion(),"$Revision: 14256 $")); |
157 | pminfo.setPoolList(_selectionUnit.getActivePools()); |
158 | return pminfo; |
159 | } |
160 | |
161 | @Override |
162 | public void printSetup(PrintWriter writer) |
163 | { |
164 | writer.print("#\n# Setup of "); |
165 | writer.print(getCellName()); |
166 | writer.print(" ("); |
167 | writer.print(getClass().getName()); |
168 | writer.print(") at "); |
169 | writer.println(new Date().toString()); |
170 | writer.println("#"); |
171 | writer.print("set timeout pool "); |
172 | writer.println(""+(_poolMonitor.getPoolTimeout()/1000L)); |
173 | writer.println("#"); |
174 | } |
175 | |
176 | private class WatchdogThread implements Runnable { |
177 | private long _deathDetected = 10L * 60L * 1000L; // 10 minutes |
178 | private long _sleepTimer = 1L * 60L * 1000L; // 1 minute |
179 | private long _watchdogSequenceCounter = 0L; |
180 | |
181 | public WatchdogThread() { |
182 | new Thread(this, "watchdog").start(); |
183 | _log.info("WatchdogThread initialized with : " + this); |
184 | } |
185 | |
186 | public WatchdogThread(String parameter) { |
187 | // |
188 | // [<deathDetection>]:[<sleeper>] |
189 | // |
190 | long deathDetected = 0; |
191 | long sleeping = 0; |
192 | try { |
193 | StringTokenizer st = new StringTokenizer(parameter, ":"); |
194 | String tmp = null; |
195 | if (st.hasMoreTokens()) { |
196 | tmp = st.nextToken(); |
197 | if (tmp.length() > 0) |
198 | deathDetected = Long.parseLong(tmp); |
199 | } |
200 | if (st.hasMoreTokens()) { |
201 | tmp = st.nextToken(); |
202 | if (tmp.length() > 0) |
203 | sleeping = Long.parseLong(tmp); |
204 | } |
205 | |
206 | if ((deathDetected < 10) || (sleeping < 10)) |
207 | throw new IllegalArgumentException("Timers to small : " + parameter); |
208 | |
209 | if (deathDetected > 0L) |
210 | _deathDetected = deathDetected * 1000L; |
211 | if (sleeping > 0L) |
212 | _sleepTimer = sleeping * 1000L; |
213 | |
214 | } catch (Exception ee) { |
215 | _log.warn("WatchdogThread : illegal arguments [" + parameter + "] (using defaults) " + ee.getMessage()); |
216 | } |
217 | new Thread(this, "watchdog").start(); |
218 | _log.info("WatchdogThread initialized with : " + this); |
219 | } |
220 | |
221 | public void run() { |
222 | _log.info("watchdog thread activated"); |
223 | while (true) { |
224 | try { |
225 | Thread.sleep(_sleepTimer); |
226 | } catch (InterruptedException e) { |
227 | _log.info("watchdog thread interrupted"); |
228 | break; |
229 | } |
230 | runWatchdogSequence(_deathDetected); |
231 | _watchdogSequenceCounter++; |
232 | } |
233 | _log.info("watchdog finished"); |
234 | } |
235 | |
236 | @Override |
237 | public String toString() { |
238 | return "DeathDetection=" + (_deathDetected / 1000L) + ";Sleep=" |
239 | + (_sleepTimer / 1000L) + ";Counter=" |
240 | + _watchdogSequenceCounter + ";"; |
241 | } |
242 | } |
243 | |
244 | public PoolManagerPoolModeMessage |
245 | messageArrived(PoolManagerPoolModeMessage msg) |
246 | { |
247 | PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(msg |
248 | .getPoolName()); |
249 | if (pool == null) { |
250 | msg.setFailed(563, "Pool not found : " + msg.getPoolName()); |
251 | } else if (msg.getPoolMode() == PoolManagerPoolModeMessage.UNDEFINED) { |
252 | // |
253 | // get pool mode |
254 | // |
255 | msg.setPoolMode(PoolManagerPoolModeMessage.READ | (pool.isReadOnly() ? 0 : PoolManagerPoolModeMessage.WRITE)); |
256 | } else { |
257 | // |
258 | // set pool mode |
259 | // |
260 | pool.setReadOnly((msg.getPoolMode() & PoolManagerPoolModeMessage.WRITE) == 0); |
261 | } |
262 | |
263 | msg.setSucceeded(); |
264 | return msg; |
265 | } |
266 | |
267 | private void runWatchdogSequence(long deathDetectedTimer) |
268 | { |
269 | for (String name : _selectionUnit.getDefinedPools(false)) { |
270 | PoolSelectionUnit.SelectionPool pool = _selectionUnit.getPool(name); |
271 | if (pool != null) { |
272 | if (pool.getActive() > deathDetectedTimer |
273 | && pool.setSerialId(0L)) { |
274 | |
275 | if( _logPoolMonitor.isDebugEnabled() ) { |
276 | _logPoolMonitor.debug("Pool " + name + " declared as DOWN (no ping in " + deathDetectedTimer/1000 +" seconds)."); |
277 | } |
278 | _requestContainer.poolStatusChanged(name, PoolStatusChangedMessage.DOWN); |
279 | sendPoolStatusRelay(name, PoolStatusChangedMessage.DOWN, |
280 | null, 666, "DEAD"); |
281 | } |
282 | } |
283 | } |
284 | } |
285 | |
286 | @Override |
287 | public void getInfo( PrintWriter pw ){ |
288 | pw.println("PoolManager V [$Id: PoolManagerV5.java,v 1.48 2007-10-10 08:05:34 tigran Exp $]"); |
289 | pw.println(" SelectionUnit : "+_selectionUnit.getVersion() ) ; |
290 | pw.println(" Write Threads : "+_writeThreads) ; |
291 | pw.println(" Read Threads : "+_readThreads) ; |
292 | pw.println(" Pool Timeout : "+_poolMonitor.getPoolTimeout()/1000L) ; |
293 | pw.println("Message counts") ; |
294 | pw.println(" PoolUp : "+_counterPoolUp ) ; |
295 | pw.println(" SelectReadPool : "+_counterSelectReadPool ) ; |
296 | pw.println(" SelectWritePool : "+_counterSelectWritePool ) ; |
297 | if( _watchdog == null ){ |
298 | pw.println(" Watchdog : disabled" ) ; |
299 | }else{ |
300 | pw.println(" Watchdog : "+_watchdog ) ; |
301 | } |
302 | } |
// Help text and handler of the deprecated "set max threads" command. The
// command is kept as a no-op: it takes one argument, ignores it and
// returns an empty string.
public String hh_set_max_threads = " # DEPRICATED " ;
public String ac_set_max_threads_$_1( Args args )throws CommandException{
    return "" ;
}
307 | public String hh_set_timeout_pool = "[-read] [-write] <timeout/secs>" ; |
308 | public String ac_set_timeout_pool_$_1( Args args )throws CommandException{ |
309 | boolean isWrite = args.getOpt("write") != null ; |
310 | boolean isRead = args.getOpt("read") != null ; |
311 | long timeout = Integer.parseInt(args.argv(0)) * 1000 ; |
312 | if( ( ! isWrite ) && ( ! isRead ) ){ |
313 | _readPoolTimeout = _writePoolTimeout = timeout ; |
314 | _poolMonitor.setPoolTimeout(_readPoolTimeout); |
315 | return "" ; |
316 | } |
317 | if( isWrite )_writePoolTimeout = timeout ; |
318 | if( isRead ){ |
319 | _readPoolTimeout = timeout ; |
320 | _poolMonitor.setPoolTimeout(_readPoolTimeout); |
321 | } |
322 | return "" ; |
323 | } |
324 | public String hh_set_timeout_pnfs = "<timeout/secs>" ; |
325 | public String ac_set_timeout_pnfs_$_1( Args args )throws CommandException{ |
326 | _pnfsTimeout = Integer.parseInt(args.argv(0)) * 1000 ; |
327 | return "" ; |
328 | } |
329 | public String hh_set_timeout_fetch = "<timeout/min>" ; |
330 | public String ac_set_timeout_fetch_$_1( Args args )throws CommandException{ |
331 | _poolFetchTimeout = Integer.parseInt(args.argv(0)) * 1000 * 60 ; |
332 | return "" ; |
333 | } |
334 | public String hh_getpoolsbylink = "<linkName> [-size=<filesize>]" ; |
335 | public String ac_getpoolsbylink_$_1( Args args )throws Exception { |
336 | String sizeString = args.getOpt("size") ; |
337 | long size = sizeString == null ? 50000000L : Long.parseLong( sizeString ) ; |
338 | String linkName = args.argv(0) ; |
339 | |
340 | List list = _poolMonitor.queryPoolsByLinkName( linkName , size ) ; |
341 | |
342 | StringBuffer sb = new StringBuffer() ; |
343 | for( Iterator i = list.iterator() ; i.hasNext() ; ){ |
344 | sb.append( i.next().toString() ).append("\n"); |
345 | } |
346 | return sb.toString() ; |
347 | } |
348 | |
/**
 * Processes a pool-up message: records the pool's new mode, HSM
 * instances and active flag, and notifies the request container and the
 * status relay if anything relevant changed.
 *
 * The statement order below is significant because setSerialId has a
 * side effect and the old pool mode has to be read before it is replaced.
 *
 * @param poolMessage the message received from the pool
 */
public synchronized
    void messageArrived(PoolManagerPoolUpMessage poolMessage)
{
    _counterPoolUp++;

    String poolName = poolMessage.getPoolName();
    // NOTE(review): the second argument presumably creates the pool entry
    // if it is not known yet - confirm against PoolSelectionUnit.
    PoolSelectionUnit.SelectionPool pool =
        _selectionUnit.getPool(poolName, true);

    PoolV2Mode newMode = poolMessage.getPoolMode();
    PoolV2Mode oldMode = pool.getPoolMode();

    if (_logPoolMonitor.isDebugEnabled()) {
        _logPoolMonitor.debug("PoolUp message from " + poolName
                              + " with mode " + newMode
                              + " and serialId " + poolMessage.getSerialId());
    }

    /* For compatibility with previous versions of dCache, a pool
     * marked DISABLED, but without any other DISABLED_ flags set
     * is considered fully disabled.
     */
    boolean disabled =
        newMode.getMode() == PoolV2Mode.DISABLED
        || newMode.isDisabled(PoolV2Mode.DISABLED_DEAD)
        || newMode.isDisabled(PoolV2Mode.DISABLED_STRICT);

    /* By convention, the serial number is set to zero when a pool
     * is disabled. This is used by the watchdog to identify, that
     * we have already announced that the pool is down.
     */
    long serial = disabled ? 0 : poolMessage.getSerialId();

    /* Any change in the kind of operations a pool might be able
     * to perform has to be propagated to a number of other
     * components.
     *
     * Notice that calling setSerialId has a side-effect, which is
     * why we call it first.
     */
    boolean changed =
        pool.setSerialId(serial)
        || pool.isActive() == disabled
        || (newMode.getMode() != oldMode.getMode())
        || !pool.getHsmInstances().equals(poolMessage.getHsmInstances());

    pool.setPoolMode(newMode);
    pool.setHsmInstances(poolMessage.getHsmInstances());
    pool.setActive(!disabled);

    /* Notify others in case the pool status has changed. Due to
     * limitations of the PoolStatusChangedMessage, we will often
     * send a RESTART notification, when in fact only the pool
     * mode has changed.
     */
    if (changed) {
        _logPoolMonitor.info("Pool " + poolName + " changed from mode "
                             + oldMode + " to " + newMode);

        if (disabled) {
            _requestContainer.poolStatusChanged(poolName,
                                                PoolStatusChangedMessage.DOWN);
            sendPoolStatusRelay(poolName, PoolStatusChangedMessage.DOWN,
                                poolMessage.getPoolMode(),
                                poolMessage.getCode(),
                                poolMessage.getMessage());
        } else {
            _requestContainer.poolStatusChanged(poolName,
                                                PoolStatusChangedMessage.UP);
            sendPoolStatusRelay(poolName, PoolStatusChangedMessage.RESTART);
        }
    }
}
422 | |
/**
 * Relays a pool status change that carries no pool mode and no detail
 * (mode null, status code 0, message null).
 *
 * @param poolName name of the pool
 * @param status   one of the PoolStatusChangedMessage status values
 */
private void sendPoolStatusRelay( String poolName , int status ){
    sendPoolStatusRelay( poolName , status , null , 0 , null ) ;
}
426 | private void sendPoolStatusRelay( String poolName , int status , |
427 | PoolV2Mode poolMode , |
428 | int statusCode , String statusMessage ){ |
429 | |
430 | if( _poolStatusRelayPath == null )return ; |
431 | |
432 | try{ |
433 | |
434 | PoolStatusChangedMessage msg = new PoolStatusChangedMessage( poolName , status ) ; |
435 | msg.setPoolMode( poolMode ) ; |
436 | msg.setDetail( statusCode , statusMessage ) ; |
437 | _log.info("sendPoolStatusRelay : "+msg); |
438 | sendMessage( |
439 | new CellMessage( _poolStatusRelayPath , msg ) |
440 | ) ; |
441 | |
442 | }catch(Exception ee ){ |
443 | _log.warn("Failed to send poolStatus changed message : "+ee ) ; |
444 | } |
445 | } |
446 | |
447 | public PoolMgrGetPoolLinkGroups |
448 | messageArrived(PoolMgrGetPoolLinkGroups msg) |
449 | { |
450 | Collection<PoolLinkGroupInfo> linkGroupInfos = Utils.linkGroupInfos(_selectionUnit, _costModule).values(); |
451 | |
452 | PoolLinkGroupInfo[] poolLinkGroupInfos = linkGroupInfos.toArray(new PoolLinkGroupInfo[linkGroupInfos.size()]); |
453 | msg.setPoolLinkGroupInfos(poolLinkGroupInfos); |
454 | msg.setSucceeded(); |
455 | return msg; |
456 | } |
457 | |
458 | public PoolManagerGetPoolListMessage |
459 | messageArrived(PoolManagerGetPoolListMessage msg) |
460 | { |
461 | String [] pools = _selectionUnit.getActivePools() ; |
462 | msg.setPoolList(Arrays.asList(pools)) ; |
463 | msg.setSucceeded(); |
464 | return msg; |
465 | } |
466 | |
467 | public PoolMgrGetPoolByLink |
468 | messageArrived(PoolMgrGetPoolByLink msg) |
469 | throws CacheException |
470 | { |
471 | String linkName = msg.getLinkName(); |
472 | long filesize = msg.getFilesize(); |
473 | |
474 | try { |
475 | List<PoolCostCheckable> pools = |
476 | _poolMonitor.queryPoolsByLinkName( linkName , filesize ) ; |
477 | if ((pools == null) || pools.isEmpty()) |
478 | throw new CacheException(57, "No appropriate pools found for link: "+linkName); |
479 | msg.setPoolName(pools.get(0).getPoolName()); |
480 | msg.setSucceeded(); |
481 | } catch (InterruptedException e) { |
482 | throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
483 | "Pool manager is shutting down"); |
484 | } |
485 | return msg; |
486 | } |
487 | |
488 | public PoolManagerGetPoolsByNameMessage |
489 | messageArrived(PoolManagerGetPoolsByNameMessage msg) |
490 | throws CacheException |
491 | { |
492 | try { |
493 | List<PoolManagerPoolInformation> pools = new ArrayList(); |
494 | for (String name: msg.getPoolNames()) { |
495 | try { |
496 | pools.add(_poolMonitor.getPoolInformation(name)); |
497 | } catch (NoSuchElementException e) { |
498 | /* Don't include a pool that doesn't exist. |
499 | */ |
500 | } |
501 | } |
502 | msg.setPools(pools); |
503 | msg.setSucceeded(); |
504 | } catch (InterruptedException e) { |
505 | throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
506 | "Pool manager is shutting down"); |
507 | } |
508 | return msg; |
509 | } |
510 | |
511 | public PoolManagerGetPoolsByLinkMessage |
512 | messageArrived(PoolManagerGetPoolsByLinkMessage msg) |
513 | throws CacheException |
514 | { |
515 | try { |
516 | msg.setPools(_poolMonitor.getPoolsByLink(msg.getLink())); |
517 | msg.setSucceeded(); |
518 | } catch (InterruptedException e) { |
519 | throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
520 | "Pool manager is shutting down"); |
521 | } catch (NoSuchElementException e) { |
522 | Collection<PoolManagerPoolInformation> empty = |
523 | Collections.emptyList(); |
524 | msg.setPools(empty); |
525 | msg.setSucceeded(); |
526 | } |
527 | return msg; |
528 | } |
529 | |
530 | public PoolManagerGetPoolsByPoolGroupMessage |
531 | messageArrived(PoolManagerGetPoolsByPoolGroupMessage msg) |
532 | throws CacheException |
533 | { |
534 | try { |
535 | msg.setPools(_poolMonitor.getPoolsByPoolGroup(msg.getPoolGroup())); |
536 | msg.setSucceeded(); |
537 | } catch (InterruptedException e) { |
538 | throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
539 | "Pool manager is shutting down"); |
540 | } catch (NoSuchElementException e) { |
541 | Collection<PoolManagerPoolInformation> empty = |
542 | Collections.emptyList(); |
543 | msg.setPools(empty); |
544 | msg.setSucceeded(); |
545 | } |
546 | return msg; |
547 | } |
548 | |
549 | public PoolMgrQueryPoolsMsg |
550 | messageArrived(PoolMgrQueryPoolsMsg msg) |
551 | { |
552 | DirectionType accessType = msg.getAccessType(); |
553 | msg.setPoolList(PoolPreferenceLevel.fromPoolPreferenceLevelToList( |
554 | _selectionUnit.match(accessType, |
555 | msg.getNetUnitName(), |
556 | msg.getProtocolUnitName(), |
557 | msg.getStorageInfo(), |
558 | null))); |
559 | msg.setSucceeded(); |
560 | return msg; |
561 | } |
562 | |
563 | private static class XProtocolInfo implements IpProtocolInfo { |
564 | private String [] _host = new String[1] ; |
565 | |
566 | private static final long serialVersionUID = -5817364111427851052L; |
567 | |
568 | private XProtocolInfo( String hostName ){ |
569 | _host[0] = hostName ; |
570 | } |
571 | public String getProtocol(){ return "DCap" ; } |
572 | public int getMinorVersion(){ return 0 ; } |
573 | public int getMajorVersion(){ return 0 ; } |
574 | public String getVersionString(){ return "0.0" ; } |
575 | public String [] getHosts(){ return _host ; } |
576 | public int getPort(){ return 0 ; } |
577 | public boolean isFileCheckRequired() { return true; } |
578 | } |
/**
 * Minimal storage info used by the "get av pools" admin command. Apart
 * from the HSM and storage class passed to the constructor it reports
 * fixed values: an empty bitfile id, a file size of 100 and "stored".
 */
private static class XStorageInfo extends GenericStorageInfo {

    private static final long serialVersionUID = -6624549402952279903L;

    private XStorageInfo( String hsm , String storageClass ){
        super(hsm,storageClass);
    }
    @Override
    public String getBitfileId(){ return "" ; }
    @Override
    public long getFileSize(){ return 100 ; }
    // The file size is fixed; attempts to change it are ignored.
    @Override
    public void setFileSize( long fileSize ){}
    @Override
    public boolean isStored(){ return true ; }

}
596 | public String hh_get_av_pools = "<pnfsId> <hsm> <storageClass> <host>" ; |
597 | public String ac_get_av_pools_$_4( Args args ) throws Exception { |
598 | try{ |
599 | PnfsId pnfsId = new PnfsId( args.argv(0) ) ; |
600 | XStorageInfo storageInfo = new XStorageInfo( args.argv(1) , args.argv(2) ) ; |
601 | XProtocolInfo protocolInfo = new XProtocolInfo( args.argv(3) ) ; |
602 | |
603 | PoolMonitorV5.PnfsFileLocation _pnfsFileLocation = |
604 | _poolMonitor.getPnfsFileLocation( pnfsId , |
605 | storageInfo , |
606 | protocolInfo, null ) ; |
607 | |
608 | List available = _pnfsFileLocation.getFileAvailableMatrix() ; |
609 | |
610 | Iterator i = ((List)available.get(0)).iterator() ; |
611 | StringBuffer sb = new StringBuffer() ; |
612 | sb.append("Available and allowed\n"); |
613 | while( i.hasNext() ){ |
614 | sb.append(" ").append( i.next().toString() ).append("\n"); |
615 | } |
616 | sb.append("Allowed (not available)\n"); |
617 | if( ( available = _pnfsFileLocation.getAllowedButNotAvailable() ) != null ){ |
618 | i = available.iterator() ; |
619 | while( i.hasNext() ){ |
620 | sb.append(" ").append( i.next().toString() ).append("\n"); |
621 | } |
622 | } |
623 | return sb.toString() ; |
624 | |
625 | }catch( Exception ee ){ |
626 | |
627 | ee.printStackTrace() ; |
628 | throw ee ; |
629 | } |
630 | } |
631 | /* |
632 | public String hh_get_pools = "<hsm> <storageClass> <host>"+ |
633 | " [-size=<size>] [-mode=stage|store]" ; |
634 | public String ac_get_pools_$_3( Args args ) throws Exception { |
635 | String mode = args.getOpt("mode") ; |
636 | mode = mode == null ? "stage" : mode ; |
637 | long size = 0L ; |
638 | String sizeString = args.getOpt("size") ; |
639 | if( sizeString != null )size = Long.parseLong(sizeString); |
640 | try{ |
641 | XStorageInfo storageInfo = new XStorageInfo( args.argv(0) , args.argv(1) ) ; |
642 | XProtocolInfo protocolInfo = new XProtocolInfo( args.argv(2) ) ; |
643 | |
644 | List list = mode.equals("stage") ? |
645 | _poolMonitor.getStagePoolList( storageInfo , protocolInfo , size ) : |
646 | _poolMonitor.getStorePoolList( storageInfo , protocolInfo , size ) ; |
647 | |
648 | Iterator i = list.iterator() ; |
649 | StringBuffer sb = new StringBuffer() ; |
650 | while( i.hasNext() ){ |
651 | sb.append( i.next().toString() ).append("\n"); |
652 | } |
653 | return sb.toString() ; |
654 | |
655 | }catch( Exception ee ){ |
656 | |
657 | ee.printStackTrace() ; |
658 | throw ee ; |
659 | } |
660 | } |
661 | */ |
662 | |
663 | private boolean quotasExceeded( StorageInfo info ){ |
664 | |
665 | String storageClass = info.getStorageClass()+"@"+info.getHsm() ; |
666 | |
667 | QuotaMgrCheckQuotaMessage quotas = new QuotaMgrCheckQuotaMessage( storageClass ) ; |
668 | CellMessage msg = new CellMessage( new CellPath(_quotaManager) , quotas ) ; |
669 | try{ |
670 | msg = sendAndWait( msg , 20000L ) ; |
671 | if( msg == null ){ |
672 | _log.warn("quotasExceeded of "+storageClass+" : request timed out"); |
673 | return false ; |
674 | } |
675 | Object obj = msg.getMessageObject() ; |
676 | if( ! (obj instanceof QuotaMgrCheckQuotaMessage ) ){ |
677 | _log.warn("quotasExceeded of "+storageClass+" : unexpected object arrived : "+obj.getClass().getName()); |
678 | return false ; |
679 | } |
680 | |
681 | return ((QuotaMgrCheckQuotaMessage)obj).isHardQuotaExceeded() ; |
682 | |
683 | }catch(Exception ee ){ |
684 | |
685 | _log.warn( "quotasExceeded of "+storageClass+" : Exception : "+ee); |
686 | _log.warn(ee.toString()); |
687 | return false ; |
688 | } |
689 | |
690 | } |
691 | |
692 | private long determineExpectedFileSize(long expectedLength, StorageInfo storageInfo) |
693 | { |
694 | if (expectedLength > 0) { |
695 | return expectedLength; |
696 | } |
697 | |
698 | if (storageInfo.getFileSize() > 0) { |
699 | return storageInfo.getFileSize(); |
700 | } |
701 | |
702 | String s = storageInfo.getKey("alloc-size"); |
703 | if (s != null) { |
704 | try { |
705 | return Long.parseLong(s); |
706 | } catch (NumberFormatException e) { |
707 | // bad values are ignored |
708 | } |
709 | } |
710 | |
711 | return 0; |
712 | } |
713 | |
714 | public DelayedReply messageArrived(PoolManagerSelectLinkGroupForWriteMessage message) |
715 | throws CacheException |
716 | { |
717 | if (message.getStorageInfo() == null) { |
718 | throw new IllegalArgumentException("Storage info is missing"); |
719 | } |
720 | if (message.getProtocolInfo() == null ){ |
721 | throw new IllegalArgumentException("Protocol info is missing"); |
722 | } |
723 | |
724 | return new LinkGroupSelectionTask(message); |
725 | } |
726 | |
727 | /** |
728 | * Task for processing link group selection messages. |
729 | */ |
730 | public class LinkGroupSelectionTask |
731 | extends DelayedReply |
732 | implements Runnable |
733 | { |
734 | private final PoolManagerSelectLinkGroupForWriteMessage _message; |
735 | private final CDC _cdc; |
736 | |
737 | public LinkGroupSelectionTask(PoolManagerSelectLinkGroupForWriteMessage message) |
738 | { |
739 | _message = message; |
740 | _cdc = new CDC(); |
741 | new Thread(this, "LinkGroupSelectionTask").start(); |
742 | } |
743 | |
744 | public void run() |
745 | { |
746 | long started = System.currentTimeMillis(); |
747 | _cdc.apply(); |
748 | try { |
749 | _log.info("Select link group handler started"); |
750 | |
751 | _message.setLinkGroups(selectLinkGroups()); |
752 | _message.setSucceeded(); |
753 | |
754 | _log.info("Select link group handler finished after {} ms", |
755 | (System.currentTimeMillis() - started)); |
756 | } catch (Exception e) { |
757 | _message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, |
758 | e.getMessage()); |
759 | } finally { |
760 | try { |
761 | send(_message); |
762 | } catch (NoRouteToCellException e) { |
763 | _log.error("Failed to send reply: " + e.getMessage()); |
764 | } catch (InterruptedException e) { |
765 | _log.warn("Link group selection handler was interrupted"); |
766 | } finally { |
767 | CDC.clear(); |
768 | } |
769 | } |
770 | } |
771 | |
772 | protected List<String> selectLinkGroups() |
773 | { |
774 | StorageInfo storageInfo = _message.getStorageInfo(); |
775 | ProtocolInfo protocolInfo = _message.getProtocolInfo(); |
776 | long expectedLength = |
777 | determineExpectedFileSize(_message.getFileSize(), storageInfo); |
778 | String protocol = |
779 | protocolInfo.getProtocol() + "/" + protocolInfo.getMajorVersion(); |
780 | String hostName = |
781 | (protocolInfo instanceof IpProtocolInfo) |
782 | ? ((IpProtocolInfo) protocolInfo).getHosts()[0] |
783 | : null; |
784 | |
785 | Collection<String> linkGroups = _message.getLinkGroups(); |
786 | if (linkGroups == null) { |
787 | linkGroups = |
788 | Utils.linkGroupInfos(_selectionUnit, _costModule).keySet(); |
789 | } |
790 | |
791 | List<String> outputLinkGroups = |
792 | new ArrayList<String>(linkGroups.size()); |
793 | |
794 | for (String linkGroup: linkGroups) { |
795 | PoolPreferenceLevel [] level = |
796 | _selectionUnit.match(DirectionType.WRITE, |
797 | hostName, |
798 | protocol, |
799 | storageInfo, |
800 | linkGroup); |
801 | if (level.length > 0) { |
802 | outputLinkGroups.add(linkGroup); |
803 | } |
804 | } |
805 | |
806 | return outputLinkGroups; |
807 | } |
808 | } |
809 | |
810 | /////////////////////////////////////////////////////////////// |
811 | // |
812 | // the write io request handler |
813 | // |
/**
 * Handles a write pool selection request by creating a
 * {@link WriteRequestHandler}, which is returned as the delayed reply.
 *
 * @param envelope the cell message the request arrived in
 * @param msg      the write pool selection request
 * @return the handler producing the reply
 */
public DelayedReply messageArrived(CellMessage envelope,
                                   PoolMgrSelectWritePoolMsg msg)
{
    return new WriteRequestHandler(envelope, msg);
}
819 | |
820 | public class WriteRequestHandler extends DelayedReply implements Runnable |
821 | { |
822 | private CellMessage _envelope; |
823 | private PoolMgrSelectWritePoolMsg _request; |
824 | private PnfsId _pnfsId; |
825 | |
826 | public WriteRequestHandler(CellMessage envelope, |
827 | PoolMgrSelectWritePoolMsg msg) |
828 | { |
829 | _envelope = envelope; |
830 | _request = msg; |
831 | _pnfsId = _request.getPnfsId(); |
832 | new Thread(this, "writeHandler").start(); |
833 | } |
834 | |
835 | public void run(){ |
836 | |
837 | StorageInfo storageInfo = _request.getStorageInfo() ; |
838 | ProtocolInfo protocolInfo = _request.getProtocolInfo() ; |
839 | |
840 | _log.info( _pnfsId.toString()+" write handler started" ); |
841 | long started = System.currentTimeMillis(); |
842 | |
843 | if( storageInfo == null ){ |
844 | requestFailed( 21 , "Storage info not available for write request : "+_pnfsId ) ; |
845 | return ; |
846 | }else if( protocolInfo == null ){ |
847 | requestFailed( 22 , "Protocol info not available for write request : "+_pnfsId ) ; |
848 | return ; |
849 | } |
850 | if( _quotasEnabled && quotasExceeded( storageInfo ) ){ |
851 | requestFailed( 55 , "Quotas Exceeded for StorageClass : "+storageInfo.getStorageClass() ) ; |
852 | return ; |
853 | } |
854 | |
855 | long expectedLength = |
856 | determineExpectedFileSize(_request.getFileSize(), storageInfo); |
857 | |
858 | /* The cost module relies on the expected file size. |
859 | */ |
860 | _request.setFileSize(expectedLength); |
861 | |
862 | try{ |
863 | |
864 | List<PoolCostCheckable> storeList = _poolMonitor. |
865 | getPnfsFileLocation( _pnfsId , storageInfo , protocolInfo, _request.getLinkGroup() ). |
866 | getStorePoolList( expectedLength ) ; |
867 | /* |
868 | List storeList = |
869 | _poolMonitor.getStorePoolList( storageInfo , |
870 | protocolInfo , |
871 | expectedLength ); |
872 | */ |
873 | String poolName = storeList.get(0).getPoolName() ; |
874 | |
875 | if (_sendCostInfo) |
876 | _requestContainer.sendCostMsg( |
877 | _pnfsId, storeList.get(0), true |
878 | ); //VP |
879 | |
880 | _log.info(_pnfsId+" write handler selected "+poolName+" after "+ |
881 | ( System.currentTimeMillis() - started ) ); |
882 | requestSucceeded( poolName ) ; |
883 | |
884 | }catch(CacheException ce ){ |
885 | requestFailed( ce.getRc() , ce.getMessage() ) ; |
886 | }catch(Exception ee ){ |
887 | requestFailed( 17 , ee.getMessage() ) ; |
888 | } |
889 | } |
890 | |
891 | protected void requestFailed(int errorCode, String errorMessage) |
892 | { |
893 | _request.setFailed(errorCode, errorMessage); |
894 | try { |
895 | send(_request); |
896 | } catch (Exception e) { |
897 | _log.warn("Exception requestFailed : " + e, e); |
898 | } |
899 | } |
900 | |
901 | protected void requestSucceeded(String poolName) |
902 | { |
903 | _request.setPoolName(poolName); |
904 | _request.setSucceeded(); |
905 | try { |
906 | send(_request); |
907 | _costModule.messageArrived(_envelope); |
908 | } catch (Exception e) { |
909 | _log.warn("Exception in requestSucceeded : " + e, e); |
910 | } |
911 | } |
912 | } |
913 | |
914 | public String ac_free_$_0(Args args) { |
915 | |
916 | |
917 | Map<String, PoolLinkGroupInfo> linkGroupSize = Utils.linkGroupInfos(_selectionUnit, _costModule); |
918 | |
919 | StringBuilder sb = new StringBuilder(); |
920 | |
921 | for(Map.Entry<String, PoolLinkGroupInfo> linkGourp: linkGroupSize.entrySet() ) { |
922 | sb.append(linkGourp.getKey()).append(" : ") |
923 | .append(linkGourp.getValue().getAvailableSpaceInBytes() ).append("\n"); |
924 | } |
925 | |
926 | return sb.toString(); |
927 | |
928 | } |
929 | } |