EMMA Coverage Report (generated Mon Aug 23 17:21:34 CEST 2010)
[all classes][diskCacheV111.poolManager]

COVERAGE SUMMARY FOR SOURCE FILE [RequestContainerV5.java]

name | class, % | method, % | block, % | line, %
RequestContainerV5.java | 50%  (1/2) | 17%  (19/114) | 6%   (342/5990) | 8%   (89/1056)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class RequestContainerV5$PoolRequestHandler | 0%   (0/1) | 0%   (0/44) | 0%   (0/3931) | 0%   (0/698)
RequestContainerV5$PoolRequestHandler (RequestContainerV5, PnfsId, String, in... 0%   (0/1)0%   (0/115)0%   (0/33)
access$000 (RequestContainerV5$PoolRequestHandler, Object): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$100 (RequestContainerV5$PoolRequestHandler): void 0%   (0/1)0%   (0/3)0%   (0/1)
access$1100 (RequestContainerV5$PoolRequestHandler): void 0%   (0/1)0%   (0/3)0%   (0/1)
access$200 (RequestContainerV5$PoolRequestHandler, boolean): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$2100 (RequestContainerV5$PoolRequestHandler): CDC 0%   (0/1)0%   (0/3)0%   (0/1)
access$2200 (RequestContainerV5$PoolRequestHandler): void 0%   (0/1)0%   (0/3)0%   (0/1)
access$2300 (RequestContainerV5$PoolRequestHandler): LinkedList 0%   (0/1)0%   (0/3)0%   (0/1)
access$2402 (RequestContainerV5$PoolRequestHandler, boolean): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
access$300 (RequestContainerV5$PoolRequestHandler): int 0%   (0/1)0%   (0/3)0%   (0/1)
access$400 (RequestContainerV5$PoolRequestHandler, int, String): void 0%   (0/1)0%   (0/5)0%   (0/1)
access$900 (RequestContainerV5$PoolRequestHandler): UOID 0%   (0/1)0%   (0/3)0%   (0/1)
access$902 (RequestContainerV5$PoolRequestHandler, UOID): UOID 0%   (0/1)0%   (0/5)0%   (0/1)
add (Object): void 0%   (0/1)0%   (0/57)0%   (0/9)
addRequest (CellMessage): void 0%   (0/1)0%   (0/70)0%   (0/16)
alive (): void 0%   (0/1)0%   (0/11)0%   (0/4)
answerRequest (int): boolean 0%   (0/1)0%   (0/76)0%   (0/19)
askForFileStoreLocation (PoolSelectionUnit$DirectionType): PoolCostCheckable 0%   (0/1)0%   (0/222)0%   (0/37)
askForPoolToPool (boolean): int 0%   (0/1)0%   (0/546)0%   (0/96)
askForStaging (): int 0%   (0/1)0%   (0/102)0%   (0/15)
askIfAvailable (): int 0%   (0/1)0%   (0/417)0%   (0/70)
canStage (): boolean 0%   (0/1)0%   (0/68)0%   (0/13)
clearSteering (): void 0%   (0/1)0%   (0/34)0%   (0/7)
errorHandler (): void 0%   (0/1)0%   (0/121)0%   (0/18)
exercisePool2PoolReply (Message): int 0%   (0/1)0%   (0/94)0%   (0/15)
exerciseStageReply (Message): int 0%   (0/1)0%   (0/104)0%   (0/19)
expireRequests (): void 0%   (0/1)0%   (0/69)0%   (0/15)
failed (int, String): void 0%   (0/1)0%   (0/39)0%   (0/8)
getPoolCandidate (): String 0%   (0/1)0%   (0/16)0%   (0/1)
getPoolCandidateState (): String 0%   (0/1)0%   (0/32)0%   (0/1)
getRestoreHandlerInfo (): RestoreHandlerInfo 0%   (0/1)0%   (0/21)0%   (0/1)
getStorageInfo (): void 0%   (0/1)0%   (0/78)0%   (0/16)
handleCommandObject (Object []): void 0%   (0/1)0%   (0/98)0%   (0/22)
mailForYou (Object): void 0%   (0/1)0%   (0/4)0%   (0/2)
nextStep (int, int): void 0%   (0/1)0%   (0/173)0%   (0/30)
outOfResources (String): void 0%   (0/1)0%   (0/39)0%   (0/6)
retry (boolean): void 0%   (0/1)0%   (0/22)0%   (0/6)
sendFetchRequest (String, StorageInfo): boolean 0%   (0/1)0%   (0/83)0%   (0/10)
sendPool2PoolRequest (String, String): void 0%   (0/1)0%   (0/96)0%   (0/12)
setError (int, String): void 0%   (0/1)0%   (0/7)0%   (0/3)
stateEngine (Object): void 0%   (0/1)0%   (0/887)0%   (0/171)
stateLoop (): void 0%   (0/1)0%   (0/137)0%   (0/19)
toString (): String 0%   (0/1)0%   (0/43)0%   (0/1)
waitFor (long): void 0%   (0/1)0%   (0/6)0%   (0/2)
     
class RequestContainerV5 | 100% (1/1) | 27%  (19/70) | 17%  (342/2059) | 25%  (89/358)
ac_rc_destroy_$_1 (Args): String 0%   (0/1)0%   (0/48)0%   (0/8)
ac_rc_failed_$_1_3 (Args): String 0%   (0/1)0%   (0/66)0%   (0/10)
ac_rc_ls_$_0_1 (Args): String 0%   (0/1)0%   (0/144)0%   (0/28)
ac_rc_retry_$_1 (Args): String 0%   (0/1)0%   (0/118)0%   (0/20)
ac_rc_select_$_0_3 (Args): String 0%   (0/1)0%   (0/120)0%   (0/16)
ac_rc_suspend_$_0_1 (Args): String 0%   (0/1)0%   (0/57)0%   (0/13)
ac_replicate_$_2 (Args): String 0%   (0/1)0%   (0/91)0%   (0/15)
ac_xrc_ls (Args): Object 0%   (0/1)0%   (0/51)0%   (0/9)
access$1000 (): Logger 0%   (0/1)0%   (0/2)0%   (0/1)
access$1200 (RequestContainerV5, CellMessage): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$1300 (RequestContainerV5): PoolMonitorV5 0%   (0/1)0%   (0/3)0%   (0/1)
access$1500 (RequestContainerV5): int 0%   (0/1)0%   (0/3)0%   (0/1)
access$1600 (RequestContainerV5, CellMessage): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$1700 (RequestContainerV5): SimpleDateFormat 0%   (0/1)0%   (0/3)0%   (0/1)
access$1800 (RequestContainerV5, CellMessage): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$1900 (RequestContainerV5): Map 0%   (0/1)0%   (0/3)0%   (0/1)
access$2000 (RequestContainerV5, CellMessage): void 0%   (0/1)0%   (0/4)0%   (0/1)
access$2500 (RequestContainerV5): ThreadPool 0%   (0/1)0%   (0/3)0%   (0/1)
access$2600 (int): String 0%   (0/1)0%   (0/3)0%   (0/1)
access$2700 (RequestContainerV5): CheckStagePermission 0%   (0/1)0%   (0/3)0%   (0/1)
access$2800 (RequestContainerV5, PnfsId, StorageInfo, int, String): void 0%   (0/1)0%   (0/7)0%   (0/1)
access$2900 (RequestContainerV5): Map 0%   (0/1)0%   (0/3)0%   (0/1)
access$3000 (RequestContainerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$3100 (RequestContainerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$3200 (RequestContainerV5, PnfsId, String, boolean): void 0%   (0/1)0%   (0/6)0%   (0/1)
access$3400 (RequestContainerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$3500 (RequestContainerV5): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
access$3708 (RequestContainerV5): int 0%   (0/1)0%   (0/8)0%   (0/1)
access$3900 (RequestContainerV5): int 0%   (0/1)0%   (0/3)0%   (0/1)
access$4100 (RequestContainerV5, CellMessage, long): CellMessage 0%   (0/1)0%   (0/5)0%   (0/1)
access$4200 (RequestContainerV5): int 0%   (0/1)0%   (0/3)0%   (0/1)
access$4300 (RequestContainerV5): long 0%   (0/1)0%   (0/3)0%   (0/1)
access$4400 (RequestContainerV5): String 0%   (0/1)0%   (0/3)0%   (0/1)
access$4500 (RequestContainerV5, PoolManagerParameter): double 0%   (0/1)0%   (0/4)0%   (0/1)
access$4600 (RequestContainerV5): int 0%   (0/1)0%   (0/3)0%   (0/1)
access$500 (RequestContainerV5): long 0%   (0/1)0%   (0/3)0%   (0/1)
access$700 (RequestContainerV5): PartitionManager 0%   (0/1)0%   (0/3)0%   (0/1)
access$800 (RequestContainerV5): Map 0%   (0/1)0%   (0/3)0%   (0/1)
getCurrentCostCut (PoolManagerParameter): double 0%   (0/1)0%   (0/12)0%   (0/1)
getInfo (PrintWriter): void 0%   (0/1)0%   (0/267)0%   (0/23)
getSameHostCopyMode (): String 0%   (0/1)0%   (0/17)0%   (0/2)
getSameHostRetryMode (): String 0%   (0/1)0%   (0/19)0%   (0/1)
messageArrived (CellMessage, Object): void 0%   (0/1)0%   (0/31)0%   (0/8)
messageArrived (CellMessage, PoolMgrSelectReadPoolMsg): void 0%   (0/1)0%   (0/143)0%   (0/25)
modeToString (int): String 0%   (0/1)0%   (0/30)0%   (0/10)
poolStatusChanged (String, int): void 0%   (0/1)0%   (0/103)0%   (0/19)
printSetup (PrintWriter): void 0%   (0/1)0%   (0/82)0%   (0/11)
sendCostMsg (PnfsId, PoolCostCheckable, boolean): void 0%   (0/1)0%   (0/2)0%   (0/2)
sendHitMsg (PnfsId, String, boolean): void 0%   (0/1)0%   (0/37)0%   (0/7)
sendInfoMessage (PnfsId, StorageInfo, int, String): void 0%   (0/1)0%   (0/36)0%   (0/7)
setHitInfoMessages (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
run (): void 100% (1/1)3%   (2/58)6%   (1/18)
ac_rc_set_max_restore_$_1 (Args): String 100% (1/1)39%  (11/28)38%  (3/8)
ac_rc_set_sameHostCopy_$_1 (Args): String 100% (1/1)51%  (30/59)61%  (7.4/12)
ac_rc_set_sameHostRetry_$_1 (Args): String 100% (1/1)52%  (22/42)67%  (6/9)
ac_rc_onerror_$_1 (Args): String 100% (1/1)70%  (21/30)73%  (3.7/5)
<static initializer> 100% (1/1)100% (39/39)100% (2/2)
RequestContainerV5 (): void 100% (1/1)100% (4/4)100% (2/2)
RequestContainerV5 (int): void 100% (1/1)100% (129/129)100% (40/40)
ac_rc_set_max_retries_$_1 (Args): String 100% (1/1)100% (8/8)100% (2/2)
ac_rc_set_max_threads_$_1 (Args): String 100% (1/1)100% (18/18)100% (3/3)
ac_rc_set_poolpingtimer_$_1 (Args): String 100% (1/1)100% (10/10)100% (2/2)
ac_rc_set_retry_$_1 (Args): String 100% (1/1)100% (10/10)100% (2/2)
ac_rc_set_warning_path_$_0_1 (Args): String 100% (1/1)100% (11/11)100% (3/3)
setPartitionManager (PartitionManager): void 100% (1/1)100% (4/4)100% (2/2)
setPoolMonitor (PoolMonitorV5): void 100% (1/1)100% (4/4)100% (2/2)
setPoolSelectionUnit (PoolSelectionUnit): void 100% (1/1)100% (4/4)100% (2/2)
setSendCostInfo (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setStageConfigurationFile (String): void 100% (1/1)100% (7/7)100% (2/2)
setThreadPool (ThreadPool): void 100% (1/1)100% (4/4)100% (2/2)

1// $Id: RequestContainerV5.java,v 1.62 2007-09-02 17:51:31 tigran Exp $
2 
3package diskCacheV111.poolManager ;
4 
5import java.io.PrintWriter;
6import java.io.IOException;
7import java.text.SimpleDateFormat;
8import java.util.ArrayList;
9import java.util.Collections;
10import java.util.Date;
11import java.util.HashMap;
12import java.util.Iterator;
13import java.util.LinkedList;
14import java.util.List;
15import java.util.Map;
16import java.util.Set;
17import java.util.SortedMap;
18import java.util.TreeMap;
19import java.util.regex.Pattern;
20import java.util.regex.PatternSyntaxException;
21 
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24import org.dcache.cells.AbstractCellComponent;
25import org.dcache.cells.CellCommandListener;
26import org.dcache.cells.CellMessageReceiver;
27 
28import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType;
29import diskCacheV111.util.CacheException;
30import diskCacheV111.util.CheckStagePermission;
31import diskCacheV111.util.ExtendedRunnable;
32import diskCacheV111.util.PnfsId;
33import diskCacheV111.util.ThreadPool;
34import diskCacheV111.vehicles.DCapProtocolInfo;
35import diskCacheV111.vehicles.IpProtocolInfo;
36import diskCacheV111.vehicles.Message;
37import diskCacheV111.vehicles.PnfsGetStorageInfoMessage;
38import diskCacheV111.vehicles.Pool2PoolTransferMsg;
39import diskCacheV111.vehicles.PoolCheckable;
40import diskCacheV111.vehicles.PoolCostCheckable;
41import diskCacheV111.vehicles.PoolFetchFileMessage;
42import diskCacheV111.vehicles.PoolHitInfoMessage;
43import diskCacheV111.vehicles.PoolMgrReplicateFileMsg;
44import diskCacheV111.vehicles.PoolMgrSelectPoolMsg;
45import diskCacheV111.vehicles.PoolMgrSelectReadPoolMsg;
46import diskCacheV111.vehicles.PoolStatusChangedMessage;
47import diskCacheV111.vehicles.ProtocolInfo;
48import diskCacheV111.vehicles.RestoreHandlerInfo;
49import diskCacheV111.vehicles.StorageInfo;
50import diskCacheV111.vehicles.WarningPnfsFileInfoMessage;
51import dmg.cells.nucleus.CDC;
52import dmg.cells.nucleus.CellMessage;
53import dmg.cells.nucleus.CellPath;
54import dmg.cells.nucleus.NoRouteToCellException;
55import dmg.cells.nucleus.UOID;
56import dmg.util.Args;
57 
58import org.dcache.auth.FQANPrincipal;
59import org.dcache.auth.Subjects;
60import org.globus.gsi.jaas.GlobusPrincipal;
61 
62public class RequestContainerV5
63    extends AbstractCellComponent
64    implements Runnable, CellCommandListener, CellMessageReceiver
65{
66    private static final Logger _log =
67        LoggerFactory.getLogger(RequestContainerV5.class);
68 
69    /**
70     * State of CheckFilePingHandler.
71     */
72    private enum PingState { STOPPED, WAITING, QUERYING };
73 
74    private static final String POOL_UNKNOWN_STRING  = "<unknown>" ;
75 
76    private static final String STRING_NEVER      = "never" ;
77    private static final String STRING_BESTEFFORT = "besteffort" ;
78    private static final String STRING_NOTCHECKED = "notchecked" ;
79 
80    private static final int SAME_HOST_RETRY_NEVER      = 0 ;
81    private static final int SAME_HOST_RETRY_BESTEFFORT = 1 ;
82    private static final int SAME_HOST_RETRY_NOTCHECKED = 2 ;
83    /** value in milliseconds */
84    private static final int DEFAULT_RETRY_INTERVAL = 60000;
85 
86    private final Map<UOID, PoolRequestHandler>     _messageHash   = new HashMap<UOID, PoolRequestHandler>() ;
87    private final Map<String, PoolRequestHandler>   _handlerHash   = new HashMap<String, PoolRequestHandler>() ;
88 
89    private String      _warningPath   = "billing" ;
90    private long        _retryTimer    = 15 * 60 * 1000 ;
91 
92    private int         _maxRequestClumping = 1 ;
93 
94    private String      _onError       = "suspend" ;
95    private int         _maxRetries    = 3 ;
96    private int         _maxRestore    = -1 ;
97    private boolean     _sendCostInfo  = false ;
98 
99    private CheckStagePermission _stagePolicyDecisionPoint;
100 
101    private boolean     _sendHitInfo   = false ;
102 
103    private int         _restoreExceeded = 0 ;
104    private boolean     _suspendIncoming = false ;
105    private boolean     _suspendStaging  = false ;
106 
107    private PoolSelectionUnit  _selectionUnit;
108    private PoolMonitorV5      _poolMonitor;
109    private final SimpleDateFormat   _formatter        = new SimpleDateFormat ("MM.dd HH:mm:ss");
110    private ThreadPool         _threadPool ;
111    private final Map<PnfsId, CacheException>            _selections       = new HashMap<PnfsId, CacheException>() ;
112    private PartitionManager   _partitionManager ;
113    private long               _checkFilePingTimer = 10 * 60 * 1000 ;
114    /** value in milliseconds */
115    private final int _stagingRetryInterval;
116 
117    /**
118     * define host selection behavior on restore retry
119     */
120    private int _sameHostRetry = SAME_HOST_RETRY_NOTCHECKED ;
121 
122    /**
123     * Tape Protection.
124     * allStates defines that all states are allowed.
125     * allStatesExceptStage defines that all states except STAGE are allowed.
126     */
127    public static final int allStates = PoolRequestHandler.ST_STAGE | PoolRequestHandler.ST_INIT | PoolRequestHandler.ST_DONE
128            | PoolRequestHandler.ST_POOL_2_POOL | PoolRequestHandler.ST_WAITING | PoolRequestHandler.ST_WAITING_FOR_STAGING
129            | PoolRequestHandler.ST_WAITING_FOR_POOL_2_POOL | PoolRequestHandler.ST_SUSPENDED;
130 
131    public static final int allStatesExceptStage = PoolRequestHandler.ST_INIT | PoolRequestHandler.ST_DONE | PoolRequestHandler.ST_POOL_2_POOL
132            | PoolRequestHandler.ST_WAITING | PoolRequestHandler.ST_WAITING_FOR_STAGING | PoolRequestHandler.ST_WAITING_FOR_POOL_2_POOL
133            | PoolRequestHandler.ST_SUSPENDED;
134 
135    private static String modeToString(int mode){
136        switch (mode) {
137        case 1:
138            return "ST_INIT";
139        case 2:
140            return "ST_DONE";
141        case 4:
142            return "ST_POOL_2_POOL";
143        case 8:
144            return "ST_STAGE";
145        case 16:
146            return "ST_WAITING";
147        case 32:
148            return "ST_WAITING_FOR_STAGING";
149        case 64:
150            return "ST_WAITING_FOR_POOL_2_POOL";
151        case 128:
152            return "ST_SUSPENDED";
153       default:
154            throw new IllegalArgumentException("Invalid mode : " + mode);
155       }
156    }
157 
158    public RequestContainerV5( int stagingRetryInterval) {
159        _stagingRetryInterval = stagingRetryInterval;
160        new Thread(this,"Container-ticker").start();
161    }
162 
163    public RequestContainerV5()
164    {
165        this( DEFAULT_RETRY_INTERVAL);
166    }
167 
168    public void setPoolSelectionUnit(PoolSelectionUnit selectionUnit)
169    {
170        _selectionUnit = selectionUnit;
171    }
172 
173    public void setPoolMonitor(PoolMonitorV5 poolMonitor)
174    {
175        _poolMonitor = poolMonitor;
176    }
177 
178    public void setPartitionManager(PartitionManager partitionManager)
179    {
180        _partitionManager = partitionManager;
181    }
182 
183    public void setThreadPool(ThreadPool threadPool)
184    {
185        _threadPool = threadPool;
186    }
187 
188    public void setHitInfoMessages(boolean sendHitInfo)
189    {
190        _sendHitInfo = sendHitInfo;
191    }
192 
193    public void messageArrived(CellMessage envelope, Object message)
194    {
195        UOID uoid = envelope.getLastUOID();
196        PoolRequestHandler handler;
197 
198        synchronized (_messageHash) {
199            handler = _messageHash.remove(uoid);
200            if (handler == null) {
201                return;
202            }
203        }
204 
205        handler.mailForYou(message);
206    }
207 
208    public void run(){
209       try{
210          while( ! Thread.interrupted() ){
211 
212             Thread.sleep(_stagingRetryInterval) ;
213 
214             List<PoolRequestHandler> list = null ;
215             synchronized( _handlerHash ){
216                list = new ArrayList<PoolRequestHandler>( _handlerHash.values() ) ;
217             }
218             try{
219                for( PoolRequestHandler handler: list ){
220                   if( handler == null )continue ;
221                   handler.alive() ;
222                }
223             }catch(Throwable t){
224                _log.warn(t.toString()) ;
225                continue ;
226             }
227          }
228       }catch(InterruptedException ie ){
229          _log.warn("Container-ticker done");
230       }
231    }
232    public void poolStatusChanged(String poolName, int poolStatus) {
233        _log.info("Restore Manager : got 'poolRestarted' for " + poolName);
234        try {
235            List<PoolRequestHandler> list = null;
236            synchronized (_handlerHash) {
237                list = new ArrayList<PoolRequestHandler>(_handlerHash.values());
238            }
239 
240            for (PoolRequestHandler rph : list) {
241 
242                if (rph == null)
243                    continue;
244 
245 
246                switch( poolStatus ) {
247                    case PoolStatusChangedMessage.UP:
248                        /*
249                         * if pool is up, re-try all request scheduled to this pool
250                         * and all requests, which do not have any pool candidates
251                         *
252                         * in this construction we will fall down to next case
253                         */
254                        if (rph.getPoolCandidate().equals(POOL_UNKNOWN_STRING) ) {
255                            _log.info("Restore Manager : retrying : " + rph);
256                            rph.retry(false);
257                        }
258                    case PoolStatusChangedMessage.DOWN:
259                        /*
260                         * if pool is down, re-try all request scheduled to this
261                         * pool
262                         */
263                        if (rph.getPoolCandidate().equals(poolName) ) {
264                            _log.info("Restore Manager : retrying : " + rph);
265                            rph.retry(false);
266                        }
267                }
268 
269            }
270 
271        } catch (Exception ee) {
272            _log.warn("Problem retrying pool " + poolName + " (" + ee + ")");
273        }
274    }
275 
276    @Override
277    public void getInfo(PrintWriter pw)
278    {
279       PoolManagerParameter def = _partitionManager.getParameterCopyOf() ;
280 
281       pw.println("Restore Controller [$Revision: 14204 $]\n") ;
282       pw.println( "      Retry Timeout : "+(_retryTimer/1000)+" seconds" ) ;
283       pw.println( "  Thread Controller : "+_threadPool ) ;
284       pw.println( "    Maximum Retries : "+_maxRetries ) ;
285       pw.println( "    Pool Ping Timer : "+(_checkFilePingTimer/1000) + " seconds" ) ;
286       pw.println( "           On Error : "+_onError ) ;
287       pw.println( "       Warning Path : "+_warningPath ) ;
288       pw.println( "          Allow p2p : "+( def._p2pAllowed ? "on" : "off" )+
289                                          " oncost="+( def._p2pOnCost ? "on" : "off" )+
290                                          " fortransfer="+( def._p2pForTransfer ? "on" : "off" ) );
291       pw.println( "      Allow staging : "+(def._hasHsmBackend ? "on":"off") ) ;
292       pw.println( "Allow stage on cost : "+(def._stageOnCost ? "on":"off") ) ;
293       pw.println( "          P2p Slope : "+(float)def._slope ) ;
294       pw.println( "     P2p Max Copies : "+def._maxPnfsFileCopies) ;
295       pw.println( "          Cost Cuts : idle="+def._minCostCut+",p2p="+def.getCostCutString()+
296                                       ",alert="+def._alertCostCut+",halt="+def._panicCostCut+
297                                       ",fallback="+def._fallbackCostCut) ;
298       pw.println( "      Restore Limit : "+(_maxRestore<0?"unlimited":(""+_maxRestore)));
299       pw.println( "   Restore Exceeded : "+_restoreExceeded ) ;
300       pw.println( "Allow same host p2p : "+getSameHostCopyMode() ) ;
301       pw.println( "    Same host retry : "+getSameHostRetryMode() ) ;
302       if( _suspendIncoming )
303            pw.println( "   Suspend Incoming : on (not persistent)");
304       if( _suspendStaging )
305            pw.println( "   Suspend Staging  : on (not persistent)");
306    }
307 
308    @Override
309    public void printSetup(PrintWriter pw)
310    {
311        pw.append("#\n# Submodule [rc] : ").append(this.getClass().toString()).append("\n#\n");
312        pw.append("rc onerror ").println(_onError);
313        pw.append("rc set max retries ").println(_maxRetries);
314        pw.append("rc set retry ").println(_retryTimer/1000);
315        pw.append("rc set warning path ").println(_warningPath);
316        pw.append("rc set poolpingtimer ").println(_checkFilePingTimer/1000);
317        pw.append("rc set max restore ")
318            .println(_maxRestore<0?"unlimited":(""+_maxRestore));
319        pw.append("rc set sameHostCopy ")
320            .println(getSameHostCopyMode());
321        pw.append("rc set sameHostRetry ")
322            .println(getSameHostRetryMode());
323        pw.append("rc set max threads ")
324            .println(_threadPool.getMaxThreadCount());
325    }
326 
327    private String getSameHostRetryMode(){
328        return _sameHostRetry == SAME_HOST_RETRY_NEVER      ? STRING_NEVER :
329               _sameHostRetry == SAME_HOST_RETRY_BESTEFFORT ? STRING_BESTEFFORT :
330               _sameHostRetry == SAME_HOST_RETRY_NOTCHECKED ? STRING_NOTCHECKED :
331               "UNDEFINED" ;
332 
333     }
334 
335     private String getSameHostCopyMode(){
336       PoolManagerParameter def = _partitionManager.getParameterCopyOf() ;
337       return def._allowSameHostCopy == PoolManagerParameter.P2P_SAME_HOST_BEST_EFFORT ? STRING_BESTEFFORT :
338              def._allowSameHostCopy == PoolManagerParameter.P2P_SAME_HOST_NEVER       ? STRING_NEVER :
339              STRING_NOTCHECKED ;
340     }
341 
342    public String hh_rc_set_max_threads = "<threadCount> # 0 : no limits" ;
343    public String ac_rc_set_max_threads_$_1( Args args ){
344       int n = Integer.parseInt(args.argv(0));
345       _threadPool.setMaxThreadCount(n);
346       return "New max thread count : "+n;
347    }
348 
349    public String hh_rc_set_sameHostCopy = STRING_NEVER+"|"+STRING_BESTEFFORT+"|"+STRING_NOTCHECKED ;
350    public String ac_rc_set_sameHostCopy_$_1( Args args ){
351        String type = args.argv(0) ;
352        PoolManagerParameter para = _partitionManager.getDefaultPartitionInfo().getParameter() ;
353        synchronized( para ){
354           if( type.equals(STRING_NEVER) ){
355              para._allowSameHostCopy = PoolManagerParameter.P2P_SAME_HOST_NEVER ;
356           }else if( type.equals(STRING_BESTEFFORT) ){
357              para._allowSameHostCopy = PoolManagerParameter.P2P_SAME_HOST_BEST_EFFORT ;
358           }else if( type.equals(STRING_NOTCHECKED) ){
359              para._allowSameHostCopy = PoolManagerParameter.P2P_SAME_HOST_NOT_CHECKED ;
360           }else{
361              throw new
362              IllegalArgumentException("Value not supported : "+type) ;
363           }
364        }
365        return "" ;
366     }
367 
368    public String hh_rc_set_sameHostRetry = STRING_NEVER+"|"+STRING_BESTEFFORT+"|"+STRING_NOTCHECKED ;
369    public String ac_rc_set_sameHostRetry_$_1( Args args ){
370 
371       String value = args.argv(0) ;
372       if( value.equals(STRING_NEVER) ){
373           _sameHostRetry = SAME_HOST_RETRY_NEVER ;
374       }else if( value.equals( STRING_BESTEFFORT ) ){
375           _sameHostRetry = SAME_HOST_RETRY_BESTEFFORT ;
376       }else if( value.equals( STRING_NOTCHECKED) ){
377           _sameHostRetry = SAME_HOST_RETRY_NOTCHECKED ;
378       }else
379          throw new
380          IllegalArgumentException("Value not supported for \"set sameHostRetry\" : "+value ) ;
381       return "" ;
382    }
383 
384    public String hh_rc_set_max_restore = "<maxNumberOfRestores>" ;
385    public String ac_rc_set_max_restore_$_1( Args args ){
386       if( args.argv(0).equals("unlimited") ){
387          _maxRestore = -1 ;
388          return "" ;
389       }
390       int n = Integer.parseInt(args.argv(0));
391       if( n < 0 )
392         throw new
393         IllegalArgumentException("must be >=0") ;
394       _maxRestore = n ;
395       return "" ;
396    }
397    public String hh_rc_select = "[<pnfsId> [<errorNumber> [<errorMessage>]] [-remove]]" ;
398    public String ac_rc_select_$_0_3( Args args ){
399 
400       synchronized( _selections ){
401          if( args.argc() == 0 ){
402             StringBuilder sb = new StringBuilder() ;
403             for( Map.Entry<PnfsId, CacheException > entry: _selections.entrySet() ){
404 
405                sb.append(entry.getKey().toString()).
406                   append("  ").
407                   append(entry.getValue().toString()).
408                   append("\n");
409             }
410             return sb.toString() ;
411          }
412          boolean remove = args.getOpt("remove") != null ;
413          PnfsId  pnfsId = new PnfsId(args.argv(0));
414 
415          if( remove ){
416             _selections.remove( pnfsId ) ;
417             return "" ;
418          }
419          int    errorNumber  = args.argc() > 1 ? Integer.parseInt(args.argv(1)) : 1 ;
420          String errorMessage = args.argc() > 2 ? args.argv(2) : ("Failed-"+errorNumber);
421 
422          _selections.put( pnfsId , new CacheException(errorNumber,errorMessage) ) ;
423       }
424       return "" ;
425    }
426    public String hh_rc_set_warning_path = " # where to send the warnings to" ;
427    public String ac_rc_set_warning_path_$_0_1( Args args ){
428       if( args.argc() > 0 ){
429          _warningPath = args.argv(0) ;
430       }
431       return _warningPath ;
432    }
433    public String fh_rc_set_poolpingtimer =
434    " rc set poolpingtimer <timer/seconds> "+
435    ""+
436    "    If set to a nonzero value, the restore handler will frequently"+
437    "    check the pool whether the request is still pending, failed"+
438    "    or has been successful" +
439    "";
440    public String hh_rc_set_poolpingtimer = "<checkPoolFileTimer/seconds>" ;
441    public String ac_rc_set_poolpingtimer_$_1(Args args ){
442       _checkFilePingTimer = 1000L * Long.parseLong(args.argv(0));
443       return "" ;
444    }
445    public String hh_rc_set_retry = "<retryTimer/seconds>" ;
446    public String ac_rc_set_retry_$_1(Args args ){
447       _retryTimer = 1000L * Long.parseLong(args.argv(0));
448       return "" ;
449    }
450    public String hh_rc_set_max_retries = "<maxNumberOfRetries>" ;
451    public String ac_rc_set_max_retries_$_1(Args args ){
452       _maxRetries = Integer.parseInt(args.argv(0));
453       return "" ;
454    }
455    public String hh_rc_suspend = "[on|off] -all" ;
456    public String ac_rc_suspend_$_0_1( Args args ){
457       boolean all = args.getOpt("all") != null ;
458       if( args.argc() == 0 ){
459          if(all)_suspendIncoming = true ;
460          _suspendStaging = true ;
461       }else{
462 
463          String mode = args.argv(0) ;
464          if( mode.equals("on") ){
465              if(all)_suspendIncoming = true ;
466              _suspendStaging = true ;
467          }else if( mode.equals("off") ){
468              if(all)_suspendIncoming = false ;
469              _suspendStaging = false ;
470          }else{
471              throw new
472              IllegalArgumentException("Usage : rc suspend [on|off]");
473          }
474 
475       }
476       return "" ;
477    }
478    public String hh_rc_onerror = "suspend|fail" ;
479    public String ac_rc_onerror_$_1(Args args ){
480       String onerror = args.argv(0) ;
481       if( ( ! onerror.equals("suspend") ) &&
482           ( ! onerror.equals("fail") )  )
483             throw new
484             IllegalArgumentException("Usage : rc onerror fail|suspend") ;
485 
486       _onError = onerror ;
487       return "onerror "+_onError ;
488    }
489    public String fh_rc_retry =
490       "NAME\n"+
491       "           rc retry\n\n"+
492       "SYNOPSIS\n"+
493       "           I)  rc retry <pnfsId> [OPTIONS]\n"+
494       "           II) rc retry * -force-all [OPTIONS]\n\n"+
495       "DESCRIPTION\n"+
496       "           Forces a 'restore request' to be retried.\n"+
497       "           While  using syntax I , a single request  is retried,\n"+
498       "           syntax II retries all requests which reported an error.\n"+
499       "           If the '-force-all' options is given, all requests are\n"+
500       "           retried, regardless of their current status.\n\n"+
501       "           -update-si\n"+
502       "                   fetch the storage info again before performing\n"+
503       "                   the retry. \n"+
504       "\n" ;
505    public String hh_rc_retry = "<pnfsId>|* -force-all -update-si" ;
506    public String ac_rc_retry_$_1( Args args ) throws CacheException {
507       StringBuffer sb = new StringBuffer() ;
508       boolean forceAll = args.getOpt("force-all") != null ;
509       boolean updateSi = args.getOpt("update-si") != null ;
510       if( args.argv(0).equals("*") ){
511          List<PoolRequestHandler> all;
512          //
513          // Remember : we are not allowed to call 'retry' as long
514          // as we  are holding the _handlerHash lock.
515          //
516          synchronized( _handlerHash ){
517             all = new ArrayList<PoolRequestHandler>( _handlerHash.values() ) ;
518          }
519          for (PoolRequestHandler rph : all) {
520             try{
521                if( forceAll || ( rph._currentRc != 0 ) )rph.retry(updateSi) ;
522             }catch(Exception ee){
523                sb.append(ee.getMessage()).append("\n");
524             }
525          }
526       }else{
527          PoolRequestHandler rph;
528          synchronized( _handlerHash ){
529             rph = _handlerHash.get(args.argv(0));
530             if( rph == null )
531                throw new
532                IllegalArgumentException("Not found : "+args.argv(0) ) ;
533          }
534          rph.retry(updateSi) ;
535       }
536       return sb.toString() ;
537    }
538    public String hh_rc_failed = "<pnfsId> [<errorNumber> [<errorMessage>]]" ;
539    public String ac_rc_failed_$_1_3( Args args ) throws CacheException {
540       int    errorNumber = args.argc() > 1 ? Integer.parseInt(args.argv(1)) : 1;
541       String errorString = args.argc() > 2 ? args.argv(2) : "Operator Intervention" ;
542 
543       PoolRequestHandler rph = null ;
544 
545       synchronized( _handlerHash ){
546          rph = _handlerHash.get(args.argv(0));
547          if( rph == null )
548             throw new
549             IllegalArgumentException("Not found : "+args.argv(0) ) ;
550       }
551       rph.failed(errorNumber,errorString) ;
552       return "" ;
553    }
554    public String hh_rc_destroy = "<pnfsId> # !!!  use with care" ;
555    public String ac_rc_destroy_$_1( Args args ) throws CacheException {
556 
557       PoolRequestHandler rph = null ;
558 
559       synchronized( _handlerHash ){
560          rph = _handlerHash.get(args.argv(0));
561          if( rph == null )
562             throw new
563             IllegalArgumentException("Not found : "+args.argv(0) ) ;
564 
565          _handlerHash.remove( args.argv(0) ) ;
566       }
567       return "" ;
568    }
569    public String hh_rc_ls = " [<regularExpression>] [-w] # lists pending requests" ;
570    public String ac_rc_ls_$_0_1( Args args ){
571       StringBuilder sb  = new StringBuilder() ;
572 
573       Pattern  pattern = args.argc() > 0 ? Pattern.compile(args.argv(0)) : null ;
574 
575       if( args.getOpt("w") == null ){
576          List<PoolRequestHandler>    allRequestHandlers = null ;
577          synchronized( _handlerHash ){
578              allRequestHandlers = new ArrayList<PoolRequestHandler>( _handlerHash.values() ) ;
579          }
580 
581          for( PoolRequestHandler h : allRequestHandlers ){
582 
583              if( h == null )continue ;
584              String line = h.toString() ;
585              if( ( pattern == null ) || pattern.matcher(line).matches() )
586                 sb.append(line).append("\n");
587          }
588       }else{
589 
590           Map<UOID, PoolRequestHandler>  allPendingRequestHandlers   = new HashMap<UOID, PoolRequestHandler>() ;
591          synchronized(_messageHash){
592              allPendingRequestHandlers.putAll( _messageHash ) ;
593          }
594 
595          for (Map.Entry<UOID, PoolRequestHandler> requestHandler : allPendingRequestHandlers.entrySet()) {
596 
597                UOID uoid = requestHandler.getKey();
598                PoolRequestHandler h = requestHandler.getValue();
599 
600                if (h == null)
601                    continue;
602                String line = uoid.toString() + " " + h.toString();
603                if ((pattern == null) || pattern.matcher(line).matches())
604                    sb.append(line).append("\n");
605 
606            }
607        }
608       return sb.toString();
609    }
610    public String hh_xrc_ls = " # lists pending requests (binary)" ;
611    public Object ac_xrc_ls( Args args ){
612 
613       List<PoolRequestHandler> all  = null ;
614       synchronized( _handlerHash ){
615          all = new ArrayList<PoolRequestHandler>( _handlerHash.values() ) ;
616       }
617 
618       List<RestoreHandlerInfo>          list = new ArrayList<RestoreHandlerInfo>() ;
619 
620       for( PoolRequestHandler h: all  ){
621          if( h  == null )continue ;
622          list.add( h.getRestoreHandlerInfo() ) ;
623       }
624       return list.toArray( new RestoreHandlerInfo[list.size()] ) ;
625    }
626 
627    public void messageArrived(CellMessage envelope,
628                               PoolMgrSelectReadPoolMsg request)
629        throws PatternSyntaxException, IOException
630    {
631        boolean enforceP2P = false ;
632 
633        PnfsId       pnfsId       = request.getPnfsId() ;
634        ProtocolInfo protocolInfo = request.getProtocolInfo() ;
635        int allowedStates = request.getAllowedStates();
636 
637        String  hostName    =
638               protocolInfo instanceof IpProtocolInfo ?
639               ((IpProtocolInfo)protocolInfo).getHosts()[0] :
640               "NoSuchHost" ;
641 
642        String netName      = _selectionUnit.getNetIdentifier(hostName);
643        String protocolNameFromInfo = protocolInfo.getProtocol()+"/"+protocolInfo.getMajorVersion() ;
644 
645        String protocolName = _selectionUnit.getProtocolUnit( protocolNameFromInfo ) ;
646        if( protocolName == null ) {
647          throw new
648            IllegalArgumentException("Protocol not found : "+protocolNameFromInfo);
649        }
650 
651        if( request instanceof PoolMgrReplicateFileMsg ){
652           if( request.isReply() ){
653               _log.warn("Unexpected PoolMgrReplicateFileMsg arrived (is a reply)");
654               return ;
655           }else{
656               enforceP2P = true ;
657           }
658        }
659        String canonicalName = pnfsId +"@"+netName+"-"+protocolName+(enforceP2P?"-p2p":"")  ;
660        //
661        //
662        PoolRequestHandler handler = null ;
663        _log.info( "Adding request for : "+canonicalName ) ;
664        synchronized( _handlerHash ){
665           //
666           handler = _handlerHash.get(canonicalName);
667           if( handler == null ){
668              _handlerHash.put(
669                     canonicalName ,
670                     handler = new PoolRequestHandler( pnfsId , canonicalName, allowedStates ) ) ;
671           }
672           handler.addRequest(envelope) ;
673        }
674    }
675 
676 
677    // replicate a file
678    public String hh_replicate = " <pnfsid> <client IP>";
679    public String ac_replicate_$_2(Args args) {
680 
681        String commandReply = "Replication initiated...";
682 
683        try {
684 
685            PnfsId pnfsId = new PnfsId(args.argv(0));
686            PnfsGetStorageInfoMessage getStorageInfo = new PnfsGetStorageInfoMessage(
687                    pnfsId);
688 
689            CellMessage request = new CellMessage(new CellPath("PnfsManager"),
690                    getStorageInfo);
691 
692            request = sendAndWait(request, 30000);
693            if (request == null) {
694                throw new Exception(
695                        "Timeout : PnfsManager request for storageInfo of "
696                                + pnfsId);
697            }
698 
699            getStorageInfo = (PnfsGetStorageInfoMessage) request
700                    .getMessageObject();
701            StorageInfo storageInfo = getStorageInfo.getStorageInfo();
702 
703            // TODO: call p2p direct
704            // send message to yourself
705            PoolMgrReplicateFileMsg req = new PoolMgrReplicateFileMsg(pnfsId,
706                    storageInfo, new DCapProtocolInfo("DCap", 3, 0,
707                            args.argv(1), 2222), storageInfo.getFileSize());
708 
709            sendMessage( new CellMessage(new CellPath("PoolManager"), req) );
710 
711        } catch (Exception ee) {
712            commandReply = "P2P failed : " + ee.getMessage();
713        }
714 
715        return commandReply;
716    }
717 
718    private static final String [] ST_STRINGS = {
719 
720        "Init" , "Done" , "Pool2Pool" , "Staging" , "Waiting" ,
721        "WaitingForStaging" , "WaitingForP2P" , "Suspended"
722    } ;
723 
724    ///////////////////////////////////////////////////////////////
725    //
726    // the read io request handler
727    //
728    private class PoolRequestHandler  {
729 
730        protected PnfsId       _pnfsId;
731        protected final List<CellMessage>    _messages = new ArrayList<CellMessage>() ;
732        protected int          _retryCounter = -1 ;
733        private final CDC _cdc = new CDC();
734 
735 
736        private   UOID         _waitingFor    = null ;
737        private   long         _waitUntil     = 0 ;
738 
739        private   String       _state         = "[<idle>]";
740        private   int          _mode          = ST_INIT ;
741        private   final int    _allowedStates;
742        private   boolean      _stagingDenied = false;
743        private   int          _currentRc     = 0 ;
744        private   String       _currentRm     = "" ;
745 
746        private   PoolCostCheckable _bestPool = null ;
747        private   PoolCheckable     _poolCandidateInfo    = null ;
748        private   PoolCheckable     _p2pPoolCandidateInfo = null ;
749        private   PoolCheckable     _p2pSourcePoolInfo    = null ;
750 
751        private   final long   _started       = System.currentTimeMillis() ;
752        private   String       _name          = null ;
753 
754        private   StorageInfo  _storageInfo   = null ;
755        private   ProtocolInfo _protocolInfo  = null ;
756 
757        private   boolean _enforceP2P            = false ;
758        private   int     _destinationFileStatus = Pool2PoolTransferMsg.UNDETERMINED ;
759 
760        private CheckFilePingHandler  _pingHandler = new CheckFilePingHandler(_checkFilePingTimer) ;
761 
762        private PoolMonitorV5.PnfsFileLocation _pnfsFileLocation  = null ;
763        private PoolManagerParameter           _parameter         = _partitionManager.getParameterCopyOf() ;
764 
765        /**
766         * Indicates the next time a TTL of a request message will be
767         * exceeded.
768         */
769        private long _nextTtlTimeout = Long.MAX_VALUE;
770 
771        private class CheckFilePingHandler {
772            private long _timeInterval = 0;
773            private long _timer = 0;
774            private String _candidate;
775            private PingState _state = PingState.STOPPED;
776            private String _query;
777 
778            private CheckFilePingHandler(long timerInterval)
779            {
780                _timeInterval = timerInterval;
781            }
782 
783            private void startP2P(String candidate)
784            {
785                if (_timeInterval <= 0L || candidate == null)
786                    return;
787                _candidate = candidate;
788                _timer = _timeInterval + System.currentTimeMillis();
789                _state = PingState.WAITING;
790                _query = "pp ls";
791            }
792 
793            private void startStage(String candidate)
794            {
795                if (_timeInterval <= 0L || candidate == null)
796                    return;
797                _candidate = candidate;
798                _timer = _timeInterval + System.currentTimeMillis();
799                _state = PingState.WAITING;
800                _query = "rh ls";
801            }
802 
803            private void stop()
804            {
805                _candidate = null;
806                _state = PingState.STOPPED;
807                synchronized (_messageHash) {
808                    if (_waitingFor != null)
809                        _messageHash.remove(_waitingFor);
810                }
811            }
812 
813            private void alive()
814            {
815                if ((_candidate == null) || (_timer == 0L))
816                    return;
817 
818                long now = System.currentTimeMillis();
819                if (now > _timer) {
820                    switch (_state) {
821                    case WAITING:
822                        _log.info("CheckFilePingHandler : sending " + _query + " to " + _candidate);
823                        sendQuery();
824                        _state = PingState.QUERYING;
825                        break;
826 
827                    case QUERYING:
828                        /* No reply since last query.
829                         */
830                        _log.info("CheckFilePingHandler : request died");
831                        stop();
832                        errorHandler();
833                        break;
834 
835                    case STOPPED:
836                        return;
837                    }
838                    _timer = _timeInterval + now;
839                }
840            }
841 
842            private void gotReply(Object object)
843            {
844                if (_state == PingState.QUERYING && object instanceof String) {
845                    String s = (String) object;
846                    if (s.contains(_pnfsId.toString())) {
847                        _log.info("CheckFilePingHandler : request is alive");
848                        _state = PingState.WAITING;
849                    }
850                }
851            }
852 
853            private void sendQuery()
854            {
855                CellMessage envelope =
856                    new CellMessage(new CellPath(_candidate), _query);
857                synchronized (_messageHash) {
858                    try {
859                        sendMessage(envelope);
860                        _waitingFor = envelope.getUOID();
861                        _messageHash.put(_waitingFor, PoolRequestHandler.this);
862                    } catch (Exception e) {
863                        _log.warn("Can't send pool ping to " + _candidate + " :", e);
864                    }
865                }
866            }
867        }
868 
869        public PoolRequestHandler( PnfsId pnfsId , String canonicalName, int allowedStates ){
870 
871            _pnfsId  = pnfsId ;
872            _name    = canonicalName ;
873            _allowedStates = allowedStates ;
874        }
875        //...........................................................
876        //
877        // the following methods can be called from outside
878        // at any time.
879        //...........................................................
880        //
881        // add request is assumed to be synchronized by a higher level.
882        //
883        public void addRequest( CellMessage message ){
884 
885           _messages.add(message);
886           _stagingDenied = false;
887 
888           long ttl = message.getTtl();
889           if (ttl < Long.MAX_VALUE) {
890               long timeout = System.currentTimeMillis() + ttl;
891               _nextTtlTimeout = Math.min(_nextTtlTimeout, timeout);
892           }
893 
894           if (_pnfsFileLocation != null)return ;
895 
896           PoolMgrSelectReadPoolMsg request =
897                (PoolMgrSelectReadPoolMsg)message.getMessageObject() ;
898 
899           _storageInfo  = request.getStorageInfo() ;
900           _protocolInfo = request.getProtocolInfo() ;
901 
902           if( request instanceof PoolMgrReplicateFileMsg ){
903              _enforceP2P            = true ;
904              _destinationFileStatus = ((PoolMgrReplicateFileMsg)request).getDestinationFileStatus() ;
905           }
906 
907           _pnfsFileLocation =
908                _poolMonitor.getPnfsFileLocation( _pnfsId ,
909                                                  _storageInfo ,
910                                                  _protocolInfo, request.getLinkGroup()) ;
911 
912           //
913           //
914           //
915           add(null) ;
916        }
917        public String getPoolCandidate() {
918            return _poolCandidateInfo == null ? (_p2pPoolCandidateInfo == null ? POOL_UNKNOWN_STRING
919                    : _p2pPoolCandidateInfo.getPoolName())
920                    : _poolCandidateInfo.getPoolName();
921        }
922 
923        private String getPoolCandidateState() {
924            return _poolCandidateInfo != null ? _poolCandidateInfo
925                    .getPoolName()
926                    : _p2pPoolCandidateInfo != null ? ((_p2pSourcePoolInfo == null ? POOL_UNKNOWN_STRING
927                            : _p2pSourcePoolInfo.getPoolName())
928                            + "->" + _p2pPoolCandidateInfo.getPoolName())
929                            : POOL_UNKNOWN_STRING;
930        }
931        public RestoreHandlerInfo getRestoreHandlerInfo(){
932           return new RestoreHandlerInfo(
933                  _name,
934                  _messages.size(),
935                  _retryCounter ,
936                  _started ,
937                  getPoolCandidateState() ,
938                  _state ,
939                  _currentRc ,
940                  _currentRm ) ;
941        }
942        @Override
943        public String toString(){
944           return _name+" m="+_messages.size()+" r="+
945                  _retryCounter+" ["+getPoolCandidateState()+"] ["+_state+"] "+
946                  "{"+_currentRc+","+_currentRm+"}" ;
947        }
948        //
949        //
950        private void mailForYou( Object message ){
951           //
952           // !!!!!!!!! remove this
953           //
954           //if( message instanceof PoolFetchFileMessage ){
955           //    _log.info("mailForYou !!!!! reply ignored ") ;
956           //    return ;
957           //}
958           add( message ) ;
959        }
960        private void alive(){
961 
962           Object [] command = new Object[1] ;
963           command[0] = "alive" ;
964 
965           add( command ) ;
966 
967        }
968        private void retry(boolean updateSi) throws CacheException {
969           Object [] command = new Object[2] ;
970           command[0] = "retry" ;
971           command[1] = updateSi ? "update" : "" ;
972           _pnfsFileLocation.clear() ;
973           add( command ) ;
974        }
975        private void failed( int errorNumber , String errorMessage )
976                throws CacheException {
977 
978           if( errorNumber > 0 ){
979              Object [] command = new Object[3] ;
980              command[0] = "failed" ;
981              command[1] = Integer.valueOf(errorNumber) ;
982              command[2] = errorMessage == null ?
983                           ( "Error-"+_currentRc ) :
984                           errorMessage ;
985 
986 
987              add( command ) ;
988              return ;
989           }
990           throw new
991           IllegalArgumentException("Error number must be > 0");
992 
993        }
994 
995        //...................................................................
996        //
997        // From now on, methods may only be called from within
998        // the state mechanism (which is thread safe because
999        // we only allow a single thread to run at a time).
1000        //
1001        private void waitFor( long millis ){
1002           _waitUntil = System.currentTimeMillis() + millis ;
1003        }
1004        private void clearSteering(){
1005           synchronized( _messageHash ){
1006 
1007              if( _waitingFor != null )_messageHash.remove( _waitingFor ) ;
1008           }
1009           _waitingFor = null ;
1010           _waitUntil  = 0L ;
1011 
1012           //
1013           // and the ping handler
1014           //
1015           _pingHandler.stop() ;
1016 
1017 
1018        }
1019        private void setError( int errorCode , String errorMessage ){
1020           _currentRc = errorCode ;
1021           _currentRm = errorMessage ;
1022        }
1023        private boolean sendFetchRequest( String poolName , StorageInfo storageInfo )
1024            throws NoRouteToCellException
1025        {
1026 
1027            CellMessage cellMessage = new CellMessage(
1028                                new CellPath( poolName ),
1029                                new PoolFetchFileMessage(
1030                                        poolName,
1031                                        storageInfo,
1032                                        _pnfsId          )
1033                                );
1034            synchronized( _messageHash ){
1035                if( ( _maxRestore >=0 ) &&
1036                    ( _messageHash.size() >= _maxRestore ) )return false ;
1037                sendMessage( cellMessage );
1038                _poolMonitor.messageToCostModule( cellMessage ) ;
1039                _messageHash.put( _waitingFor = cellMessage.getUOID() , this ) ;
1040                _state = "Staging "+_formatter.format(new Date()) ;
1041            }
1042            return true ;
1043        }
1044        private void sendPool2PoolRequest( String sourcePool , String destPool )
1045            throws NoRouteToCellException
1046        {
1047 
1048            Pool2PoolTransferMsg pool2pool =
1049                  new Pool2PoolTransferMsg(sourcePool,destPool,_pnfsId,_storageInfo) ;
1050            pool2pool.setDestinationFileStatus( _destinationFileStatus ) ;
1051            _log.info("Sending pool2pool request : "+pool2pool);
1052            CellMessage cellMessage =
1053                new CellMessage(
1054                                  new CellPath( destPool ),
1055                                  pool2pool
1056                                );
1057 
1058            synchronized( _messageHash ){
1059                sendMessage( cellMessage );
1060                _poolMonitor.messageToCostModule( cellMessage ) ;
1061                if( _waitingFor != null )_messageHash.remove( _waitingFor ) ;
1062                _messageHash.put( _waitingFor = cellMessage.getUOID() , this ) ;
1063                _state = "[P2P "+_formatter.format(new Date())+"]" ;
1064            }
1065        }
1066 
1067        /**
1068         * Removes request messages whos time to live has been
1069         * exceeded. Messages are dropped; no reply is sent to the
1070         * requestor, as we assume it is no longer waiting for the
1071         * reply.
1072         */
1073        private void expireRequests()
1074        {
1075            /* Access to _messages is controlled by a lock on
1076             * _handlerHash.
1077             */
1078            synchronized (_handlerHash) {
1079                long now = System.currentTimeMillis();
1080                _nextTtlTimeout = Long.MAX_VALUE;
1081 
1082                Iterator<CellMessage> i = _messages.iterator();
1083                while (i.hasNext()) {
1084                    CellMessage message = i.next();
1085                    long ttl = message.getTtl();
1086                    if (message.getLocalAge() >= ttl) {
1087                        _log.info("Discarding request from "
1088                                  + message.getSourceAddress().getCellName()
1089                                  + " because its time to live has been exceeded.");
1090                        i.remove();
1091                    } else if (ttl < Long.MAX_VALUE) {
1092                        _nextTtlTimeout = Math.min(_nextTtlTimeout, now + ttl);
1093                    }
1094                }
1095            }
1096        }
1097 
1098        private boolean answerRequest(int count) {
1099            //
1100            // if there is an error we won't continue ;
1101            //
1102            if (_currentRc != 0)
1103                count = 100000;
1104            //
1105 
1106            Iterator<CellMessage> messages = _messages.iterator();
1107            for (int i = 0; (i < count) && messages.hasNext(); i++) {
1108                CellMessage m =  messages.next();
1109                PoolMgrSelectPoolMsg rpm = (PoolMgrSelectPoolMsg) m.getMessageObject();
1110                if (_currentRc == 0) {
1111                    rpm.setPoolName(_poolCandidateInfo.getPoolName());
1112                    rpm.setSucceeded();
1113                } else {
1114                    rpm.setFailed(_currentRc, _currentRm);
1115                }
1116                try {
1117                    m.revertDirection();
1118                    sendMessage(m);
1119                    _poolMonitor.messageToCostModule(m);
1120                } catch (Exception e) {
1121                    _log.warn("Exception requestSucceeded : " + e);
1122                    _log.warn(e.toString());
1123                }
1124                messages.remove();
1125            }
1126            return messages.hasNext();
1127        }
1128        //
1129        // and the heart ...
1130        //
1131        private static final int RT_DONE       = 0 ;
1132        private static final int RT_OK         = 1 ;
1133        private static final int RT_FOUND      = 2 ;
1134        private static final int RT_NOT_FOUND  = 3 ;
1135        private static final int RT_ERROR      = 4 ;
1136        private static final int RT_OUT_OF_RESOURCES = 5 ;
1137        private static final int RT_CONTINUE         = 6 ;
1138        private static final int RT_COST_EXCEEDED    = 7 ;
1139        private static final int RT_NOT_PERMITTED    = 8 ;
1140        private static final int RT_S_COST_EXCEEDED  = 9 ;
1141        private static final int RT_DELAY  = 10 ;
1142 
1143        private static final int ST_INIT        = 1 ;
1144        private static final int ST_DONE        = 2 ;
1145        private static final int ST_POOL_2_POOL = 4 ;
1146        private static final int ST_STAGE       = 8 ;
1147        private static final int ST_WAITING     = 16 ;
1148        private static final int ST_WAITING_FOR_STAGING     = 32 ;
1149        private static final int ST_WAITING_FOR_POOL_2_POOL = 64 ;
1150        private static final int ST_SUSPENDED   = 128 ;
1151 
1152        private static final int CONTINUE        = 0 ;
1153        private static final int WAIT            = 1 ;
1154 
1155        private LinkedList _fifo              = new LinkedList() ;
1156        private boolean    _stateEngineActive = false ;
1157        private boolean    _forceContinue     = false ;
1158        private boolean    _overwriteCost     = false ;
1159 
1160        public class RunEngine implements ExtendedRunnable {
1161           public void run(){
1162               _cdc.apply();
1163              try{
1164                 stateLoop() ;
1165              }finally{
1166                  _cdc.clear();
1167                 synchronized( _fifo ){
1168                   _stateEngineActive = false ;
1169                 }
1170              }
1171           }
1172           public void runFailed(){
1173              synchronized( _fifo ){
1174                   _stateEngineActive = false ;
1175              }
1176           }
1177           @Override
1178        public String toString() {
1179              return PoolRequestHandler.this.toString();
1180           }
1181        }
1182        private void add( Object obj ){
1183 
1184           synchronized( _fifo ){
1185               _log.info( "Adding Object : "+obj ) ;
1186               _fifo.addFirst(obj) ;
1187               if( _stateEngineActive )return ;
1188               _log.info( "Starting Engine" ) ;
1189               _stateEngineActive = true ;
1190 
1191               _threadPool.invokeLater( new RunEngine() , "Read-"+_pnfsId ) ;
1192           }
1193        }
1194        private void stateLoop(){
1195 
1196           Object inputObject ;
1197           _log.info( "ACTIVATING STATE ENGINE "+_pnfsId+" "+(System.currentTimeMillis()-_started)) ;
1198 
1199           while( ! Thread.interrupted() ){
1200 
1201              if( ! _forceContinue ){
1202 
1203                 synchronized( _fifo ){
1204                    if( _fifo.size() == 0 ){
1205                       _stateEngineActive = false ;
1206                       return ;
1207                    }
1208                    inputObject = _fifo.removeLast() ;
1209                 }
1210              }else{
1211                 inputObject = null ;
1212              }
1213              _forceContinue = false ;
1214              try{
1215                 _log.info("StageEngine called in mode "+
1216                   modeToString(_mode)+
1217                     " with object "+
1218                        (  inputObject == null ?
1219                             "(NULL)":
1220                            (  inputObject instanceof Object [] ?
1221                                 ((Object[])inputObject)[0].toString() :
1222                                 inputObject.getClass().getName()
1223                            )
1224                        )
1225                    );
1226 
1227                 stateEngine( inputObject ) ;
1228 
1229                 _log.info("StageEngine left with   : " + modeToString(_mode)+
1230                            "  ("+ ( _forceContinue?"Continue":"Wait")+")");
1231 
1232              }catch(Exception ee ){
1233                 _log.warn("Unexpected Exception in state loop for "+_pnfsId+" : "+ee) ;
1234                 _log.warn(ee.toString());
1235              }
1236           }
1237        }
1238 
1239        private boolean canStage()
1240        {
1241            /* If the result is cached or the door disabled staging,
1242             * then we don't check the permissions.
1243             */
1244            if (_stagingDenied || (_allowedStates & ST_STAGE) == 0) {
1245                return false;
1246            }
1247 
1248            /* Staging is allowed if just one of the requests has
1249             * permission to stage.
1250             */
1251            for (CellMessage envelope: _messages) {
1252                try {
1253                    PoolMgrSelectReadPoolMsg msg =
1254                        (PoolMgrSelectReadPoolMsg) envelope.getMessageObject();
1255                    if (_stagePolicyDecisionPoint.canPerformStaging(msg.getSubject(), msg.getStorageInfo())) {
1256                        return true;
1257                    }
1258                } catch (IOException e) {
1259                    _log.error("Failed to verify stage permissions: " + e.getMessage());
1260                } catch (PatternSyntaxException e) {
1261                    _log.error("Failed to verify stage permissions: " + e.getMessage());
1262                }
1263            }
1264 
1265            /* None of the requests had the necessary credentials to
1266             * stage. This result is cached.
1267             */
1268            _stagingDenied = true;
1269            return false;
1270        }
1271 
1272        private void nextStep( int mode , int shouldContinue ){
1273            if (_currentRc == CacheException.NOT_IN_TRASH ||
1274                _currentRc == CacheException.FILE_NOT_FOUND) {
1275                _mode = ST_DONE;
1276                _forceContinue = true;
1277                _state = "Failed";
1278                sendInfoMessage(_pnfsId , _storageInfo ,
1279                                _currentRc , "Failed "+_currentRm);
1280            } else {
1281                if (mode == ST_STAGE && !canStage()) {
1282                    _mode = ST_DONE;
1283                    _forceContinue = true;
1284                    _state = "Failed";
1285                    _log.debug("Subject is not authorized to stage");
1286                    _currentRc = CacheException.FILE_NOT_ONLINE;
1287                    _currentRm = "File not online. Staging not allowed.";
1288                    sendInfoMessage(_pnfsId , _storageInfo ,
1289                                    _currentRc , "Permission denied." + _currentRm);
1290                } else if ((mode & _allowedStates) == 0) {
1291                    _mode = ST_DONE;
1292                    _forceContinue = true;
1293                    _state = "Failed";
1294                    _log.debug("No permission to perform " +
1295                               modeToString(mode));
1296                    _currentRc = CacheException.PERMISSION_DENIED;
1297                    _currentRm = "Permission denied.";
1298                    sendInfoMessage(_pnfsId, _storageInfo, _currentRc,
1299                                    "Permission denied for " +
1300                                    modeToString(mode));
1301                } else if ((mode & _allowedStates) == mode) {
1302                    _mode = mode ;
1303                    _forceContinue = shouldContinue == CONTINUE ;
1304                    if( _mode != ST_DONE ){
1305                        _currentRc = 0 ;
1306                        _currentRm = "" ;
1307                    }
1308                } else {
1309                    _log.error("Value of (mode & _allowedStates) is neither equal to '0' nor to 'mode'." +
1310                               " Current value of mode = " + Integer.toString(mode));
1311                    throw new IllegalStateException("Illegal value of 'mode'.");
1312                }
1313            }
1314        }
1315        //
1316        //  askIfAvailable :
1317        //
1318        //      default : (bestPool=set,overwriteCost=false) otherwise mentioned
1319        //
1320        //      RT_FOUND :
1321        //
1322        //         Because : file is on pool which is allowed and has reasonable cost.
1323        //
1324        //         -> DONE
1325        //
1326        //      RT_NOT_FOUND :
1327        //
1328        //         Because : file is not in cache at all
1329        //
1330        //         (bestPool=0)
1331        //
1332        //         -> _hasHsmBackend : STAGE
1333        //              else         : Suspended (1010, pool unavailable)
1334        //
1335        //      RT_NOT_PERMITTED :
1336        //
1337        //         Because : file not in a permitted pool but somewhere else
1338        //
1339        //         (bestPool=0,overwriteCost=true)
1340        //
1341        //         -> _p2pAllowed ||
1342        //            ! _hasHsmBackend  : P2P
1343        //            else              : STAGE
1344        //
1345        //      RT_COST_EXCEEDED :
1346        //
1347        //         Because : file is in permitted pools but cost is too high.
1348        //
1349        //         -> _p2pOnCost          : P2P
1350        //            _hasHsmBackend &&
1351        //            _stageOnCost        : STAGE
1352        //            else                : 127 , "Cost exceeded (st,p2p not allowed)"
1353        //
1354        //      RT_ERROR :
1355        //
1356        //         Because : - No entry in configuration Permission Matrix
1357        //                   - Code Exception
1358        //
1359        //         (bestPool=0)
1360        //
1361        //         -> STAGE
1362        //
1363        //
1364        //
1365        //  askForPoolToPool( overwriteCost ) :
1366        //
1367        //      RT_FOUND :
1368        //
1369        //         Because : source and destination pool found and cost ok.
1370        //
1371        //         -> DONE
1372        //
1373        //      RT_NOT_PERMITTED :
1374        //
1375        //         Because : - already too many copies (_maxPnfsFileCopies)
1376        //                   - file already everywhere (no destination found)
1377        //                   - SAME_HOST_NEVER : but no valid combination found
1378        //
1379        //         -> DONE 'using bestPool'
1380        //
1381        //      RT_S_COST_EXCEEDED (only if ! overwriteCost ) :
1382        //
1383        //         Because : best source pool exceeds 'alert' cost.
1384        //
1385        //         -> _hasHsmBackend &&
1386        //            _stageOnCost    : STAGE
1387        //            bestPool == 0   : 194,"File not present in any reasonable pool"
1388        //            else            : DONE 'using bestPool'
1389        //
1390        //      RT_COST_EXCEEDED (only if ! overwriteCost )  :
1391        //
1392        //         Because : file is in permitted pools but cost of
1393        //                   best destination pool exceeds cost of best
1394        //                   source pool (resp. slope * source).
1395        //
1396        //         -> _bestPool == 0 : 192,"File not present in any reasonable pool"
1397        //            else           : DONE 'using bestPool'
1398        //
1399        //      RT_ERROR :
1400        //
1401        //         Because : - no source pool (code problem)
1402        //                   - Code Exception
1403        //
1404        //         -> 132,"PANIC : Tried to do p2p, but source was empty"
1405        //                or exception text.
1406        //
1407        //  askForStaging :
1408        //
1409        //      RT_FOUND :
1410        //
1411        //         Because : destination pool found and cost ok.
1412        //
1413        //         -> DONE
1414        //
1415        //      RT_NOT_FOUND :
1416        //
1417        //         -> 149 , "No pool candidates available or configured for 'staging'"
1418        //         -> 150 , "No cheap candidates available for 'staging'"
1419        //
1420        //      RT_ERROR :
1421        //
1422        //         Because : - Code Exception
1423        //
1424        private void stateEngine( Object inputObject ) {
               //
               // One step of the per-request state machine. Dispatches on _mode;
               // what happens inside a state depends on inputObject:
               //   null        : run the state's own action (ask for a pool, answer, ...)
               //   Message     : reply handled by the ST_WAITING_* states
               //   Object []   : command array, forwarded to handleCommandObject()
               // The follow-up state is always selected through nextStep().
               //
1425           int rc = -1;
1426           switch( _mode ){
1427 
1428              case ST_INIT :
1429                 _log.debug( "stateEngine: case ST_INIT");
                     // an entry for this pnfsId in _selections fails the request immediately
                     // with the return code and message of the stored CacheException
1430                 synchronized( _selections ){
1431 
1432                    CacheException ce = _selections.get(_pnfsId) ;
1433                    if( ce != null ){
1434                       setError(ce.getRc(),ce.getMessage());
1435                       nextStep( ST_DONE , CONTINUE ) ;
1436                       return ;
1437                    }
1438 
1439                 }
1440 
1441 
1442                 if( inputObject == null ){
1443 
1444 
1445                    if( _suspendIncoming ){
1446                          _state = "Suspended (forced) "+_formatter.format(new Date()) ;
1447                          _currentRc = 1005 ;
1448                          _currentRm = "Suspend enforced";
1449                          _log.debug( " stateEngine: SUSPENDED/WAIT ");
1450                         nextStep( ST_SUSPENDED , WAIT ) ;
1451                         sendInfoMessage( _pnfsId , _storageInfo ,
1452                                          _currentRc , "Suspended (forced) "+_currentRm );
1453                         return ;
1454                    }
                        // each unsuspended pass through ST_INIT counts as one attempt;
                        // errorHandler() compares this counter against _maxRetries
1455                    _retryCounter ++ ;
1456                    _pnfsFileLocation.clear() ;
1457                    //
1458                    //
                        // enforced pool-to-pool : skip the availability check altogether
1459                    if( _enforceP2P ){
1460                        setError(0,"");
1461                        nextStep(ST_POOL_2_POOL , CONTINUE) ;
1462                        return ;
1463                    }
1464 
1465                    if( ( rc = askIfAvailable() ) == RT_FOUND ){
1466 
1467                       setError(0,"");
1468                       nextStep( ST_DONE , CONTINUE ) ;
1469                       _log.info("AskIfAvailable found the object");
1470                       if (_sendHitInfo ) sendHitMsg(  _pnfsId, (_bestPool!=null)?_bestPool.getPoolName():"<UNKNOWN>", true );   //VP
1471 
1472                    }else if( rc == RT_NOT_FOUND ){
1473                       //
1474                       //
1475                        _log.debug(" stateEngine: RT_NOT_FOUND ");
1476                       if( _parameter._hasHsmBackend ){
1477                           _log.debug(" stateEngine: parameter has HSM backend ");
1478                          nextStep( ST_STAGE , CONTINUE ) ;
1479                       }else{
1480                          _log.debug(" stateEngine: parameter has NO HSM backend ");
1481                          _state = "Suspended (pool unavailable) "+_formatter.format(new Date()) ;
1482                          _currentRc = 1010 ;
1483                          _currentRm = "Suspend";
1484                          _poolCandidateInfo = null ;
1485                          nextStep( ST_SUSPENDED , WAIT ) ;
1486                       }
1487                       if (_sendHitInfo && _poolCandidateInfo == null) {
1488                           sendHitMsg(  _pnfsId, (_bestPool!=null)?_bestPool.getPoolName():"<UNKNOWN>", false );   //VP
1489                       }
1490                       //
1491                    }else if( rc == RT_NOT_PERMITTED ){
1492                       //
1493                       //  if we can't read the file because 'read is prohibited'
1494                       //  we at least must give dCache the chance to copy it
1495                       //  to another pool (not regarding the cost).
1496                       //
1497                       _overwriteCost = true ;
1498                       //
1499                       //  if we don't have an hsm we overwrite the p2pAllowed
1500                       //
1501                       nextStep( _parameter._p2pAllowed || ! _parameter._hasHsmBackend
1502                                ? ST_POOL_2_POOL : ST_STAGE , CONTINUE ) ;
1503 
1504                    }else if( rc == RT_COST_EXCEEDED ){
1505 
1506                       if( _parameter._p2pOnCost ){
1507 
1508                           nextStep( ST_POOL_2_POOL , CONTINUE ) ;
1509 
1510                       }else if( _parameter._hasHsmBackend &&  _parameter._stageOnCost ){
1511 
1512                           nextStep( ST_STAGE , CONTINUE ) ;
1513 
1514                       }else{
1515 
1516                           setError( 127 , "Cost exceeded (st,p2p not allowed)" ) ;
1517                           nextStep( ST_DONE , CONTINUE ) ;
1518 
1519                       }
1520                    }else if( rc == RT_ERROR ){
1521                       _log.debug( " stateEngine: RT_ERROR");
1522                       nextStep( ST_STAGE , CONTINUE ) ;
1523                       _log.info("AskIfAvailable returned an error, will continue with Staging");
1524 
1525                    }
1526 
1527                 }else if( inputObject instanceof Object [] ){
1528 
1529                      handleCommandObject( (Object [] ) inputObject ) ;
1530 
1531                 }
1532 
1533              break ;
1534 
1535              case ST_POOL_2_POOL :
1536              {
1537                  _log.debug( "stateEngine: case ST_POOL_2_POOL");
1538                 if( inputObject == null ){
1539 
1540                    if( ( rc = askForPoolToPool( _overwriteCost ) ) == RT_FOUND ){
1541 
1542                       nextStep( ST_WAITING_FOR_POOL_2_POOL , WAIT ) ;
1543                       _state = "Pool2Pool "+_formatter.format(new Date()) ;
1544                       setError(0,"");
1545                       _pingHandler.startP2P(_p2pPoolCandidateInfo.getPoolName()) ;
1546 
1547                       if (_sendHitInfo ) sendHitMsg(  _pnfsId,
1548                               (_p2pSourcePoolInfo!=null)?
1549                                   _p2pSourcePoolInfo.getPoolName():
1550                                   "<UNKNOWN>", true );   //VP
1551 
1552                    }else if( rc == RT_NOT_PERMITTED ){
1553 
                           // without a usable _bestPool : finish (enforced p2p), stage if the
                           // file is stored on an HSM, else suspend with error 265;
                           // with a _bestPool : serve the request from that pool
1554                        if( _bestPool == null) {
1555                            if( _enforceP2P ){
1556                               nextStep( ST_DONE , CONTINUE ) ;
1557                            }else if( _parameter._hasHsmBackend && _storageInfo.isStored() ){
1558                               _log.info("ST_POOL_2_POOL : Pool to pool not permitted, trying to stage the file");
1559                               nextStep( ST_STAGE , CONTINUE ) ;
1560                            }else{
1561                               setError(265,"Pool to pool not permitted");
1562                               nextStep( ST_SUSPENDED , WAIT ) ;
1563                            }
1564                        }else{
1565                            _poolCandidateInfo = _bestPool ;
1566                            _log.info("ST_POOL_2_POOL : Choosing high cost pool "+_poolCandidateInfo.getPoolName());
1567 
1568                          if( _sendCostInfo )sendCostMsg(_pnfsId, _bestPool , false);
1569 
1570                          setError(0,"");
1571                          nextStep( ST_DONE , CONTINUE ) ;
1572                        }
1573 
1574                    }else if( rc == RT_S_COST_EXCEEDED ){
1575 
1576                       _log.info("ST_POOL_2_POOL : RT_S_COST_EXCEEDED");
1577 
1578                       if( _parameter._hasHsmBackend && _parameter._stageOnCost && _storageInfo.isStored() ){
1579 
1580                           if( _enforceP2P ){
1581                              nextStep( ST_DONE , CONTINUE ) ;
1582                           }else{
1583                              _log.info("ST_POOL_2_POOL : staging");
1584                              nextStep( ST_STAGE , CONTINUE ) ;
1585                           }
1586                       }else{
1587 
1588                          if( _bestPool != null ){
1589 
1590                              _poolCandidateInfo = _bestPool;
1591                              _log.info("ST_POOL_2_POOL : Choosing high cost pool "+_poolCandidateInfo.getPoolName());
1592 
1593                             if( _sendCostInfo )sendCostMsg(_pnfsId, _bestPool , false);
1594                             setError(0,"");
1595                             nextStep( ST_DONE , CONTINUE ) ;
1596                          }else{
1597                             //
1598                             // this can't possibly happen
1599                             //
1600                             setError(194,"PANIC : File not present in any reasonable pool");
1601                             nextStep( ST_DONE , CONTINUE ) ;
1602                          }
1603 
1604                       }
1605                    }else if( rc == RT_COST_EXCEEDED ){
1606                       //
1607                       //
1608                       if( _bestPool == null ){
1609                          //
1610                          // this can't possibly happen
1611                          //
1612                          if( _enforceP2P ){
1613                             nextStep( ST_DONE , CONTINUE ) ;
1614                          }else{
1615                             setError(192,"PANIC : File not present in any reasonable pool");
1616                             nextStep( ST_DONE , CONTINUE ) ;
1617                          }
1618 
1619                       }else{
1620 
1621                           _poolCandidateInfo = _bestPool;
1622 
1623                          if( _sendCostInfo )sendCostMsg(_pnfsId, _bestPool , false);
1624 
1625                          _log.info(" found high cost object");
1626 
1627                          setError(0,"");
1628                          nextStep( ST_DONE , CONTINUE ) ;
1629 
1630                       }
1631 
1632 
1633                    }else{
1634 
                           // any other return code of askForPoolToPool()
1635                       if( _enforceP2P ){
1636                          nextStep( ST_DONE , CONTINUE ) ;
1637                       }else if( _parameter._hasHsmBackend && _storageInfo.isStored() ){
1638                          nextStep( ST_STAGE , CONTINUE ) ;
1639                       }else{
1640                          nextStep( ST_SUSPENDED , WAIT ) ;
1641                       }
1642 
1643                    }
1644 
1645                 }
1646              }
1647              break ;
1648 
1649              case ST_STAGE :
1650                 _log.debug( "stateEngine: case ST_STAGE");
1651                 if( inputObject == null ){
1652 
1653                    if( _suspendStaging ){
1654                          _state = "Suspended Stage (forced) "+_formatter.format(new Date()) ;
1655                          _currentRc = 1005 ;
1656                          _currentRm = "Suspend enforced";
1657                         nextStep( ST_SUSPENDED , WAIT ) ;
1658                         sendInfoMessage( _pnfsId , _storageInfo ,
1659                                          _currentRc , "Suspended Stage (forced) "+_currentRm );
1660                         return ;
1661                    }
1662 
1663                    if( ( rc = askForStaging() ) == RT_FOUND ){
1664 
1665                       nextStep( ST_WAITING_FOR_STAGING , WAIT ) ;
1666                       _state = "Staging "+_formatter.format(new Date()) ;
1667                       setError(0,"");
1668                       _pingHandler.startStage(_poolCandidateInfo.getPoolName()) ;
1669 
1670                    }else if( rc == RT_OUT_OF_RESOURCES ){
1671 
1672                       _restoreExceeded ++ ;
1673                       outOfResources("Restore") ;
1674 
1675                    }else{
1676                       //
1677                       // we couldn't find a pool for staging
1678                       //
1679                       errorHandler() ;
1680                    }
1681 
1682                 }
1683 
1684              break ;
1685              case ST_WAITING_FOR_POOL_2_POOL :
1686                 _log.debug( "stateEngine: case ST_WAITING_FOR_POOL_2_POOL");
1687                 if( inputObject instanceof Message ){
1688 
1689                    if( ( rc =  exercisePool2PoolReply((Message)inputObject) ) == RT_OK ){
1690 
                           // copy succeeded : run ST_INIT again when _p2pForTransfer is set
                           // and p2p was not enforced, otherwise the request is done
1691                       nextStep( _parameter._p2pForTransfer && ! _enforceP2P ? ST_INIT : ST_DONE , CONTINUE ) ;
1692 
1693                    }else if( rc == RT_CONTINUE ){
1694                        //
1695                        //
1696                    }else{
1697                        _log.info("ST_POOL_2_POOL : Pool to pool reported a problem");
1698                        if( _parameter._hasHsmBackend && _storageInfo.isStored() ){
1699 
1700                            _log.info("ST_POOL_2_POOL : trying to stage the file");
1701                            nextStep( ST_STAGE , CONTINUE ) ;
1702 
1703                        }else{
1704                            errorHandler() ;
1705                        }
1706 
1707                    }
1708 
1709                 }else if( inputObject instanceof Object [] ){
1710 
1711                    handleCommandObject( (Object []) inputObject ) ;
1712 
1713                 }else{
                         // neither a Message nor a command array : hand it to the ping handler
1714                     _pingHandler.gotReply(inputObject);
1715                 }
1716 
1717              break ;
1718              case ST_WAITING_FOR_STAGING :
1719                 _log.debug( "stateEngine: case ST_WAITING_FOR_STAGING" );
1720                 if( inputObject instanceof Message ){
1721 
1722                    if( ( rc =  exerciseStageReply( (Message)inputObject ) ) == RT_OK ){
1723 
1724                       nextStep( _parameter._p2pForTransfer ? ST_INIT : ST_DONE , CONTINUE ) ;
1725 
1726                    }else if( rc == RT_DELAY ){
1727                        _state = "Suspended By HSM request";
1728                        nextStep( ST_SUSPENDED , WAIT ) ;
1729                    }else if( rc == RT_CONTINUE ){
1730 
1731                    }else{
1732 
1733                       errorHandler() ;
1734 
1735                    }
1736                 }else if( inputObject instanceof Object [] ){
1737 
1738                    handleCommandObject( (Object []) inputObject ) ;
1739 
1740                 }else{
1741                     _pingHandler.gotReply(inputObject);
1742                 }
1743              break ;
1744              case ST_SUSPENDED :
1745                 _log.debug( "stateEngine: case ST_SUSPENDED" );
                     // while suspended only command arrays are processed
1746                 if( inputObject instanceof Object [] ){
1747 
1748                    handleCommandObject( (Object []) inputObject ) ;
1749 
1750                 }
1751              return ;
1752 
1753              case ST_DONE :
1754                 _log.debug( "stateEngine: case ST_DONE" );
1755                 if( inputObject == null ){
1756 
1757                    clearSteering();
1758                    //
1759                    // it is essential that we are not within any other
1760                    // lock when trying to get the handlerHash lock.
1761                    //
1762                    synchronized( _handlerHash ){
                           // answerRequest() returning true restarts this handler at ST_INIT;
                           // otherwise the handler is removed from _handlerHash
1763                       if( answerRequest( _maxRequestClumping ) ){
1764                           nextStep( ST_INIT , CONTINUE ) ;
1765                       }else{
1766                           _handlerHash.remove( _name ) ;
1767                       }
1768                    }
1769                 }
1770 
1771              return ;
1772           }
1773        }
1774        private void handleCommandObject( Object [] c ){
1775 
1776           String command = c[0].toString() ;
1777           if( command.equals("failed") ){
1778 
1779              clearSteering();
1780              setError(((Integer)c[1]).intValue(),c[2].toString());
1781              nextStep(ST_DONE,CONTINUE);
1782 
1783           }else if( command.equals("retry") ){
1784 
1785              _state = "Retry enforced" ;
1786              _retryCounter = 0 ;
1787              clearSteering() ;
1788              _pnfsFileLocation.clear() ;
1789              setError(0,"");
1790              if( ( c.length > 1 ) && c[1].toString().equals("update") )getStorageInfo();
1791              nextStep(ST_INIT,CONTINUE);
1792 
1793           }else if( command.equals("alive") ){
1794 
1795              long now = System.currentTimeMillis() ;
1796 
1797              if (now > _nextTtlTimeout) {
1798                  expireRequests();
1799              }
1800 
1801              if( ( _waitUntil > 0L ) && ( now > _waitUntil ) ){
1802                 nextStep(ST_INIT,CONTINUE);
1803                 clearSteering() ;
1804              }else{
1805                 _pingHandler.alive() ;
1806              }
1807 
1808           }
1809 
1810        }
1811        private void getStorageInfo(){
1812           try{
1813              PnfsGetStorageInfoMessage getStorageInfo = new PnfsGetStorageInfoMessage( _pnfsId ) ;
1814 
1815              CellMessage request = new CellMessage(
1816                                       new CellPath("PnfsManager") ,
1817                                       getStorageInfo ) ;
1818 
1819              request = sendAndWait( request , 30000 ) ;
1820              if( request == null )
1821                 throw new
1822                 Exception("Timeout : PnfsManager request for storageInfo of "+_pnfsId);
1823 
1824              getStorageInfo = (PnfsGetStorageInfoMessage)request.getMessageObject();
1825              switch (getStorageInfo.getReturnCode()) {
1826              case 0:
1827                  _storageInfo = getStorageInfo.getStorageInfo();
1828                  break;
1829              case CacheException.FILE_NOT_FOUND:
1830              case CacheException.NOT_IN_TRASH:
1831                  setError(getStorageInfo.getReturnCode(),
1832                           "File not found");
1833                  break;
1834              default:
1835                  _log.warn("Fetching storage info failed: " +
1836                            getStorageInfo.getErrorObject());
1837                  break;
1838              }
1839           }catch(Exception ee ){
1840               _log.warn("Fetching storage info failed : "+ee);
1841           }
1842        }
1843        private void outOfResources( String detail ){
1844 
1845           clearSteering();
1846           setError(5,"Resource temporarily unavailable : "+detail);
1847           nextStep( ST_DONE , CONTINUE ) ;
1848           _state = "Failed" ;
1849           sendInfoMessage( _pnfsId , _storageInfo ,
1850                            _currentRc , "Failed "+_currentRm );
1851        }
1852        private void errorHandler(){
1853           if(_retryCounter == 0 ){
1854              //
1855              // retry immediately (stager will take another pool)
1856              //
1857              _pnfsFileLocation.clear() ;
1858              getStorageInfo();
1859              nextStep( ST_INIT, CONTINUE ) ;
1860              //
1861           }else if( _retryCounter < _maxRetries ){
1862              //
1863              // now retry only after some time
1864              //
1865              _pnfsFileLocation.clear() ;
1866              getStorageInfo();
1867              waitFor( _retryTimer ) ;
1868              nextStep( ST_INIT , WAIT ) ;
1869              _state = "Waiting "+_formatter.format(new Date()) ;
1870              //
1871           }else{
1872              if( _onError.equals( "suspend" ) ){
1873                 _state = "Suspended "+_formatter.format(new Date()) ;
1874                 nextStep( ST_SUSPENDED , WAIT ) ;
1875                 sendInfoMessage( _pnfsId , _storageInfo ,
1876                                  _currentRc , "Suspended "+_currentRm );
1877              }else{
1878                 nextStep( ST_DONE , CONTINUE ) ;
1879                 _state = "Failed" ;
1880                 sendInfoMessage( _pnfsId , _storageInfo ,
1881                                  _currentRc , "Failed "+_currentRm );
1882              }
1883           }
1884 
1885        }
1886        private int exerciseStageReply( Message messageArrived ){
1887           try{
1888 
1889              if( messageArrived instanceof PoolFetchFileMessage ){
1890                 PoolFetchFileMessage reply = (PoolFetchFileMessage)messageArrived ;
1891 
1892                 int rc;
1893                 _currentRc = reply.getReturnCode();
1894 
1895                 switch(_currentRc) {
1896                     case 0:
1897                         // best candidate is the right one
1898                         rc = RT_OK;
1899                         break;
1900                     case CacheException.HSM_DELAY_ERROR:
1901                         _currentRm = "Suspend by HSM request : " + reply.getErrorObject() == null ?
1902                                 "No info" : reply.getErrorObject().toString() ;
1903                         rc = RT_DELAY;
1904                         break;
1905                     default:
1906                         _currentRm = reply.getErrorObject() == null ?
1907                                 ( "Error="+_currentRc ) : reply.getErrorObject().toString() ;
1908 
1909                         rc =  RT_ERROR ;
1910                 }
1911 
1912                 return rc;
1913 
1914              }else{
1915                 throw new
1916                 CacheException(204,"Invalid message arrived : "+
1917                                messageArrived.getClass().getName());
1918 
1919              }
1920           }catch(Exception ee ){
1921              _currentRc = ee instanceof CacheException ? ((CacheException)ee).getRc() : 102 ;
1922              _currentRm = ee.getMessage();
1923              _log.warn("exerciseStageReply : "+ee ) ;
1924              _log.warn(ee.toString());
1925              return RT_ERROR ;
1926           }
1927        }
1928        private int exercisePool2PoolReply( Message messageArrived ){
1929           try{
1930 
1931              if( messageArrived instanceof Pool2PoolTransferMsg ){
1932                 Pool2PoolTransferMsg reply = (Pool2PoolTransferMsg)messageArrived ;
1933                 _log.info("Pool2PoolTransferMsg replied with : "+reply);
1934                 if( ( _currentRc = reply.getReturnCode() ) == 0 ){
1935                     _poolCandidateInfo = _p2pPoolCandidateInfo ;
1936                    return RT_OK ;
1937 
1938                 }else{
1939 
1940                    _currentRm = reply.getErrorObject() == null ?
1941                                 ( "Error="+_currentRc ) : reply.getErrorObject().toString() ;
1942 
1943                    return RT_ERROR ;
1944 
1945                 }
1946              }else{
1947 
1948                 throw new
1949                 CacheException(205,"Invalid message arrived : "+
1950                                messageArrived.getClass().getName());
1951 
1952              }
1953           }catch(Exception ee ){
1954              _currentRc = ee instanceof CacheException ? ((CacheException)ee).getRc() : 102 ;
1955              _currentRm = ee.getMessage();
1956              _log.warn("exercisePool2PoolReply : "+ee ) ;
1957              _log.warn(ee.toString());
1958              return RT_ERROR ;
1959           }
1960        }
1961        //
1962        //  calculate :
1963        //       matrix = list of list of active
1964        //                pools with file available (sorted)
1965        //
1966        //  if empty :
1967        //        bestPool = 0 , return NOT_FOUND
1968        //
1969        //  else
1970        //        determine best pool by
1971        //
1972        //        if allowFallback :
1973        //           first row for which cost < costCut or
1974        //           if not found, pool with lowest cost.
1975        //        else
1976        //           leftmost pool of first nonzero row
1977        //
1978        //  if bestPool > costCut :
1979        //        return COST_EXCEEDED
1980        //
1981        //  chose best pool from row selected above by :
1982        //     if ( minCostCut > 0 ) :
1983        //         take all pools of the selected row
1984        //         with cost < minCostCut and make hash selection.
1985        //     else
1986        //         take leftmost pool.
1987        //
1988        //  return FOUND
1989        //
1990        //  RESULT :
1991        //      RT_FOUND :
1992        //         file is on pool which is allowed and has reasonable cost.
1993        //      RT_NOT_FOUND :
1994        //         file is not in cache at all
1995        //      RT_NOT_PERMITTED :
1996        //         file not in a permitted pool but somewhere else
1997        //      RT_COST_EXCEEDED :
1998        //         file is in permitted pools but cost is too high.
1999        //      RT_ERROR :
2000        //         - No entry in configuration Permission Matrix
2001        //         - Code Exception
2002        //
2003        private int askIfAvailable(){
2004 
2005           String err = null ;
2006           try{
2007 
2008              List<List<PoolCostCheckable>> avMatrix =
2009                  _pnfsFileLocation.getFileAvailableMatrix();
2010              int matrixSize = avMatrix.size() ;
2011              //
2012              // the DB matrix has no rows, which
2013              // means that there are no pools which are allowed
2014              // to serve this request.
2015              //
2016              if( ( matrixSize == 0 ) ||
2017                  ( _pnfsFileLocation.getAllowedPoolCount() == 0 ) ){
2018 
2019                  err="Configuration Error : No entries in Permission Matrix for this request" ;
2020                  setError(130,err) ;
2021                  _log.warn("askIfAvailable : "+err);
2022                  return RT_ERROR ;
2023 
2024              }
2025              //
2026              // we define the top row as the default parameter set for
2027              // cases where none of the pools hold the file.
2028              //
2029              List<PoolManagerParameter> paraList =
2030                  _pnfsFileLocation.getListOfParameter() ;
2031              _parameter = paraList.get(0);
2032              //
2033              // The file is not in the dCache at all.
2034              //
2035              if( _pnfsFileLocation.getAcknowledgedPnfsPools().size() == 0 ){
2036                  _log.info("askIfAvailable : file not in pool at all");
2037                  return RT_NOT_FOUND ;
2038              }
2039              //
2040              // The file is in the cache but not on a pool where
2041              // we would be allowed to read it from.
2042              //
2043              if( _pnfsFileLocation.getAvailablePoolCount()  == 0 ){
2044                  _log.info("askIfAvailable : file in cache but not in read-allowed pool");
2045                  return RT_NOT_PERMITTED ;
2046              }
2047              //
2048              // File is at least on one pool from which we could
2049              // get it. Now we have to find the pool with the
2050              // best performance cost.
2051              // Matrix is assumed to be sorted, so we
2052              // only have to check the leftmost entry
2053              // in the list (get(0)). Rows could be empty.
2054              //
2055              _bestPool       = null;
2056              List<PoolCostCheckable> bestAv = null;
2057              int  validCount = 0;
2058              List<PoolCostCheckable> tmpList = new ArrayList<PoolCostCheckable>();
2059              int  level      = 0;
2060              boolean allowFallbackOnPerformance = false;
2061 
2062              for( Iterator<List<PoolCostCheckable>> i = avMatrix.iterator() ; i.hasNext() ; level++ ){
2063 
2064                 List<PoolCostCheckable> av = i.next() ;
2065 
2066                 if( av.size() == 0 )continue ;
2067 
2068                 validCount++;
2069                 PoolCostCheckable cost = av.get(0);
2070                 tmpList.add(cost);
2071 
2072                 if( ( _bestPool == null ) ||
2073                     ( _bestPool.getPerformanceCost() > cost.getPerformanceCost() ) ){
2074 
2075                    _bestPool = cost ;
2076                    bestAv    = av ;
2077 
2078                 }
2079                 _parameter = paraList.get(level);
2080                 allowFallbackOnPerformance = _parameter._fallbackCostCut > 0.0 ;
2081 
2082                 if( ( ( ! allowFallbackOnPerformance ) &&
2083                       ( validCount == 1              )    ) ||
2084                     ( _bestPool.getPerformanceCost() < _parameter._fallbackCostCut ) )break ;
2085              }
2086              //
2087              // this can't happen because we already know that
2088              // there are pools which contain the files and which
2089              // are allowed for us.
2090              //
2091              if( _bestPool == null )return RT_NOT_FOUND ;
2092 
2093              double bestPoolPerformanceCost = _bestPool.getPerformanceCost() ;
2094 
2095              if(   (  _parameter.getCostCut()     > 0.0         ) &&
2096                    (  bestPoolPerformanceCost >= getCurrentCostCut( _parameter))    ){
2097 
2098                 if( allowFallbackOnPerformance ){
2099                    //
2100                    // if all costs are too high, the above list
2101                    // has been scanned up to the very end. But it
2102                    // could be that one of the first rows has a
2103                    // better cost than the last one, so we have
2104                    // to correct here.
2105                    //
2106                    _log.info("askIfAvailable : allowFallback , recalculation best cost");
2107                    _bestPool = Collections.min(
2108                                   tmpList ,
2109                                    _poolMonitor.getCostComparator(false,_parameter)
2110                                ) ;
2111 
2112                 }
2113                 _log.info("askIfAvailable : cost exceeded on all available pools, "+
2114                     "best pool would have been "+_bestPool);
2115                 return RT_COST_EXCEEDED ;
2116              }
2117 
2118              if( ( _parameter._panicCostCut > 0.0 ) && ( bestPoolPerformanceCost > _parameter._panicCostCut ) ){
2119                 _log.info("askIfAvailable : cost of best pool exceeds 'panic' level");
2120                 setError(125,"Cost of best pool exceeds panic level") ;
2121                 return RT_ERROR ;
2122              }
2123              _log.info("askIfAvailable : Found candidates : "+bestAv);
2124              //
2125              //  this part is intended to get rid of duplicates if the
2126              //  load is decreasing.
2127              //
2128              PoolCostCheckable cost = null ;
2129              SortedMap<Integer,PoolCostCheckable> list =
2130                  new TreeMap<Integer,PoolCostCheckable>();
2131 
2132              if( _parameter._minCostCut > 0.0 ){
2133 
2134                 for( int i = 0 , n = bestAv.size() ; i < n ; i++ ){
2135 
2136                    cost = bestAv.get(i) ;
2137 
2138                    double costValue = cost.getPerformanceCost() ;
2139 
2140                    if( costValue < _parameter._minCostCut ){
2141                       //
2142                       // here we sort it arbitrary but reproducible
2143                       // (whatever that means)
2144                       //
2145                       String poolName = cost.getPoolName() ;
2146                       _log.info("askIfAvailable : "+poolName+" below "+_parameter._minCostCut+" : "+costValue);
2147                       list.put((_pnfsId.toString()+poolName).hashCode(), cost);
2148                    }
2149                 }
2150 
2151              }
2152 
2153              cost =
2154                  (list.size() > 0 ? list.get(list.firstKey()) : bestAv.get(0));
2155 
2156              _log.info( "askIfAvailable : candidate : "+cost ) ;
2157 
2158 
2159              if( _sendCostInfo )sendCostMsg( _pnfsId, cost, false );
2160 
2161              _poolCandidateInfo = cost ;
2162              setError(0,"") ;
2163 
2164              return RT_FOUND ;
2165 
2166           }catch(Exception ee ){
2167              _log.warn(err="Exception in getFileAvailableList : "+ee ) ;
2168              _log.warn(ee.toString());
2169              setError(130,err) ;
2170              return RT_ERROR ;
2171           }finally{
2172              _log.info( "askIfAvailable : Took  "+(System.currentTimeMillis()-_started));
2173           }
2174 
2175        }
2176        //
2177        // Picks a source/destination pool pair for a pool-to-pool copy, remembers it in
2178        // _p2pSourcePoolInfo/_p2pPoolCandidateInfo and sends the pool2pool request.
2179        // overwriteCost == true skips the source 'alert' check and the destination cost filter.
2180        // Result :
2181        //    FOUND : valid source/destination pair found fitting all constraints.
2182        //    NOT_PERMITTED :
2183        //        - already too many copies (_maxPnfsFileCopies)
2184        //        - file already everywhere (no destination found)
2185        //        - SAME_HOST_NEVER : but no valid combination found
2186        //    COST_EXCEEDED :
2187        //        - slope == 0 : all destination pools > costCut (p2p)
2188        //          else       : (best destination) > ( slope * source )
2189        //    S_COST_EXCEEDED : all source pools > alert
2190        //    ERROR : no source pool (code problem), or an exception was caught
2191        //
2192                private int askForPoolToPool(boolean overwriteCost) {
2193                        try {
2194                                //
2195                                //
2196                                List<PoolCostCheckable> sources =
2197                                    _pnfsFileLocation.getCostSortedAvailable();
2198                                //
2199                                // Here we get the parameter set of the 'read'
2200                                //
2201                                PoolManagerParameter parameter = _pnfsFileLocation
2202                                                .getCurrentParameterSet();
2203 
2204                                if (sources.size() == 0) {
2205 
2206                                        setError(132,
2207                                                        "PANIC : Tried to do p2p, but source was empty");
2208                                        return RT_ERROR;
2209 
2210                                } else if (sources.size() >= parameter._maxPnfsFileCopies) {
2211                                        //
2212                                        // already too many copies of the file
2213                                        //
2214                                        _log.info("askForPoolToPool : already too many copies : "
2215                                                        + sources.size());
2216                                        setError(133, "Not replicated : already too many copies : "
2217                                                        + sources.size());
2218                                        return RT_NOT_PERMITTED;
2219 
2220                                } else if ((!overwriteCost)
2221                                                && (parameter._alertCostCut > 0.0)
2222                                                && ((sources.get(0))
2223                                                                .getPerformanceCost() > parameter._alertCostCut)) {
2224                                        //
2225                                        // all source are too busy
2226                                        //
2227                                        _log.info("askForPoolToPool : all p2p source(s) are too busy (cost > "
2228                                                        + parameter._alertCostCut + ")");
2229                                        setError(134,
2230                                                        "Not replicated : all p2p source(s) are too busy (cost > "
2231                                                                        + parameter._alertCostCut + ")");
2232                                        return RT_S_COST_EXCEEDED;
2233 
2234                                }
2235                                //
2236                                // make sure we are either below costCut or
2237                                // 'slope' below the source pool.
2238                                //
2239                                double maxCost = parameter._slope > 0.01 ? (parameter._slope * sources.get(0).getPerformanceCost())
2240                                                : getCurrentCostCut( parameter);
2241 
2242 
2243                                List<List<PoolCostCheckable>> matrix =
2244                                    _pnfsFileLocation.getFetchPoolMatrix(DirectionType.P2P,
2245                                                _storageInfo, _protocolInfo, _storageInfo
2246                                                                .getFileSize());
2247 
2248                                if (matrix.size() == 0) {
2249                                        setError(
2250                                                        136,
2251                                                        "Not replicated : No pool candidates available/configured/left for p2p or file already everywhere");
2252                                        _log.info("askForPoolToPool : No pool candidates available/configured/left for p2p or file already everywhere");
2253                                        return RT_NOT_PERMITTED;
2254                                }
2255                                //
2256                                // Here we get the parameter set of the 'p2p'
2257                                //
2258                                parameter = _pnfsFileLocation.getCurrentParameterSet();
2259 
2260                                List<PoolCostCheckable> destinations = null;
2261 
2262                                for (Iterator<List<PoolCostCheckable>> it = matrix.iterator(); it.hasNext();) {
2263                                        destinations = it.next();
2264                                        if (destinations.size() > 0)
2265                                                break;
2266                                }
2267 
2268                                // subtract all source pools from the list of destination pools (those pools already have a copy);
2269                                // removal goes through the iterator, List.remove() inside a for-each throws ConcurrentModificationException
2270                                for (Iterator<PoolCostCheckable> destIt = destinations.iterator(); destIt.hasNext();) {
2271                                        String destName = destIt.next().getPoolName();
2272                                        for (PoolCheckable src : sources) {
2273                                                if (!destName.equals(src.getPoolName())) continue;
2274                                                _log.info("removing pool "+destName+" from dest pool list");
2275                                                destIt.remove();
2276                                                break;
2277                                        }
2278                                }
2279                                if (destinations.size() == 0) {
2280                                        //
2281                                        // file already everywhere
2282                                        //
2283                                        _log.info("askForPoolToPool : file already everywhere");
2284                                        setError(137, "Not replicated : file already everywhere");
2285                                        return RT_NOT_PERMITTED;
2286                                }
2287 
2288                                if ((!overwriteCost) && (maxCost > 0.0)) {
2289 
2290                                        List<PoolCostCheckable> selected =
2291                                            new ArrayList<PoolCostCheckable>();
2292                                        for (PoolCostCheckable dest : destinations) {
2293                                                PoolCostCheckable cost = dest;
2294                                                if (cost.getPerformanceCost() < maxCost)
2295                                                        selected.add(cost);
2296                                        }
2297                                        if (selected.size() == 0) {
2298                                                _log.info("askForPoolToPool : All destination pools exceed cost "
2299                                                                + maxCost);
2300                                                setError(137,
2301                                                                "Not replicated : All destination pools exceed cost "
2302                                                                                + maxCost);
2303                                                return RT_COST_EXCEEDED;
2304                                        }
2305 
2306                                        destinations = selected;
2307                                }
2308                                //
2309                                // The 'performance cost' of all destination pools is below
2310                                // maxCost,
2311                                // and the destination list is sorted according to the
2312                                // 'full cost'.
2313                                //
2314 
2315                                _pnfsFileLocation.sortByCost(destinations, true);
2316 
2317                                //
2318                                // loop over all source, destination combinations and find the
2319                                // most appropriate for (source.hostname !=
2320                                // destination.hostname)
2321                                //
2322                PoolCheckable sourcePool                = null;
2323                PoolCheckable destinationPool           = null;
2324                PoolCheckable bestEffortSourcePool      = null;
2325                PoolCheckable bestEffortDestinationPool = null;
2326 
2327                                Map<String, String> map = null;
2328 
2329                                for (int s = 0, sMax = sources.size(); s < sMax; s++) {
2330 
2331                                        PoolCheckable sourceCost = sources.get(s);
2332 
2333                                        String sourceHost = ((map = sourceCost.getTagMap()) == null ? null : map.get("hostname"));
2334 
2335                                        for (int d = 0, dMax = destinations.size(); d < dMax; d++) {
2336 
2337                                                PoolCheckable destinationCost = destinations.get(d);
2338 
2339                                                if (parameter._allowSameHostCopy == PoolManagerParameter.P2P_SAME_HOST_NOT_CHECKED) {
2340                                                        // we take the pair with the least cost without
2341                                                        // further hostname checking
2342                                                        sourcePool = sourceCost;
2343                                                        destinationPool = destinationCost;
2344                                                        break;
2345                                                }
2346 
2347                                                // save the pair with the least cost for later reuse
2348                                                if (bestEffortSourcePool == null) bestEffortSourcePool = sourceCost;
2349                                                if (bestEffortDestinationPool == null) bestEffortDestinationPool = destinationCost;
2350 
2351                                                _log.info("p2p same host checking : "
2352                                                                + sourceCost.getPoolName() + " "
2353                                                                + destinationCost.getPoolName());
2354 
2355                                                String destinationHost = ((map = destinationCost.getTagMap()) == null ? null : map.get("hostname"));
2356 
2357                                                if (sourceHost != null && !sourceHost.equals(destinationHost)) {
2358                                                        // we take the first src/dest-pool pair not residing on the same host
2359                                                        sourcePool = sourceCost;
2360                                                        destinationPool = destinationCost;
2361                                                        break;
2362                                                }
2363                                        }
2364                                        if (sourcePool != null && destinationPool != null)
2365                                                break;
2366                                }
2367 
2368                                if (sourcePool == null || destinationPool == null) {
2369 
2370//                                        ok, we could not find a pair on different hosts, what now?
2371 
2372                                        if (parameter._allowSameHostCopy == PoolManagerParameter.P2P_SAME_HOST_BEST_EFFORT) {
2373                                                _log.info("P2P : sameHostCopy=bestEffort : couldn't find a src/dest-pair on different hosts, choosing pair with the least cost");
2374                                                sourcePool = bestEffortSourcePool;
2375                                                destinationPool = bestEffortDestinationPool;
2376 
2377                                        } else if (parameter._allowSameHostCopy == PoolManagerParameter.P2P_SAME_HOST_NEVER) {
2378                                                _log.info("P2P : sameHostCopy=never : no matching pool found");
2379                                                setError(137,
2380                                                                "Not replicated : sameHostCopy=never : no matching pool found");
2381                                                return RT_NOT_PERMITTED;
2382 
2383                                        } else {
2384                                                _log.info("P2P : coding error, bad state");
2385                                                setError(137,
2386                                                                "Not replicated : coding error, bad state");
2387                                                return RT_NOT_PERMITTED;
2388                                        }
2389                                }
2390 
2391                _log.info("P2P : source=" + sourcePool.getPoolName() + ";dest=" + destinationPool.getPoolName());
2392 
2393                sendPool2PoolRequest(
2394                      (_p2pSourcePoolInfo    = sourcePool).getPoolName(),
2395                      (_p2pPoolCandidateInfo = destinationPool ).getPoolName()
2396                                     );
2397 
2398                if (_sendCostInfo)sendCostMsg(_pnfsId, (PoolCostCheckable)destinationPool, true);// VP
2399 
2400 
2401                                return RT_FOUND;
2402 
2403            } catch ( CacheException ce) {
2404 
2405                setError( ce.getRc() , ce.getMessage());
2406                _log.warn(ce.toString());
2407 
2408                return RT_ERROR;
2409 
2410            } catch (Exception ee) {
2411 
2412                setError( 128 , ee.getMessage());
2413                _log.warn(ee.toString());
2414 
2415                return RT_ERROR;
2416 
2417            } finally {
2418                _log.info("Selection pool 2 pool took : "+ (System.currentTimeMillis() - _started));
2419            }
2420 
2421                }
2422 
2423        //
2424        // Chooses the pool into which the file is to be brought for direction 'mode' and
2425        // leaves the parameter set last consulted in _parameter.
2426        //   first attempt (_poolCandidateInfo == null) : take the first pool of the first
2427        //      non-empty row whose parameter set has no fallbackCostCut or whose first
2428        //      pool is cheaper than it (otherwise the last non-empty row inspected wins).
2429        //   retry (_poolCandidateInfo != null) : prefer a pool other than the previous
2430        //      candidate, honouring _sameHostRetry where hostname tags are present.
2431        //   Throws CacheException 149 if the matrix is empty, 150 if no pool was selected.
2432        //
2433        private PoolCostCheckable askForFileStoreLocation( DirectionType mode  )
2434            throws CacheException, InterruptedException
2435        {
2436            //
2437            // matrix contains cost for original db matrix minus
2438            // the pools already containing the file.
2439            //
2440            List<List<PoolCostCheckable>> matrix =
2441                _pnfsFileLocation.getFetchPoolMatrix(
2442                        mode , _storageInfo , _protocolInfo , _storageInfo.getFileSize() ) ;
2443            PoolManagerParameter parameter = _pnfsFileLocation.getCurrentParameterSet() ;
2444            if( matrix.size() == 0 )
2445                throw new CacheException( 149 , "No pool candidates available/configured/left for "+mode ) ;
2446            PoolCostCheckable cost = null ;
2447            if( _poolCandidateInfo == null ){
2448                int n = 0 ;
2449                for( Iterator<List<PoolCostCheckable>> i = matrix.iterator() ; i.hasNext() ; n++ ){
2450                    List<PoolCostCheckable> row = i.next() ;
2451                    if( row.isEmpty() )continue ;   // rows could be empty; get(0) would throw
2452                    parameter = _pnfsFileLocation.getListOfParameter().get(n) ;
2453                    cost = row.get(0);
2454                    if( ( parameter._fallbackCostCut  == 0.0 ) ||
2455                        ( cost.getPerformanceCost() < parameter._fallbackCostCut ) )break ;
2456                 }
2457            }else{
2458 
2459                 //
2460                 // find a pool which is not identical to the first candidate
2461                 //
2462                //
2463                //    This prepares for the 'host name' comparison.
2464                //    (tmpMap is used to avoid calling getTagMap twice. The variable is used within the
2465                //     the for(for( loop for the same reason. The scope is limited to just two lines.)
2466                //
2467                Map<String, String> tmpMap = _poolCandidateInfo == null ? null : _poolCandidateInfo.getTagMap() ;
2468                String currentCandidateHostName = tmpMap == null ? null : (String)tmpMap.get("hostname") ;
2469 
2470                PoolCostCheckable rememberBest = null ;
2471 
2472                 for( Iterator<List<PoolCostCheckable>> i = matrix.iterator() ; i.hasNext() ; ){
2473 
2474                    for( Iterator<PoolCostCheckable> n = i.next().iterator() ; n.hasNext() ; ){
2475 
2476                       PoolCostCheckable c  = n.next() ;
2477                       //
2478                       // skip this one if we tried this last time
2479                       //
2480                       if( c.getPoolName().equals(_poolCandidateInfo.getPoolName()) &&  n.hasNext() ) {
2481                           _log.info("askFor "+mode+" : Second shot excluding : " + _poolCandidateInfo.getPoolName() ) ;
2482                           continue;
2483                       }
2484 
2485                       //
2486                       //  If the setting disallows 'sameHostRetry' and the hostname information
2487                       //  is present, we try to honor this.
2488                       //
2489                       if( ( _sameHostRetry != SAME_HOST_RETRY_NOTCHECKED ) && ( currentCandidateHostName != null )){
2490                           //
2491                           // Remember the best even if it is on the same host, in case of 'best effort'.
2492                           //
2493                           if( rememberBest == null )rememberBest = c  ;
2494                           //
2495                           // skip this if the hostname is available and identical to the first candidate.
2496                           //
2497                           String thisHostname = ( tmpMap = c.getTagMap() ) == null ? null : (String) tmpMap.get("hostname") ;
2498                           if( ( thisHostname != null ) && ( thisHostname.equals(currentCandidateHostName) ) )continue ;
2499                        }
2500                        //
2501                        // If the 'fallbackoncost' option is enabled and the cost of the smallest
2502                        // pool is still higher than the specified threshold, don't set the pool and
2503                        // step to the next level.
2504                        //
2505                        if( (  parameter._fallbackCostCut > 0.0 ) &&  ( c.getPerformanceCost() > parameter._fallbackCostCut ) ){
2506                            rememberBest = null ;
2507                            break ;
2508                        }
2509                        //
2510                        // now we can safely assign the best pool and break the loop.
2511                        //
2512                        cost = c ;
2513 
2514                       break ;
2515                    }
2516                    if( cost != null )break ;
2517                 }
2518 
2519                 //
2520                 // clear the pool candidate if this second shot didn't find a good pool. So we can try the first one
2521                 // again. If we don't, systems with a single pool for this request will never recover. (lionel bug 2132)
2522                 //
2523 
2524                 cost = ( cost == null ) && ( _sameHostRetry == SAME_HOST_RETRY_BESTEFFORT )  ? rememberBest : cost ;
2525 
2526           }
2527           _parameter = parameter ;
2528 
2529           if( cost == null )
2530              throw new
2531              CacheException( 150 , "No cheap candidates available for '"+mode+"'");
2532 
2533           return cost ;
2534       }
2535 
2536        //
2537        //   FOUND :
2538        //        - pool candidate found
2539        //   NOT_FOUND :
2540        //        - no pools configured
2541        //        - pools configured but not active
2542        //        - no pools left after subtracting primary candidate.
2543        //   OUT_OF_RESOURCES :
2544        //        - too many requests queued
2545        //
2546        private int askForStaging(){
2547 
2548           try{
2549 
2550               _poolCandidateInfo = askForFileStoreLocation( DirectionType.CACHE ) ;
2551 
2552               //_poolCandidate     = _poolCandidateInfo.getPoolName() ;
2553 
2554               _log.info( "askForStaging : poolCandidate -> "+_poolCandidateInfo.getPoolName());
2555 
2556               if( ! sendFetchRequest( _poolCandidateInfo.getPoolName() , _storageInfo ) )return RT_OUT_OF_RESOURCES ;
2557 
2558               setError(0,"");
2559 
2560               if( _sendCostInfo )sendCostMsg(_pnfsId, (PoolCostCheckable)_poolCandidateInfo , true);//VP
2561 
2562 
2563              return RT_FOUND ;
2564 
2565           }catch( CacheException ce ){
2566 
2567               setError( ce.getRc() , ce.getMessage() );
2568               _log.warn( ce.toString() );
2569 
2570               return RT_NOT_FOUND ;
2571 
2572           }catch( Exception ee ){
2573 
2574              setError( 128 , ee.getMessage() );
2575              _log.warn(ee.toString()) ;
2576 
2577              return RT_ERROR ;
2578 
2579           }finally{
2580              _log.info( "Selection cache took : "+(System.currentTimeMillis()-_started));
2581           }
2582 
2583       }
2584 
2585    }
2586 
2587    //
2588    // Best effort: wraps pnfsId, rc and infoMessage (plus the storage info) into a
2589    // WarningPnfsFileInfoMessage and sends it to _warningPath. A failure is only logged.
2590    //
2591    private void sendInfoMessage( PnfsId pnfsId , StorageInfo storageInfo ,
2592                                  int rc , String infoMessage ){
2593        try{
2594            WarningPnfsFileInfoMessage warning =
2595                new WarningPnfsFileInfoMessage( "PoolManager" , "PoolManager" , pnfsId , rc , infoMessage ) ;
2596            warning.setStorageInfo( storageInfo ) ;
2597 
2598            CellMessage envelope = new CellMessage( new CellPath(_warningPath) , warning ) ;
2599            sendMessage( envelope ) ;
2600 
2601        }catch(Exception e){
2602            _log.warn("Coudn't send WarningInfoMessage : "+e ) ;
2603        }
2604    }
2605 
2606    //VP
2607    public void sendCostMsg( PnfsId pnfsId,
2608                             PoolCostCheckable checkable,
2609                             boolean useBoth){
2610       try {
2611       /*
2612          PoolCostInfoMessage msg = new PoolCostInfoMessage(checkable.getPoolName(), pnfsId);
2613          double cost = _poolMonitor.calculateCost(checkable, useBoth);
2614          msg.setCost(cost);
2615          _cell.sendMessage(new CellMessage( new CellPath(_warningPath), msg));
2616        */
2617       }catch (Exception ee){
2618          _log.warn("Couldn't report cost for : "+pnfsId+" : "+ee);
2619       }
2620    }
2621 
2622    private void sendHitMsg(PnfsId pnfsId, String poolName, boolean cached) {
2623        // Best effort: a failure to report the hit is logged and otherwise ignored.
2624        try {
2625            PoolHitInfoMessage hitInfo = new PoolHitInfoMessage(poolName, pnfsId);
2626            hitInfo.setFileCached(cached);
2627            sendMessage(new CellMessage(new CellPath(_warningPath), hitInfo));
2628        } catch (Exception e) {
2629            _log.warn("Couldn't report hit info for : "+pnfsId+" : "+e);
2630        }
2631    }
2632 
2633    public void setSendCostInfo(boolean sendCostInfo) {
2634        _sendCostInfo = sendCostInfo;
2635    }
2636 
2637    /**
2638     * Resolves the cost cut that currently applies for the given set of parameters.
2639     * A plain costCut is returned unchanged; a percentile costCut (assigned from a string
2640     * ending with a '%') is translated by the pool monitor into that percentile cost.
2641     * @param parameter which set of parameters to consider.
2642     * @return the costCut, taking into account possible relative costCut.
2643     */
2644    private double getCurrentCostCut( PoolManagerParameter parameter) {
2645        if( ! parameter.isCostCutPercentile() )return parameter.getCostCut() ;
2646        double percentile = parameter.getCostCut() ;
2647        return _poolMonitor.getPoolsPercentilePerformanceCost( percentile ) ;
2648    }
2649    //VP
2650 
2651    public void setStageConfigurationFile(String path)
2652    {
2653        _stagePolicyDecisionPoint = new CheckStagePermission(path);
2654    }
2655}

[all classes][diskCacheV111.poolManager]
EMMA 2.0.5312 (C) Vladimir Roubtsov