
AlternateDistributionClient: update_mirror.py

File update_mirror.py, 29.4 KB (added by welling, 16 years ago)
#! /usr/bin/env python

import sys,os,stat,datetime,getopt,time,threading,Queue,re,copy,signal,cPickle
import tarfile
import parse_config

##################
# Notes-
# -need to fetch updates of a given product in time order,
#  irrespective of their order in the archive.
# -Is there a way we can test recursiveRm for safety?
##################

#topURI= "http://alala2.ifa.hawaii.edu/ipp025/ds"
#topURI= "http://alala2.ifa.hawaii.edu/ipp049/ds"
topURI= "http://datastore.ipp.ifa.hawaii.edu"
#mirrorRootDir= "/Volumes/data/PS1/data/MIRROR"
#mirrorRootDir= "/n/panstarrs/data/MIRROR"
#mirrorRootDir= "/panstarrs/data/MIRROR"
#mirrorRootDir= "/bessemer/welling/ps_ipp/MIRROR"
#mirrorRootDir= "/big/welling/MIRROR"
mirrorRootDir= "/var/tmp/MIRROR"
#desiredProductList= ['dist0','gpc1-sample','iq-evaluation','md-all-rel-200907']
desiredProductList= ['ps1-md']
hiddenProductList= ['md-all-rel-200907',
                    'durham-200907',
                    'qub-200907',
                    'edb-200907',
                    'jhu-200907',
                    'threepi-all-200908',
                    'ps1-200910','ps1-cat-200910',
                    'ps1-md']
productStatusTableFilename= "product_status.csv"
failureTableFilename= "download_failures.csv"
leftoverWorkFilename= "update_mirror_leftover_work.pkl"
maxFetchAttempts= 3
logFileName= "update_mirror.log"
logFile= None
lastFilesetId= None
indexColumnFilterDict= {}
verbose= 0
debug= 0
groupWriteFlag= 0
selectTarballRegex= None
numDownloadThreads= 10 # This seems to be about the limit for cyclops01
gblKeepRunning= True

# We use these in lieu of lists because they support the locking needed
# by the threads.
pendingDownloadQueue= None # All the products we're trying to get
failedDownloadQueue= None  # Products we've given up on

# All of the workers will contribute to the count of total downloaded bytes
totalKBytesDownloaded= 0
lockTotalKBytesDownloaded= None

def handler( signum, frame ):
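    """Signal handler: converts the caught signal into an IOError so the
    main thread can unwind, record leftover work, and exit cleanly."""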
    raise IOError, "caught signal %s!"%signum

def logMessage(str):
    global logFile
    if not logFile:
        logFile= open(logFileName,"a")
    logFile.write("%s: %s\n"%\
                  (datetime.datetime.now().strftime("%Y/%m/%d/%H:%M:%S"),
                   str))
    logFile.flush()

def logMessageAndExit(str):
    logMessage(str)
    sys.exit(str)

def readCmdOutputToList(cmd):
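    """Run a shell command, returning its standard output as a list of
    lines.  Raises RuntimeError if the command exits with an error."""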
    #logMessage("running <%s>"%cmd)
    cmdout= os.popen(cmd)
    result= cmdout.readlines()
    if cmdout.close() != None :
        raise RuntimeError("Command failed: <%s>"%cmd)
    #logMessage("read %d entries"%len(result))
    return result

def parseCSV( ifile ):
    "returns a tuple containing a list of keys and a list of dicts"
    lineList= []
    possibleDelimiters= [",","\t",None] # None means whitespace-delimited
    lines= ifile.readlines()
    if len(lines)==0:
        return [],lineList # in case input is an empty file
    delimFound= 0
    delimForThisFile= None
    for delim in possibleDelimiters:
        wordCount= len(lines[0].split(delim))
        if wordCount<2: continue
        delimStillACandidate= 1
        for line in lines[1:]:
            nwords= len(line.split(delim))
            if nwords>1 and nwords != wordCount:
                if debug: print "Delim is not <%s>\n"%delim
                delimStillACandidate= 0
                break
        if delimStillACandidate:
            delimFound= 1
            delimForThisFile= delim
            break
    if not delimFound:
        sys.exit("Cannot find the right delimiter for this CSV input!")
    if debug: print "delimForThisFile= <%s>"%delimForThisFile
    keys= lines[0].split(delimForThisFile)
    keys= [ x.strip() for x in keys ]
    stringsAreQuoted= 1
    for key in keys:
        if len(key)>0 \
           and (not key.startswith('"') or not key.endswith('"')) \
           and (not key.startswith("'") or not key.endswith("'")):
            stringsAreQuoted= 0
    if debug: print "stringsAreQuoted= %d"%stringsAreQuoted
    if stringsAreQuoted:
        keys = [ x[1:-1] for x in keys ]
    lines= lines[1:]
    lineNum= 1
    for line in lines:
        words= line.split(delimForThisFile)
        words= [x.strip() for x in words]
        if len(words)>0:
            dict= {}
            if len(words)!=len(keys):
                logMessage("Line length error: %d vs %d"%\
                           (len(words),len(keys)))
                for i in xrange(len(keys)):
                    logMessage("%d: <%s> <%s>"%(i,keys[i],words[i]))
                sys.exit("Line length error parsing CSV at line %d"%(lineNum))
            for i in xrange(len(keys)):
                if (stringsAreQuoted
                    and ((words[i].startswith('"')
                          and words[i].endswith('"'))
                         or (words[i].startswith("'")
                             and words[i].endswith("'")))):
                    dict[keys[i]]= words[i][1:-1]
                else:
                    try:
                        dict[keys[i]]= int(words[i])
                    except ValueError:
                        try:
                            dict[keys[i]]= float(words[i])
                        except ValueError:
                            dict[keys[i]]= words[i]

            lineList.append(dict)
        lineNum += 1
    return (keys, lineList)

def writeCSV( ofile, keyList, recDictList, delim=",", quoteStrings=0 ):
    """
    Each element of the input recDictList is a dictionary containing
    keys from keyList.
    """
    if quoteStrings:
        ofile.write('"%s"'%keyList[0])
        for key in keyList[1:]:
            ofile.write('%s"%s"'%(delim,key))
    else:
        ofile.write("%s"%keyList[0])
        for key in keyList[1:]:
            ofile.write("%s%s"%(delim,key))
    ofile.write("\n")
    for dict in recDictList:
        if dict.has_key(keyList[0]):
            val= dict[keyList[0]]
        else: val= 'NA'
        if isinstance(val,int): ofile.write("%d"%val)
        elif isinstance(val,float): ofile.write("%r"%val)
        elif quoteStrings:
            ofile.write('"%s"'%val)
        else:
            ofile.write("%s"%val)
        for key in keyList[1:]:
            if dict.has_key(key):
                val= dict[key]
            else: val= 'NA'
            if isinstance(val,int): ofile.write("%s%d"%(delim,val))
            elif isinstance(val,float): ofile.write("%s%r"%(delim,val))
            elif quoteStrings:
                ofile.write('%s"%s"'%(delim,val))
            else:
                ofile.write("%s%s"%(delim,val))
        ofile.write("\n")
    if debug:
        print "Wrote %d recs, delim=<%s>, quoteStrings= %d"%\
              (len(recDictList),delim,quoteStrings)

def parseIndexList(lines):
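    """Parse the '|'-delimited index.txt format served by the datastore.
    The first line must be a '#'-prefixed header naming the columns;
    every following line becomes a dict keyed by those column names."""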
    if len(lines)==0 or len(lines[0])==0:
        raise RuntimeError("Got an empty index list")
    if lines[0][0]=="#":
        keys= lines[0][1:].split('|')
        keys= ( k.strip() for k in keys )
        cleanKeys= []
        for k in keys:
            if len(k)>0:
                cleanKeys.append(k)
        lines= lines[1:]
    else:
        raise RuntimeError("Did not find '#' in line 1 column 1 of downloaded file list")

    result= []
    lineCount= 1
    for l in lines:
        lineCount += 1
        l= l.strip()
        if len(l)>0:
            words= l.split('|')
            cleanWords= []
            for w in words:
                if len(w)>0:
                    cleanWords.append(w)
            cleanWords= [w.strip() for w in cleanWords]
            if len(cleanWords)==0:
                continue
            if len(cleanWords) < len(cleanKeys):
                logMessageAndExit("Wrong length line %d in index list"%lineCount)
            lineDict= {}
            for i in xrange(len(cleanKeys)):
                lineDict[cleanKeys[i]]= cleanWords[i]
            result.append(lineDict)

    return result

def makeGroupWritable(fname):
    os.chmod(fname,os.stat(fname)[stat.ST_MODE] | stat.S_IWGRP)

def makeDirsGroupWritable(path):
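    """Equivalent of os.makedirs(), except that each directory created
    along the way is also made group writable."""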
    if not os.access(path,os.F_OK):
        parent= os.path.dirname(path)
        if parent!="/": makeDirsGroupWritable(parent)
        os.mkdir(path)
        makeGroupWritable(path)

def placeFiles(product,incomingDir, runType, dirinfoDict, newFileInfoList):
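    """Move the files of a verified fileset from incomingDir to their
    final home under mirrorRootDir/product: 'tgz' components are untarred
    into the subdirectories named by dirinfoDict, and the dbinfo/dirinfo
    text components are tucked into a 'metadata' subdirectory."""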
    topDestDir= os.path.join(mirrorRootDir, product,
                             dirinfoDict['destdir']['destdir'][1])
    for entryType,entryComponent,entryFileID in newFileInfoList:
        #logMessage("treating %s %s %s"%(entryType,entryComponent,entryFileID))
        if entryType=='text':
            outDir= topDestDir
            if not os.access(outDir,os.W_OK):
                if groupWriteFlag: makeDirsGroupWritable(outDir)
                else: os.makedirs(outDir)
            # Move annoying metadata to a subdirectory
            if entryComponent=='dbinfo' or entryComponent=='dirinfo':
                outDir= os.path.join(outDir,"metadata")
                if not os.access(outDir,os.W_OK):
                    if groupWriteFlag: makeDirsGroupWritable(outDir)
                    else: os.makedirs(outDir)
            os.rename(os.path.join(incomingDir,entryFileID),
                      os.path.join(outDir,entryFileID))
            if groupWriteFlag:
                makeGroupWritable(os.path.join(outDir,entryFileID))
        elif entryType=='tgz':
            outDir= os.path.join(topDestDir,
                                 dirinfoDict['components'][entryComponent][1])
            if not os.access(outDir,os.W_OK):
                if groupWriteFlag: makeDirsGroupWritable(outDir)
                else: os.makedirs(outDir)
            tarFile= tarfile.open(os.path.join(incomingDir,entryFileID))
            fnames= tarFile.getnames()
            tarFile.extractall(path=outDir)
            tarFile.close()
            if groupWriteFlag:
                for fn in fnames:
                    makeGroupWritable(os.path.join(outDir,fn))
            logMessage("untarred %s (%d files)"%(entryFileID,len(fnames)))
        else:
            raise RuntimeError("Unknown entry type moving %s into place"%\
                               entryFileID)

def fetchSingleUpdate(product,dict,incomingDir):
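    """Download every file of one fileset into incomingDir, verifying each
    against the md5 checksum from the fileset index, then hand the set to
    placeFiles() and delete incomingDir.  Returns the fileset's index dict
    on success; raises RuntimeError on any mismatch or format problem."""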
    global totalKBytesDownloaded, lockTotalKBytesDownloaded
    fetchCount= 0
    totalBytesThisUpdate= 0
    lines= readCmdOutputToList("wget -nv -a %s -O - %s/%s/%s/index.txt"%\
                               (logFileName,
                                topURI,product,dict['filesetID']))
    indexList= parseIndexList(lines)
    try:
        if groupWriteFlag: makeDirsGroupWritable(incomingDir)
        else: os.makedirs(incomingDir)
    except OSError:
        pass # this arises occasionally if incomingDir exists from prev run
    newFileInfoList= []
    runType= None
    dirinfoDict= None
    for entry in indexList:
        if selectTarballRegex is not None \
           and entry['type']=="tgz" \
           and not selectTarballRegex.match(entry['fileID']):
            continue
        lines= readCmdOutputToList("wget -nv -a %s -P %s %s/%s/%s/%s"%\
                                   (logFileName,
                                    incomingDir,
                                    topURI,product,dict['filesetID'],
                                    entry['fileID']))
        fetchCount += 1
        fname= os.path.join(incomingDir, entry['fileID'])
        totalBytesThisUpdate += int(entry['bytes'])
        md5SumLines= readCmdOutputToList("md5sum %s"%fname)
        md5Sum= md5SumLines[0].split()[0].strip()
        #logMessage("%s: <%s> vs <%s>"%(entry['fileID'],
        #                               md5Sum,entry['md5sum']))
        if md5Sum!=entry['md5sum']:
            raise RuntimeError("checksum mismatch on %s: %s vs. %s"%\
                               (entry['fileID'],md5Sum,entry['md5sum']))
        if entry['type']=="tgz":
            pass
        elif entry['type']=='text':
            if entry['component']=='dbinfo':
                f= open(fname,"r")
                l= f.readline().strip()
                words= l.split()
                f.close()
                if len(words)!=2 or words[1]!="MULTI":
                    raise RuntimeError("First line of %s is not 'something MULTI'"%\
                                       entry['fileID'])
                runType= words[0]
            elif entry['component']=='dirinfo':
                f= open(fname,"r")
                dirinfoDict= parse_config.parse(f,{})
                f.close()
            else:
                raise RuntimeError("Unrecognized text component %s processing %s"%\
                                   (entry['component'],entry['fileID']))
        else:
            raise RuntimeError("Unrecognized type %s processing %s"%\
                               (entry['type'],entry['fileID']))
        newFileInfoList.append( (entry['type'], entry['component'],
                                 entry['fileID']) )
    if runType==None or dirinfoDict==None:
        raise RuntimeError("Missing needed info processing %s"%\
                           dict["filesetID"])
    placeFiles(product, incomingDir, runType, dirinfoDict, newFileInfoList)
    recursiveRm(incomingDir)

    logMessage("%s finished fileset %s: %d downloads, %d bytes"%\
               (threading.currentThread().name,dict['filesetID'],
                fetchCount,totalBytesThisUpdate))

    lockTotalKBytesDownloaded.acquire()
    totalKBytesDownloaded += totalBytesThisUpdate/1024
    lockTotalKBytesDownloaded.release()

    return dict

def getFailureList():
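    """Read the download-failure .csv back in, returning its records as a
    list (an empty list if the table does not exist yet)."""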
    try:
        f= open(failureTableFilename,"r")
    except IOError:
        logMessage("Download failure table <%s> does not exist; will create it"%\
                   failureTableFilename)
        return []

    keyList,recs= parseCSV(f)
    f.close()
    return recs

def getProductStatusTable():
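    """Read the product status .csv back in, returning a dict of records
    keyed by productID (an empty dict if the table does not exist yet)."""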
    try:
        f= open(productStatusTableFilename,"r")
    except IOError:
        logMessage("Product status table <%s> does not exist; will create it"%\
                   productStatusTableFilename)
        return {}

    keyList,recs= parseCSV(f)
    f.close()
    tbl= {}
    for r in recs:
        tbl[r['productID']]= r
    return tbl

def writeProductStatusTable(tbl):
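    """Flatten the product status dict back into records and write them
    out as a .csv, using the union of all record keys as the header."""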
    keyList= []
    recDictList= []
    for k in tbl.keys():
        thisRecDict= tbl[k]
        thisRecDict['productID']= k
        recDictList.append(thisRecDict)
        for subKey in thisRecDict:
            if not subKey in keyList:
                keyList.append(subKey)
    try:
        f= open(productStatusTableFilename,"w")
    except IOError:
        logMessageAndExit("Can't write product status table <%s>"%\
                          productStatusTableFilename)
    if len(keyList)>0 and len(recDictList)>0:
        writeCSV(f, keyList, recDictList)
    f.close()

def writeFailureList(failureList):
    "Write the failure list .csv if there are failures; otherwise clear the file."
    if len(failureList)>0:
        keyList= []
        for dict in failureList:
            for k in dict.keys():
                if not k in keyList:
                    keyList.append(k)
        try:
            f= open(failureTableFilename,"w")
        except IOError:
            logMessageAndExit("Can't write failure table <%s>"%\
                              failureTableFilename)
        writeCSV(f, keyList, failureList)
        f.close()
    else:
        try:
            os.remove(failureTableFilename)
        except OSError:
            pass # may fail because the file does not exist

def recursiveRm(targetDir):
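    """Recursively delete targetDir and everything under it, logging what
    gets removed.  (See the note at the top about testing this for safety.)"""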
    if os.access(targetDir,os.F_OK):
        logMessage("recursiveRm: killing <%s>\n"%targetDir)
        for root, dirs, files in os.walk(targetDir, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
        os.rmdir(targetDir)
    else:
        logMessage("recursiveRm: no <%s> to remove\n"%targetDir)


def workerRun():
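    """Body of one download worker thread: pull (product, productDict,
    triesSoFar) tuples off pendingDownloadQueue, fetch each fileset into a
    per-thread incoming directory, and requeue failures until
    maxFetchAttempts is exhausted, at which point the fileset is pushed
    onto failedDownloadQueue."""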
    logMessage("%s starts"%\
               (threading.currentThread().name))
    while gblKeepRunning:
        workTuple= pendingDownloadQueue.get()
        if workTuple:
            try:
                (product, productDict, triesSoFar)= workTuple
                logMessage("%s fetching %s %s"%\
                           (threading.currentThread().name,
                            product,productDict['filesetID']))
                incomingDir= os.path.join(mirrorRootDir,product,
                                          threading.currentThread().name,
                                          productDict['filesetID'])
                fetchResultsInfo= None
                try:
                    fetchResultsInfo= fetchSingleUpdate(product,productDict,
                                                        incomingDir)
                except OSError,e:
                    logMessage("File handling problem fetching %s %s: %s"%\
                               (product, productDict['filesetID'],e))
                    fetchResultsInfo= None
                except RuntimeError,e:
                    logMessage("Fetch of %s %s failed: %s"%\
                               (product, productDict['filesetID'],e))
                    fetchResultsInfo= None
                if fetchResultsInfo==None:
                    recursiveRm(incomingDir)
                    if triesSoFar<maxFetchAttempts:
                        pendingDownloadQueue.put((product, productDict,
                                                  triesSoFar+1))
                    else:
                        failedDownloadQueue.put((product,productDict,
                                                 triesSoFar+1))
                        logMessage("%s failed fetching %s %s"%\
                                   (threading.currentThread().name,
                                    product,productDict['filesetID']))
            finally:
                pendingDownloadQueue.task_done()
        else:
            time.sleep(0.1)

def fetchUpdates(product, productStatusTable):
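    """Fetch the index for one product, cull it to entries newer than the
    fileset recorded in productStatusTable (honoring --last and any
    --select_... column filters), and queue the survivors for download."""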
    global lastFilesetId, indexColumnFilterDict
    logMessage( "starting fetchUpdates for product %s"%product )
    try:
        if productStatusTable.has_key(product):
            lines= readCmdOutputToList("wget -nv -a %s -O - %s/%s/index.txt?%s"%\
                                       (logFileName,topURI, product,
                                        productStatusTable[product]['filesetID']))
        else:
            lines= readCmdOutputToList("wget -nv -a %s -O - %s/%s/index.txt"%\
                                       (logFileName,topURI,product))
    except:
        logMessage( "Unable to fetch the product index for %s; skipping it"%\
                    product )
        return

    productIndex= parseIndexList(lines)
    culledProductIndex= []
    cut= None
    if productStatusTable.has_key(product):
        cut= productStatusTable[product]['time registered']
        for l in productIndex:
            if l['time registered']>cut:
                culledProductIndex.append(l)
            if lastFilesetId!=None and l['filesetID']==lastFilesetId:
                break
    else:
        if lastFilesetId!=None:
            for l in productIndex:
                culledProductIndex.append(l)
                if l['filesetID']==lastFilesetId:
                    break
        else:
            culledProductIndex= productIndex

    if len(indexColumnFilterDict.keys())!=0:
        productIndex= culledProductIndex
        culledProductIndex= []
        for l in productIndex:
            ok= True
            for k,compRegex in indexColumnFilterDict.items():
                if l.has_key(k) and not compRegex.match(l[k]):
                    ok= False
                    break
            if ok:
                culledProductIndex.append(l)

    if len(culledProductIndex)>0:
        for productDict in culledProductIndex:
            pendingDownloadQueue.put((product, productDict,0))

        # Any that fail to download will be noted separately as failures
        productStatusTable[product]= culledProductIndex[-1]

def describeSelf():
    print """
usage: %s [-v][-d][--groupwrite][--mirrorroot RootDir]
          [--statusfile Filename][--failurefile Filename]
          [--pipecrash Filename][--logfile Filename]
          [--last LastFilesetId][--select_fsid Regex]
          [--select_targetid Regex][--select_stage Regex]
          [--select_fstag Regex][--select_label Regex]
          [--select_filter Regex][--select_distgroup Regex]
          [--select_datagroup Regex][--select_tarball Regex]

where:
  -v requests verbose output
  -d requests debugging output
  --groupwrite requests that the imported files be made
    group writable.  The algorithm isn't terribly efficient
    but can be convenient.
  --mirrorroot specifies the directory to store the incoming
    distribution (default %s)
  --statusfile specifies the name of the .csv file storing
    information about the last downloaded fileset in each
    distribution (default %s)
  --failurefile specifies the name of the .csv file storing
    information about filesets for which downloading has failed
    (default %s)
  --pipecrash specifies the name of the Python .pkl file in which
    the contents of the working download queue are to be stored
    in case of an interrupt (default %s)
  --logfile specifies the name of the log (default %s)
  --last specifies a filesetID at which to stop downloading
    (default %s)
  --select_... options can be used to specify a regular expression
    to check against the corresponding column of the datastore
    index.  The options correspond to the following columns:

      ..._fsid       filesetID
      ..._targetid   target_id
      ..._stage      stage
      ..._fstag      fs_tag
      ..._filter     filter
      ..._label      label or dist_group or data_group
      ..._distgroup  dist_group or label or data_group
      ..._datagroup  data_group or dist_group or label
      ..._tarball    match against .tgz files before downloading

    Any index entry for which a regex is specified must match that
    regex to be downloaded.  The options which come in multiple
    versions arise because of a backward compatibility issue.

    The regex for tarballs is a little different, in that matches
    are checked at the stage of downloading the individual files
    of a fileset.  The condition applies only to .tgz files, not
    text or metadata.  This is useful for selecting a specific
    skycell, for example.

NOTE: if you 'kill' this script (with SIGTERM, the default) it will
attempt to clean up and take good notes before it dies.  This can
take a while as the worker threads finish downloading.  If you must
kill it immediately, use 'kill -9'.
""" % (os.path.basename(sys.argv[0]),
       mirrorRootDir, productStatusTableFilename, failureTableFilename,
       leftoverWorkFilename, logFileName, lastFilesetId)

def main():
    global verbose, debug, groupWriteFlag, gblKeepRunning
    global mirrorRootDir, productStatusTableFilename, logFileName
    global failureTableFilename, leftoverWorkFilename, lastFilesetId
    global pendingDownloadQueue, failedDownloadQueue
    global totalKBytesDownloaded, lockTotalKBytesDownloaded
    global indexColumnFilterDict, selectTarballRegex

    try:
        (opts,pargs) = getopt.getopt(sys.argv[1:],"vdh",["groupwrite",
                                                         "mirrorroot=",
                                                         "statusfile=",
                                                         "failurefile=",
                                                         "logfile=",
                                                         "pipecrash=",
                                                         "last=","help",
                                                         "select_fsid=",
                                                         "select_targetid=",
                                                         "select_stage=",
                                                         "select_fstag=",
                                                         "select_label=",
                                                         "select_distgroup=",
                                                         "select_filter=",
                                                         "select_datagroup=",
                                                         "select_tarball="])
    except:
        sys.exit("%s: Invalid command line parameter" % sys.argv[0])

    # Parse args
    for a,b in opts:
        if a=="-h" or a=="--help":
            describeSelf()
            sys.exit(0)
        if a=="-v":
            verbose= 1
        if a=="-d":
            debug= 1
        if a=="--groupwrite":
            groupWriteFlag= 1
        if a=="--mirrorroot":
            mirrorRootDir= b
        if a=="--statusfile":
            productStatusTableFilename= b
        if a=="--failurefile":
            failureTableFilename= b
        if a=="--pipecrash":
            leftoverWorkFilename= b
        if a=="--logfile":
            logFileName= b
        if a=="--last":
            lastFilesetId= b
        if a=="--select_fsid":
            indexColumnFilterDict["filesetID"]= b
        if a=="--select_targetid":
            indexColumnFilterDict["target_id"]= b
        if a=="--select_stage":
            indexColumnFilterDict["stage"]= b
        if a=="--select_fstag":
            indexColumnFilterDict["fs_tag"]= b
        if a=="--select_label" \
           or a=="--select_distgroup" \
           or a=="--select_datagroup":
            indexColumnFilterDict["label"]= b
            indexColumnFilterDict["dist_group"]= b
            indexColumnFilterDict["data_group"]= b
        if a=="--select_filter":
            indexColumnFilterDict["filter"]= b
        if a=="--select_tarball":
            selectTarballRegex= b

    if len(pargs)>0:
        describeSelf()
        sys.exit(-1)

    logMessage("####### Starting #######")

    # Check any regular expressions provided
    regex= None
    try:
        for k,regex in indexColumnFilterDict.items():
            indexColumnFilterDict[k]= re.compile(regex)
        if selectTarballRegex is not None:
            regex= selectTarballRegex # to get error message correct
            selectTarballRegex= re.compile(regex)
    except Exception,e:
        sys.exit("<%s> is not a valid regular expression: %s"%(regex,str(e)))

    productStatusTable= getProductStatusTable()
    failureList= getFailureList()
    if debug: print failureList

    # This will allow us to exit in an orderly way if someone
    # kills the process.
    signal.signal(signal.SIGTERM, handler)

    pendingDownloadQueue= Queue.Queue()
    failedDownloadQueue= Queue.Queue()
    lockTotalKBytesDownloaded= threading.Lock()
    gblKeepRunning= True
    threadsThatAreNotWorkers= threading.activeCount()
    try:
        for i in range(numDownloadThreads):
            t= threading.Thread(target=workerRun)
            t.setDaemon(True)
            t.start()

        # Try to get any downloads that we failed to get last time
        for failDict in failureList:
            pendingDownloadQueue.put((failDict['productID'],failDict,0))

        # Generate the top-level index
        topIndexLines= readCmdOutputToList("wget -a %s -nv -O - %s/index.txt"%
                                           (logFileName,topURI))
        topIndex= parseIndexList(topIndexLines)
        for dist in hiddenProductList:
            topIndex.append({'productID':dist,'type':'dump'})
        productTable= {}
        for dict in topIndex:
            productTable[dict['productID']]= dict

        for product in desiredProductList:
            if productTable.has_key(product):
                fetchUpdates(product, productStatusTable)

        # Queue.join() seems to be immune to interrupts.
        while pendingDownloadQueue.qsize()>0:
            time.sleep(0.1)
    except IOError,e:
        logMessage("IOError or signal in main: %s"%e)
        gblKeepRunning= False # bail on processing

        # Stall while the workers finish their tasks and exit
        while threading.activeCount()>threadsThatAreNotWorkers:
            time.sleep(0.1)

        # Freeze a copy of the elements left in the work queue-
        # this also empties the queue so queue.join() works below
        remainingPipeWork= []
        while not pendingDownloadQueue.empty():
            remainingPipeWork.append( pendingDownloadQueue.get() )
            pendingDownloadQueue.task_done()

        if len(remainingPipeWork)>0:
            pklOut= open(leftoverWorkFilename,"w")
            cPickle.dump(remainingPipeWork, pklOut)
            pklOut.close()

    finally:
        pendingDownloadQueue.join()

    gblKeepRunning= False # might as well cause workers to terminate

    # Save update state and failure list
    writeProductStatusTable(productStatusTable)
    failureList= []
    while not failedDownloadQueue.empty():
        prod, dict, nTries= failedDownloadQueue.get()
        dict['productID']= prod
        failureList.append(dict)
        failedDownloadQueue.task_done()
    writeFailureList(failureList)

    # And a final message on the way out
    logMessage("Exiting; downloaded %d KBytes since startup"%\
               totalKBytesDownloaded)

############
# Main hook
############

if __name__=="__main__":
    main()