1 | // $Id: DummyStager.java,v 1.2 2002-06-24 06:02:17 cvs Exp $ |
2 | |
3 | package diskCacheV111.hsmControl ; |
4 | |
5 | import java.util.* ; |
6 | import java.io.* ; |
7 | import java.text.* ; |
8 | |
9 | import dmg.util.* ; |
10 | import dmg.cells.nucleus.* ; |
11 | |
12 | import diskCacheV111.vehicles.* ; |
13 | import diskCacheV111.util.* ; |
14 | |
15 | import org.slf4j.Logger; |
16 | import org.slf4j.LoggerFactory; |
17 | |
18 | public class DummyStager extends CellAdapter { |
19 | |
20 | private final static Logger _log = |
21 | LoggerFactory.getLogger(DummyStager.class); |
22 | |
23 | private CellNucleus _nucleus ; |
24 | private Args _args ; |
25 | private int _requests = 0 ; |
26 | private int _failed = 0 ; |
27 | private int _outstandingRequests = 0 ; |
28 | private File _database = null ; |
29 | private SimpleDateFormat formatter |
30 | = new SimpleDateFormat ("MM.dd hh:mm:ss"); |
31 | |
32 | public DummyStager( String name , String args ) throws Exception { |
33 | super( name , args , false ) ; |
34 | _nucleus = getNucleus() ; |
35 | _args = getArgs() ; |
36 | try{ |
37 | if( _args.argc() < 1 ) |
38 | throw new |
39 | IllegalArgumentException("Usage : ... <database>") ; |
40 | |
41 | _database = new File( _args.argv(0) ) ; |
42 | if( ! _database.isDirectory() ) |
43 | throw new |
44 | IllegalArgumentException( "Not a directory : "+_database); |
45 | }catch(Exception e){ |
46 | start() ; |
47 | kill() ; |
48 | throw e ; |
49 | } |
50 | useInterpreter( true ); |
51 | _nucleus.newThread( new QueueWatch() , "queueWatch").start() ; |
52 | start(); |
53 | export(); |
54 | } |
55 | private class QueueWatch implements Runnable { |
56 | public void run(){ |
57 | _log.info("QueueWatch started" ) ; |
58 | while( ! Thread.currentThread().interrupted() ){ |
59 | try{ |
60 | Thread.currentThread().sleep(60000); |
61 | }catch(InterruptedException ie ){ |
62 | break ; |
63 | } |
64 | _nucleus.updateWaitQueue() ; |
65 | } |
66 | _log.info( "QueueWatch stopped" ) ; |
67 | } |
68 | } |
69 | public String toString(){ |
70 | return "Req="+_requests+";Err="+_failed+";" ; |
71 | } |
72 | public void getInfo( PrintWriter pw ){ |
73 | pw.println("DummyStager : [$Id: DummyStager.java,v 1.2 2002-06-24 06:02:17 cvs Exp $]" ) ; |
74 | pw.println("Requests : "+_requests ) ; |
75 | pw.println("Failed : "+_failed ) ; |
76 | pw.println("Outstanding : "+_outstandingRequests ) ; |
77 | } |
78 | public void messageArrived( CellMessage msg ){ |
79 | Object obj = msg.getMessageObject() ; |
80 | _requests ++ ; |
81 | if( obj instanceof StagerMessage ){ |
82 | StagerMessage stager = (StagerMessage)obj ; |
83 | _log.info( stager.toString() ) ; |
84 | try{ |
85 | sendStageRequest( stager ) ; |
86 | stager.setSucceeded(); |
87 | }catch(Exception iiee ){ |
88 | stager.setFailed( 33 , iiee ) ; |
89 | _log.warn("Problem in sendStageRequest: "+iiee); |
90 | } |
91 | msg.revertDirection() ; |
92 | try{ |
93 | sendMessage( msg ) ; |
94 | }catch(Exception ee ){ |
95 | _log.warn("Problem replying : "+ee ) ; |
96 | } |
97 | }else{ |
98 | _log.warn("Unknown message arrived ("+msg.getSourcePath()+") : "+ |
99 | msg.getMessageObject() ) ; |
100 | _failed ++ ; |
101 | } |
102 | } |
103 | private class StageCompanion implements CellMessageAnswerable { |
104 | private StagerMessage _stager = null ; |
105 | private StageCompanion( StagerMessage stager ){ |
106 | _stager = stager ; |
107 | } |
108 | public void answerArrived( CellMessage request , CellMessage answer ){ |
109 | _log.info( "Answer for : "+answer.getMessageObject() ) ; |
110 | _outstandingRequests -- ; |
111 | } |
112 | public void exceptionArrived( CellMessage request , Exception exception ){ |
113 | _log.warn( "Exception for : "+_stager+" : "+exception ) ; |
114 | _outstandingRequests -- ; |
115 | } |
116 | public void answerTimedOut( CellMessage request ){ |
117 | _log.warn( "Timeout for : "+_stager ) ; |
118 | _outstandingRequests -- ; |
119 | } |
120 | } |
121 | private void sendStageRequest( StagerMessage stager ){ |
122 | PoolMgrSelectReadPoolMsg request = |
123 | new PoolMgrSelectReadPoolMsg( |
124 | stager.getPnfsId(), |
125 | stager.getStorageInfo(), |
126 | stager.getProtocolInfo(), 0); |
127 | try{ |
128 | sendMessage( |
129 | new CellMessage( |
130 | new CellPath("PoolManager") , |
131 | request ) , |
132 | true , true , |
133 | new StageCompanion( stager ) , |
134 | 1*24*60*60*1000 |
135 | ) ; |
136 | _outstandingRequests ++ ; |
137 | }catch(Exception ee ){ |
138 | _log.warn("Failed to send request to PM : "+ee) ; |
139 | } |
140 | } |
141 | // |
142 | // stage and pin example for Timur |
143 | // |
144 | private HashMap _companionMap = new HashMap() ; |
145 | private class ExampleCompanion implements CellMessageAnswerable { |
146 | private PnfsId _pnfsId = null ; |
147 | private String _host = null ; |
148 | private boolean _pin = false ; |
149 | private StorageInfo _storageInfo = null ; |
150 | private String _status = "<WaitingForStorageInfo>" ; |
151 | private String _poolName = null ; |
152 | private ExampleCompanion( PnfsId pnfsId , String host , boolean pin ){ |
153 | _pnfsId = pnfsId ; |
154 | _host = host ; |
155 | _pin = pin ; |
156 | synchronized( _companionMap ){ |
157 | if( _companionMap.get(pnfsId) != null ) |
158 | throw new |
159 | IllegalArgumentException( "Staging "+_pnfsId+" in progess"); |
160 | |
161 | _companionMap.put( pnfsId , this ) ; |
162 | } |
163 | } |
164 | public void setStatus(String message ){_status = message ;} |
165 | public void answerArrived( CellMessage req , CellMessage answer ){ |
166 | _log.info( "Answer for : "+answer.getMessageObject() ) ; |
167 | Message message = (Message)answer.getMessageObject() ; |
168 | if( message.getReturnCode() != 0 ){ |
169 | |
170 | _log.warn( _status = "Manual stage : "+_pnfsId+" "+message.getErrorObject() ) ; |
171 | return ; |
172 | } |
173 | if( message instanceof PnfsGetStorageInfoMessage ){ |
174 | _storageInfo = ((PnfsGetStorageInfoMessage)message).getStorageInfo() ; |
175 | _log.info( "Manual Stager : storageInfoArrived : "+_storageInfo ) ; |
176 | |
177 | DCapProtocolInfo pinfo = new DCapProtocolInfo( "DCap",3,0,_host,0) ; |
178 | PoolMgrSelectReadPoolMsg request = |
179 | new PoolMgrSelectReadPoolMsg( |
180 | _pnfsId, |
181 | _storageInfo , |
182 | pinfo , 0); |
183 | try{ |
184 | sendMessage( |
185 | new CellMessage( |
186 | new CellPath("PoolManager") , |
187 | request ) , |
188 | true , true , |
189 | this , |
190 | 1*24*60*60*1000 |
191 | ) ; |
192 | _status = "<WaitingForStage>" ; |
193 | }catch(Exception ee ){ |
194 | _log.warn(_status = "Manual Stage : exception in sending stage req. : "+ee ) ; |
195 | return ; |
196 | } |
197 | }else if( message instanceof PoolMgrSelectReadPoolMsg ){ |
198 | PoolMgrSelectReadPoolMsg select = (PoolMgrSelectReadPoolMsg)message; |
199 | _log.info( "Manual Stager : PoolMgrSelectReadPoolMsg : "+select ) ; |
200 | _poolName = select.getPoolName() ; |
201 | if( _pin ){ |
202 | PoolSetStickyMessage sticky = |
203 | new PoolSetStickyMessage( _poolName , _pnfsId , true ) ; |
204 | try{ |
205 | sendMessage( |
206 | new CellMessage( |
207 | new CellPath(_poolName) , |
208 | sticky ) , |
209 | true , true , |
210 | this , |
211 | 60*1000 |
212 | ) ; |
213 | _status = " (sticky) assumed O.K." ; |
214 | }catch(Exception ee ){ |
215 | _log.warn(_status = "Manual Stage : exception in sending sticky req. : "+ee ) ; |
216 | return ; |
217 | } |
218 | }else{ |
219 | _status = "O.K." ; |
220 | } |
221 | }else if( message instanceof PoolSetStickyMessage ){ |
222 | // |
223 | // will no come |
224 | // |
225 | _status = " (sticky) O.K." ; |
226 | } |
227 | } |
228 | public void exceptionArrived( CellMessage request , Exception exception ){ |
229 | _log.warn( _status = "Exception for : "+_pnfsId+" : "+exception ) ; |
230 | } |
231 | public void answerTimedOut( CellMessage request ){ |
232 | _log.warn( _status = "Timeout for : "+_pnfsId ) ; |
233 | } |
234 | public String toString(){ |
235 | if( _poolName != null ){ |
236 | return _pnfsId.toString()+" Staged at : "+_poolName+" ; "+_status; |
237 | }else{ |
238 | return _pnfsId.toString()+" "+_status ; |
239 | } |
240 | } |
241 | } |
242 | public String hh_stage_remove = "<pnfsId>" ; |
243 | public String ac_stage_remove_$_1(Args args )throws Exception { |
244 | Object x = _companionMap.remove(new PnfsId(args.argv(0))); |
245 | if( x == null ) |
246 | throw new |
247 | IllegalArgumentException("Not found : "+_args.argv(0)); |
248 | return "Removed : "+args.argv(0) ; |
249 | } |
250 | public String hh_stage_ls = "" ; |
251 | public String ac_stage_ls( Args args )throws Exception { |
252 | Iterator i = _companionMap.values().iterator() ; |
253 | StringBuffer sb = new StringBuffer() ; |
254 | while( i.hasNext() ){ |
255 | sb.append( i.next().toString() ).append("\n"); |
256 | } |
257 | return sb.toString() ; |
258 | } |
259 | public String hh_stage_file = "<pnfsId> <destinationHost>" ; |
260 | public String ac_stage_file_$_2( Args args ) throws Exception { |
261 | PnfsId pnfsId = new PnfsId( args.argv(0) ) ; |
262 | String host = args.argv(1) ; |
263 | boolean pin = args.getOpt("pin") != null ; |
264 | |
265 | ExampleCompanion companion = new ExampleCompanion(pnfsId,host,pin) ; |
266 | |
267 | PnfsGetStorageInfoMessage storageInfoMsg = |
268 | new PnfsGetStorageInfoMessage( pnfsId ) ; |
269 | |
270 | try{ |
271 | sendMessage( new CellMessage( |
272 | new CellPath("PnfsManager") , |
273 | storageInfoMsg ) , |
274 | true , true , |
275 | companion , |
276 | 3600 * 1000 ) ; |
277 | }catch(Exception ee ){ |
278 | companion.setStatus("Problem sending 'getStorageInfo' : "+ee ); |
279 | _log.warn( "Problem sending 'getStorageInfo' : "+ee ) ; |
280 | throw ee ; |
281 | } |
282 | return "" ; |
283 | } |
284 | } |