DESY Hbb Analysis Framework
naf_submit.py
1 #!/usr/bin/env python3
2 import os
3 import glob
4 import re
5 from argparse import ArgumentParser
6 from argparse import HelpFormatter
7 from shutil import copyfile, rmtree, move
8 from time import sleep
9 
10 from colors import bcolors
11 
12 # --- functions ---
13 def getConfigParameter( config, parameter ):
14  p = None
15  exist = False
16  with open(config) as f:
17  for line in f:
18  line = line.replace(" ","").strip()
19  if len(line) == 0:
20  continue
21  if line[0] != '#' and line.split("=")[0] == parameter:
22  par = line.split("=")[1]
23  exist = True
24  p = [parameter,par]
25  if parameter=="ntuplesList" and "tools:" in p[1]:
26  ntp_path = os.getenv('CMSSW_BASE')
27  ntp_path += "/src/Analysis/Tools/data/ntuples/"
28  p[1] = p[1].replace("tools:",ntp_path)
29  break
30 
31  return p
32 
33 def createConfigParameter( config, parameter ):
34  exist = False
35  with open(config,"r") as f:
36  lines = f.readlines()
37 
38  for line in lines:
39  line = line.replace(" ","").strip()
40  if len(line) == 0:
41  continue
42  if line[0] != '#' and line.split("=")[0] == parameter:
43  exist = True
44  break
45 
46  if not exist:
47  with open(config, "w") as f:
48  f.write(parameter+" = \n")
49  for line in lines:
50  f.write(line)
51 
52 
53 def replaceConfigParameter( config, parameter, newpar ):
54  par = None
55  with open(config, "r") as f:
56  lines = f.readlines()
57  with open(config, "w") as f:
58  for line in lines:
59  l = line.replace(" ","").strip()
60  if len(l) < 1:
61  f.write(line)
62  continue
63  if l[0] != '#' and l.split('=')[0] == parameter:
64  if l.split('=')[1] == "" :
65  par = "="
66  newpar = " = " + newpar
67  else:
68  par = l.split('=')[1]
69  f.write(re.sub(par, newpar, line))
70  else:
71  f.write(line)
72 
73 def removeConfigParameter( config, parameter ):
74  with open(config, "r") as f:
75  lines = f.readlines()
76  with open(config, "w") as f:
77  for line in lines:
78  l = line.replace(" ","").strip()
79  if len(l) < 1:
80  f.write(line)
81  continue
82  if l[0] != '#' and l.split('=')[0] == parameter:
83  continue
84  f.write(line)
85 
86 
87 def basenameConfigParameter( config, name ):
88  with open(config, "r") as f:
89  lines = f.readlines()
90  with open(config, "w") as f:
91  for line in lines:
92  f.write(re.sub(name, os.path.basename(name), line))
93 
94 
95 def createCondorDirsSubmit(maindir):
96  libpath = os.environ['LD_LIBRARY_PATH']
97  condor_submit = """
98 Requirements = ( OpSysAndVer == "CentOS7" )
99 getenv = True
100 executable = $(jobdir)/job.sh
101 log = $(jobdir)/xjob_$(Cluster)_$(Process).log
102 output = $(jobdir)/xjob_$(Cluster)_$(Process).out
103 error = $(jobdir)/xjob_$(Cluster)_$(Process).err
104 environment = "LD_LIBRARY_PATH_STORED=$ENV(LD_LIBRARY_PATH)"
105 
106 queue jobdir matching dirs job_*
107  """
108  with open(f'{maindir}/jobs.submit', 'w') as condor_file:
109  print(condor_submit,file=condor_file)
110 
111 def createCondorSubmit(jobdir):
112  libpath = os.environ['LD_LIBRARY_PATH']
113  condor_submit = """
114 Requirements = ( OpSysAndVer == "CentOS7" )
115 getenv = True
116 executable = job.sh
117 log = xjob_$(Cluster)_$(Process).log
118 output = xjob_$(Cluster)_$(Process).out
119 error = xjob_$(Cluster)_$(Process).err
120 environment = "LD_LIBRARY_PATH_STORED=$ENV(LD_LIBRARY_PATH)"
121 
122 queue
123  """
124  with open(f'{jobdir}/job.submit', 'w') as condor_file:
125  print(condor_submit,file=condor_file)
126 
127 def createJobScript(exedir,jobid, exe, cfg):
128  js = """
129 if [[ ! -e "{}" ]]; then
130  cd {}
131 fi
132 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH_STORED
133 {} -c {}
134  """.format(cfg,jobid,exe,cfg)
135  js_name = f'{exedir}/job.sh'
136  with open(js_name, 'w') as job_script:
137  print(js,file=job_script)
138  os.chmod(js_name, 0o744)
139 
140 
141 def submission():
142 
143  ntuples = args.ntuples
144  json = args.json
145  config = args.config
146  events_max = -1
147  if args.events_max:
148  events_max = args.events_max
149  test = args.njobs
150 
151  configMC = getConfigParameter( config, "isMC" )
152  isMC = configMC[1] == 'true'
153 
154  if test:
155  print('TEST MODE:', test, 'jobs')
156 
157  configNtuples = None
158  # get parameter from configuration
159  if config:
160  if not os.path.isfile(config):
161  print ("Configuration file does not exist")
162  quit()
163  configNtuples = getConfigParameter( config, "ntuplesList" )
164  if not ntuples:
165  if configNtuples:
166  ntuples = configNtuples[1]
167  if not ntuples:
168  print ("*error* You must define the parameter ntuplesList in your configuration.")
169  quit()
170  if not isMC:
171  configJson = getConfigParameter( config, "json" )
172  if not json:
173  if configJson:
174  json = configJson[1]
175  else:
176  json = None
177 
178  # checking if required files exist
179  if ntuples:
180  if not os.path.isfile(ntuples):
181  print ("Ntuples list file does not exist")
182  quit()
183  if json:
184  if (not 'tools:' in json) and (not os.path.isfile(json)):
185  print ("Json file does not exist")
186  quit()
187 
188  # directory where the jobs will be stored
189  maindir = "Condor_"+os.path.basename(args.exe)
190  if config:
191  maindir = maindir+"_"+ os.path.splitext(os.path.basename(config))[0]
192  if args.label:
193  maindir += '_'+args.label
194  cwd = os.getcwd()
195  if os.path.exists(cwd+"/"+maindir):
196  print ('')
197  print (bcolors.FAIL,'>>>>>>', maindir,"already exists. Rename or remove it and then resubmit",bcolors.ENDC)
198  status(maindir)
199  quit()
200  os.mkdir(maindir)
201  os.mkdir(maindir+'/finished_jobs')
202 
203  # splitting the file list
204  if ntuples:
205  pid = os.getpid()
206  tmpdir = ".tmp_" + str(pid)
207  os.mkdir(tmpdir)
208  copyfile(ntuples, tmpdir+"/"+os.path.basename(ntuples))
209  if config:
210  copyfile(config, tmpdir+"/"+os.path.basename(config))
211  os.chdir(tmpdir)
212  if config:
213  # replace json and ntuples in the local exe config by their basenames
214  if json:
215  if (not 'tools:' in json):
216  createConfigParameter(os.path.basename(config),'json')
217  replaceConfigParameter(os.path.basename(config), 'json', os.path.basename(json))
218  else:
219  removeConfigParameter(os.path.basename(config),'json')
220  # ntuples list
221  createConfigParameter(os.path.basename(config),'ntuplesList')
222  replaceConfigParameter(os.path.basename(config), 'ntuplesList', os.path.basename(ntuples))
223  if events_max:
224  replaceConfigParameter(os.path.basename(config), 'eventsMax', events_max)
225 
226 
227  splitcmd = "split.csh" + " " + str(args.nfiles) + " " + os.path.basename(ntuples)
228  os.system(splitcmd)
229  os.remove(os.path.basename(ntuples)) # not needed anymore after the splitting
230  files = glob.glob('.*_x????.txt')
231  files.sort()
232  os.chdir(cwd)
233 
234  # loop over the split files; each will correspond to a job on the NAF
235 
236  for i,f in enumerate(files):
237  if test:
238  if i >= int(test) and int(test)>=0:
239  os.chdir(cwd)
240  break
241  jobnum = os.path.splitext(f)[0][-4:]
242  jobid = "job_"+jobnum
243  exedir = maindir+"/"+jobid
244  os.mkdir(exedir)
245  # moving stuff to the proper directories
246  move(tmpdir+"/"+f,exedir+"/"+os.path.basename(ntuples))
247  if json:
248  if not 'tools:' in json:
249  copyfile(json, exedir+"/"+os.path.basename(json))
250  if config:
251  copyfile(tmpdir+"/"+os.path.basename(config),exedir+"/"+os.path.basename(config))
252  condorcmd = "condor_scripts.csh" + " " + jobid + " " + args.exe + " " + os.path.basename(config)
253  createCondorSubmit(cwd+'/'+exedir)
254  createJobScript(exedir,jobid,args.exe,os.path.basename(config))
255  else:
256  condorcmd = "condor_scripts.csh" + " " + jobid + " " + args.exe
257  # make the submissions
258  # os.chdir(exedir)
259  jobf = open(f'{exedir}/seed.txt', 'w+')
260  jobf.write(str(int(jobnum)+1))
261  # print >> jobf, int(jobnum)+1
262  jobf.close()
263  print ("Creating ",jobid,"...")
264  # os.system(condorcmd)
265  # if not test:
266  # os.chdir(exedir)
267  # os.system('condor_submit job.submit')
268  # sleep(0.2)
269  # back to original directory
270  # os.chdir(cwd)
271 
272  createCondorDirsSubmit(cwd+'/'+maindir)
273  if not test:
274  os.chdir(cwd+'/'+maindir)
275  os.system('condor_submit jobs.submit')
276 
277  sleep(2)
278  os.chdir(cwd)
279  status(maindir)
280 
281  else:
282  exedir = maindir+"/job_0000"
283  os.mkdir(exedir)
284  if os.path.isfile(args.exe):
285  copyfile(args.exe, exedir+"/"+os.path.basename(args.exe))
286  os.chdir(exedir)
287  jobf = open('./seed.txt', 'w+')
288  jobf.write(str(1))
289  jobf.close()
290  condorcmd = "condor_scripts.csh job_0000" + " " + os.path.basename(args.exe)
291  os.system(condorcmd)
292  if not test:
293  os.system('condor_submit job.submit')
294  os.chdir(cwd)
295 
296  # remove the temporary directory
297  os.chdir(cwd)
298  if ntuples:
299  os.remove(tmpdir+"/"+os.path.basename(config))
300  rmtree(tmpdir)
301 
302 
303 # given the directory of a job_xxx return the most recent log file
304 def get_job_log(jobdir):
305  files = os.listdir(jobdir)
306  log = [ jobdir+'/'+x for x in files if x.endswith('.log') ]
307  if len(log) > 1:
308  return max(log, key=os.path.getctime)
309  elif len(log) == 1:
310  return log[0]
311  else:
312  return None
313 
314 # given the directory of a job_xxx check if there is anything in the most recent err file
315 def job_err(jobdir):
316  files = os.listdir(jobdir)
317  errs = [ jobdir+'/'+x for x in files if x.endswith('.err') ]
318  if len(errs) > 1:
319  err = max(errs, key=os.path.getctime)
320  elif len(errs) == 1:
321  err = errs[0]
322  else:
323  # if err file does not exist, there is no error
324  return False
325 
326  return (os.path.getsize(err) > 0)
327 
328 
329 
330 # given the directory of a job_xxxx check the status in the most recent log file
331 def get_job_status(jobdir):
332  job_status = {}
333  job_status['submission'] = False
334  job_status['termination'] = False
335  job_status['execution'] = False
336  job_status['abortion'] = False
337  job_status['error'] = True
338  job_status['jobid'] = ' '
339  log = get_job_log(jobdir)
340  if not log:
341  return job_status
342  logname = os.path.basename(log)
343  job_status['jobid'] = logname.split('.')[0].split('_')[1] + '.' + logname.split('.')[0].split('_')[2]
344  f = open(log,'r')
345  for l in f:
346  if re.search('Job submitted from host',l):
347  job_status['submission'] = True
348  if re.search('Job executing on host',l):
349  job_status['execution'] = True
350  if re.search('Job terminated.',l):
351  job_status['termination'] = True
352  if re.search('Job was evicted',l) or re.search('Job was aborted',l):
353  job_status['abortion'] = True
354  if re.search('return value 0',l):
355  job_status['error'] = False
356 
357  return job_status
358 
359 
360 
361 
362 def status(submission_dir, failed_only=False):
363  failed_jobs = 0
364 # submission_dir = args.status
365  finished_dir = submission_dir+'/finished_jobs'
366  jobs_dir = os.listdir(submission_dir)
367  jobs_dir = [ x for x in jobs_dir if 'job_' in x ]
368  jobs_dir.sort()
369  print(' ')
370  header = ' *** STATUS OF JOBS ***'
371  if failed_only:
372  header = ' *** FAILED JOBS ***'
373  print(header)
374  print('\n '+submission_dir)
375  print(' ')
376  dash= ' ----------------------------------------------------------------------------------------------------------'
377  print(dash)
378  print(' job finished running submitted aborted error condor_id (latest)')
379  print(dash)
380  for jj in jobs_dir:
381  j = submission_dir+'/'+jj
382  js = get_job_status(j)
383  je = job_err(j)
384  finished = ' '
385  running = ' '
386  submitted = ' '
387  aborted = ' '
388  error = ' ' # check
389  jobid = js['jobid']
390  if js['termination']:
391  finished = '\u2705' # check
392  error = '\u2705 ' # check
393 # if js['error']:
394 # finished = '\u2705'
395  if js['error'] and not js['abortion']:
396  error = '\u274C' # red x
397  if not js['abortion'] and not js['error']:
398  # search for finished.txt before moving job dir, to guarantee that everything finished
399  if os.path.isfile(j+'/finished.txt'):
400  sleep(1)
401  move(j,finished_dir)
402  else:
403  finished = '\u26A0\uFE0F ' # warning
404 # elif js['execution'] and not js['abortion'] and not js['error']:
405  elif js['execution'] and not js['abortion']:
406  running = '\u2705'
407  elif js['submission'] and not js['abortion']:
408  submitted = '\u2705'
409  if js['abortion']:
410  aborted = '\u274C ' # red cross
411 
412  if not ( js['submission'] or js['termination'] or js['execution'] or js['abortion'] ):
413  submitted = '\u26A0\uFE0F ' # warning
414  if js['abortion'] or ( js['termination'] and js['error']):
415  failed_jobs +=1
416  if failed_only and (not (js['abortion'] or ( js['termination'] and js['error']))):
417  continue
418  print(' '+jj+' '+finished+' '+running+' '+submitted+' '+aborted+' '+error+' '+jobid)
419  if len(jobs_dir) == 0:
420  print(' No jobs to be checked!')
421  print(dash)
422  if not failed_only:
423  print('\n N.B.: Good *finished* jobs will no longer appear in future "--status" calls')
424 
425  return failed_jobs
426 
427 # resubmit jobs according to their status
428 def resubmit(submission_dir):
429  failed_jobs = status(submission_dir,True)
430  if failed_jobs == 0:
431  return
432  jobs_dir = os.listdir(submission_dir)
433  jobs_dir = [ x for x in jobs_dir if 'job_' in x ]
434  jobs_dir.sort()
435  cwd = os.getcwd()
436  print(' ')
437  print(' *** Resubmit failed jobs ***')
438  print(dash)
439  confirmed = confirm()
440  if not confirmed:
441  print(dash)
442  print(' ')
443  return
444  for jj in jobs_dir:
445  j = submission_dir+'/'+jj
446  js = get_job_status(j)
447  if js['abortion'] or ( js['termination'] and js['error'] ):
448  os.chdir(j)
449  os.system('condor_submit job.submit')
450  os.chdir(cwd)
451  print(dash)
452  print(' ')
453  os.chdir(cwd)
454  sleep(2)
455  status(submission_dir)
456 
457 
458 
459 # resubmit jobs expert mode
460 def resubmit_expert(submission_dir):
461  failed_jobs = status(submission_dir,True)
462  if failed_jobs == 0:
463  return
464  cwd = os.getcwd()
465 # submission_dir = args.resubmit_expert
466  os.chdir(submission_dir)
467  print(' ')
468  print(' *** Resubmit jobs (EXPERT) ***')
469  print(dash)
470  confirmed = confirm()
471  if not confirmed:
472  print(dash)
473  print(' ')
474  return
475  os.system('condor_submit jobs.submit')
476  print(dash)
477  print(' ')
478  os.chdir(cwd)
479  sleep(2)
480  status(submission_dir)
481 
482 
483 def confirm():
484  answer = " "
485  while answer not in ["y", "n", ""]:
486  answer = input(" Please confirm this action (Y/[N])? ").lower()
487  return answer == "y"
488 
489 
490 # --- main code ---
491 
492 # parsing arguments
493 parser = ArgumentParser(prog='naf_submit.py', formatter_class=lambda prog: HelpFormatter(prog,indent_increment=6,max_help_position=80,width=280), description='Prepare, submit and check jobs to NAF HTCondor batch system',add_help=True)
494 parser_submission = parser.add_argument_group('submission','prepare and submit jobs')
495 
496 parser_submission.add_argument("--exe" , "-e" , dest="exe" , help="Executable (REQUIRED)")
497 parser_submission.add_argument("--config" , "-c" , dest="config" , help="Configuration file (REQUIRED)")
498 parser_submission.add_argument("--ntuples", "-n" , dest="ntuples" , help="List of ntuples file")
499 parser_submission.add_argument("--nfiles" , "-x" , dest="nfiles" , type=int, default=1, help="Number of ntuple files per job")
500 parser_submission.add_argument("--json" , "-j" , dest="json" , help="JSON file with certified data")
501 parser_submission.add_argument("--label" , "-l" , dest="label" , help="user label for the submission")
502 parser_submission.add_argument("--events" , dest="events_max" , default="-1" , help="override eventsMax in the config file (default = -1)")
503 parser_submission.add_argument("--test" , dest="njobs" , help="*** expert only ***:produce njobs, no automatic submission")
504 
505 
506 parser_status = parser.add_argument_group('status','show and modify status')
507 parser_status.add_argument("--dir" , dest="dir" , help="an existing condor directory (REQUIRED)")
508 parser_status.add_argument("--status" , dest="status" , action="store_true", help="-> returns the status of the jobs in --dir")
509 parser_status.add_argument("--resubmit" , dest="resubmit" , action="store_true", help="-> resubmits aborted and finished-with-error jobs in --dir")
510 parser_status.add_argument("--expert" , dest="expert" , action="store_true", help=" -> *** expert mode ***")
511 args = parser.parse_args()
512 
513 dash = ' ----------------------------------------------------------------------------------------------------------'
514 
515 if args.dir:
516  if not os.path.exists(args.dir):
517  print("Directory", args.dir, "does not exist")
518  quit()
519  if args.status:
520  status(args.dir)
521  elif args.resubmit and not args.expert:
522  resubmit(args.dir)
523  elif args.resubmit and args.expert:
524  resubmit_expert(args.dir)
525  else:
526  parser.print_help()
527 else:
528  if not args.exe and not args.config:
529  parser.print_help()
530  quit()
531  submission()
532 
def replaceConfigParameter(config, parameter, newpar)
Definition: naf_submit.py:53
def confirm()
Definition: naf_submit.py:483
def status(submission_dir, failed_only=False)
Definition: naf_submit.py:362
def job_err(jobdir)
Definition: naf_submit.py:315
def basenameConfigParameter(config, name)
Definition: naf_submit.py:87
def createCondorDirsSubmit(maindir)
Definition: naf_submit.py:95
def get_job_log(jobdir)
Definition: naf_submit.py:304
def submission()
Definition: naf_submit.py:141
def getConfigParameter(config, parameter)
Definition: naf_submit.py:13
def createJobScript(exedir, jobid, exe, cfg)
Definition: naf_submit.py:127
def createCondorSubmit(jobdir)
Definition: naf_submit.py:111
def resubmit(submission_dir)
Definition: naf_submit.py:428
def removeConfigParameter(config, parameter)
Definition: naf_submit.py:73
def createConfigParameter(config, parameter)
Definition: naf_submit.py:33
def get_job_status(jobdir)
Definition: naf_submit.py:331
def resubmit_expert(submission_dir)
Definition: naf_submit.py:460
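
Usage sketch (not part of naf_submit.py): the lines below illustrate, first, typical command-line invocations built from the argument parser defined in the listing, and second, the plain "key = value" configuration format that the config helpers parse. All paths and names are placeholders, and example.cfg with its two parameters is invented for the demonstration; only getConfigParameter and replaceConfigParameter from the listing above are real.

# Typical invocations, based on the argument parser defined above
# (executable, config and list names are placeholders):
#   ./naf_submit.py --exe <executable> --config <config.cfg> --ntuples <ntuples_list.txt> --nfiles 2
#   ./naf_submit.py --dir <Condor_directory> --status
#   ./naf_submit.py --dir <Condor_directory> --resubmit

# Minimal demonstration of the config format handled by the helpers.
# Assumes getConfigParameter and replaceConfigParameter from naf_submit.py are in scope.
with open("example.cfg", "w") as f:
    f.write("isMC = true\n")
    f.write("eventsMax = -1\n")

print(getConfigParameter("example.cfg", "isMC"))       # -> ['isMC', 'true']
replaceConfigParameter("example.cfg", "eventsMax", "1000")
print(getConfigParameter("example.cfg", "eventsMax"))  # -> ['eventsMax', '1000']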