- Timestamp:
- May 5, 2011, 10:05:10 AM (15 years ago)
- Files:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/eam_branches/ipp-20110404/ippToPsps/jython/batch.py
r31117 r31439 4 4 import datetime 5 5 import re 6 import sys 7 import os 8 import md5 9 import shutil 10 import logging 11 from subprocess import call, PIPE, Popen 12 13 from datastore import Datastore 14 from scratchdb import ScratchDb 15 from gpc1db import Gpc1Db 16 from ipptopspsdb import IppToPspsDb 6 17 7 18 from java.lang import * 8 19 from java.sql import * 9 10 20 from xml.etree.ElementTree import ElementTree, Element, tostring 21 22 ''' 23 Base class of all batch types. 24 ''' 11 25 class Batch(object): 12 26 13 driverName="com.mysql.jdbc.Driver"14 15 27 ''' 16 28 Constructor 17 ''' 18 def __init__(self, batchType, inputFitsPath, outputFitsPath, dbHost, dbName, dbUser, dbPass, survey=""): 29 30 >>> batch = Batch(1,2,3,4,5,6,7) 31 >>> print batch.pspsVoTableFilePath 32 "../config/2/tables.vot" 33 ''' 34 def __init__(self, logger, batchType, inputFitsPath="", survey="", useFullTables=False): 35 36 # set up logging 37 self.logger = logger 38 self.logger.info("-------------------------------------------------------------------------------") 39 self.logger.debug("Batch class constructor") 19 40 20 41 # set up class variables 42 self.batchType = batchType; 21 43 self.pspsVoTableFilePath = "../config/" + batchType + "/tables.vot" 22 44 self.inputFitsPath = inputFitsPath 23 self.outputFitsPath = outputFitsPath24 self.dbHost = dbHost25 self.dbName = dbName26 self.dbUser = dbUser27 self.dbPass = dbPass28 45 self.survey = survey 29 30 # set up JDBC connection 31 self.url = "jdbc:mysql://"+self.dbHost+"/"+self.dbName+"?user="+self.dbUser+"&password="+self.dbPass 32 self.con = DriverManager.getConnection(self.url) 33 self.stmt = self.con.createStatement() 34 35 # get survey ID from init table 36 sql = "SELECT surveyID from Survey WHERE name = '" + survey + "'" 37 try: 38 rs = self.stmt.executeQuery(sql) 39 rs.first() 40 self.surveyID = rs.getInt(1) 41 except: 42 self.log("No survey ID found for this survey: '" + survey + "'") 46 self.useFullTables = useFullTables 47 48 
# TODO 49 self.tablesToExport = [] 50 51 # open config 52 doc = ElementTree(file="config.xml") 53 54 # create Gpc1Db object 55 self.gpc1Db = Gpc1Db(self.logger) 56 self.ippToPspsDb = IppToPspsDb(logger) 57 self.scratchDb = ScratchDb(logger, self.useFullTables) 58 59 if self.survey != "": 60 self.surveyID = self.scratchDb.getSurveyID(self.survey) 61 62 # get dvo info from config 63 dvoName = doc.find("dvo_" + self.survey + "/name").text 64 self.dvoLocation = doc.find("dvo_" + self.survey + "/location").text 65 else: 66 dvoName = "" 67 self.dvoLocation = "" 43 68 self.surveyID = -1; 44 69 70 # get datastore info from config 71 self.datastore = Datastore(self.logger) 72 73 # create a new batch 74 self.batchID = self.ippToPspsDb.createNewBatch( 75 self.getPspsBatchType(), 76 survey, 77 dvoName, 78 self.datastore.product) 79 80 # get local storage location from config 81 self.batchName = "B%08d" % self.batchID 82 self.subDir = doc.find("localOutPath").text + "/" + self.getPspsBatchType() + "/" + dvoName 83 self.localOutPath = self.subDir + "/" + self.batchName 84 if not os.path.exists(self.localOutPath): os.makedirs(self.localOutPath) 85 45 86 # store today's date 46 87 now = datetime.datetime.now(); … … 48 89 49 90 if self.inputFitsPath != "": 50 self.parseFitsHeader() 91 file = open(self.inputFitsPath) 92 self.header = self.parseFitsHeader(file) 93 self.logger.info("Read primary and found " + str(len(self.header)) + " header cards") 94 # TODO close file? 
95 96 # create DVO tables if accessing DVO directly 97 if not self.useFullTables: self.scratchDb.createDvoTables() 51 98 52 99 ''' … … 55 102 def __del__(self): 56 103 57 self.log("Batch destructor") 58 self.stmt.close() 59 self.con.close() 60 61 ''' 62 Prints a log message with the current time 63 ''' 64 def log(self, msg): 65 66 print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " | " + msg 67 68 ''' 69 Updates a table with surveyID 70 ''' 71 def updateSurveyID(self, table): 72 73 sql = "UPDATE " + table + " SET surveyID=%d" % self.surveyID 74 self.stmt.execute(sql) 75 76 ''' 77 Updates a table with filterID grabbed from Filter init table 78 ''' 79 def updateFilterID(self, table): 80 81 sql = "UPDATE "+table+" AS a, Filter AS b SET a.filterID=b.filterID WHERE b.filterType = '" + self.filter + "'" 82 self.stmt.execute(sql) 104 self.logger.debug("Batch destructor") 105 106 107 ''' 108 Returns the value from this dictinary or else NULL 109 ''' 110 def safeDictionaryAccess(self, header, key): 111 112 if key in header: return header[key] 113 else: return "NULL" 114 115 ''' 116 Finds and reads a header extension 117 ''' 118 def findAndReadFITSHeader(self, name, file): 119 120 found = False 121 122 while True: 123 124 index = file.tell() 125 126 record = file.read(80) 127 if not record: break; 128 129 # quit when we reach 'END' 130 if record.startswith("XTENSION= 'IMAGE"): 131 132 header = self.parseFitsHeader(file) 133 if header['EXTNAME'] == name: 134 found = True 135 file.seek(index + 2880, 0) 136 break 137 138 file.seek(index + 2880, 0) 139 140 if found != True: self.logger.error("...could not find extension '" + name + "'") 141 else: self.logger.info("...read header at '" + name + "' and found " + str(len(header)) + " header cards") 142 143 return header 144 145 146 ''' 147 Writes the batch manifest file 148 ''' 149 def writeBatchManifest(self): 150 151 outPath = self.localOutPath + "/BatchManifest.xml" 152 tmpPath = "./tmp.xml" 153 
self.logger.info("Creating batch manifest file here: " + outPath) 154 root = Element('manifest') 155 156 # batch information 157 root.attrib['name'] = self.batchName 158 root.attrib['type'] = self.getPspsBatchType() 159 root.attrib['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 160 if self.survey != "": 161 root.attrib['survey'] = self.getBatchFriendlySurveyType() 162 try: self.minObjID 163 except: pass 164 else: root.attrib['minObjId'] = str(self.minObjID) 165 try: self.maxObjID 166 except: pass 167 else: root.attrib['maxObjId'] = str(self.maxObjID) 168 169 # get md5sum 170 p = Popen("md5sum " + self.outputFitsPath, shell=True, stdout=PIPE) 171 p.wait() 172 out = p.stdout.read() 173 md5sum = out[0:out.rfind(" ")] 174 175 # get file size 176 fileSize = os.path.getsize(self.outputFitsPath) 177 178 # file information 179 child = Element('file') 180 root.append(child) 181 child.attrib['name'] = self.outputFitsFile 182 child.attrib['bytes'] = str(fileSize) 183 child.attrib['md5'] = md5sum 184 185 # now create doc and write to file 186 file = open(tmpPath, 'w') 187 ElementTree(root).write(file) 188 file.close() 189 190 # clunky way to prettify XML 191 p = Popen("xmllint --format " + tmpPath + " > " + outPath, shell=True, stdout=PIPE) 192 p.wait() 193 os.remove(tmpPath) 194 195 196 ''' 197 tar and zips batch directory 198 ''' 199 def createTarball(self): 200 201 # set up filenams and paths 202 tarFile = self.batchName + ".tar" 203 tarPath = self.subDir + "/" + tarFile 204 205 self.tarballFile = tarFile + ".gz" 206 tarballPath = self.subDir + "/" + self.tarballFile 207 208 # tar directory 209 p = Popen("tar -cvf " + tarPath + "\ 210 -C " + self.subDir + " \ 211 " + self.batchName, shell=True, stdout=PIPE) 212 p.wait() 213 214 # zip tar archive 215 p = Popen("gzip -c " + tarPath + " > " + tarballPath, shell=True, stdout=PIPE) 216 p.wait() 217 218 # delete tar file and original directory 219 os.remove(tarPath) 220 shutil.rmtree(self.localOutPath) 221 
222 ''' 223 Publishes this batch to the datastore 224 ''' 225 def publishToDatastore(self): 226 227 if self.datastore.publish(self.batchName, self.subDir, self.tarballFile, "tgz"): 228 self.ippToPspsDb.updateLoadedToDatastore(self.batchID, 1) 229 230 ''' 231 Gets PSPS-friendly survey type 232 ''' 233 def getBatchFriendlySurveyType(self): 234 235 return "SCR" # TODO 236 237 try: 238 self.survey 239 except: 240 return "NA" 241 242 if self.survey == "3PI": return "3PI" 243 elif self.survey == "MD04": return "MD4" 244 else: 245 self.logger.error("Don't know this survey: '" + self.survey + "'") 246 return "NA" 247 248 ''' 249 Gets PSPS friendly batch type 250 ''' 251 def getPspsBatchType(self): 252 253 if self.batchType == "init": return "IN" 254 elif self.batchType == "detection": return "P2" 255 elif self.batchType == "stack": return "ST" 256 else: self.logger.error("Don't know this batch type: " + self.survey) 257 258 ''' 259 Sets min and max obj ID using the provided table, or list of tables 260 ''' 261 def setMinMaxObjID(self, tables): 262 263 first = True 264 for table in tables: 265 266 sql = "SELECT MIN(objID), MAX(objID) FROM " + table 267 rs = self.scratchDb.stmt.executeQuery(sql) 268 rs.first() 269 270 if first: 271 self.minObjID = rs.getLong(1) 272 self.maxObjID = rs.getLong(2) 273 else: 274 if rs.getLong(1) < self.minObjID: self.minObjID = rs.getLong(1) 275 if rs.getLong(2) > self.maxObjID: self.maxObjID = rs.getLong(2) 276 277 first = False 278 279 self.ippToPspsDb.updateMinMaxObjID(self.batchID, self.minObjID, self.maxObjID) 83 280 84 281 ''' 85 282 Reads FITS header and stores all fields in a dictionary object 86 283 ''' 87 def parseFitsHeader(self): 88 89 fitsFile = open(self.inputFitsPath) 90 91 self.header = {} 284 def parseFitsHeader(self, fitsFile): 285 286 header = {} 92 287 93 288 while (True): 289 94 290 record = fitsFile.read(80) 95 291 96 292 # quit when we reach 'END' 97 if record.startswith("END"): break 98 99 # ignore comments 100 if 
record.startswith("COMMENT"): continue 101 match = re.match('(.*)=(.*)', record) 293 if re.match('END\s+', record): break 294 295 # this regex will get param/value pairs for all header cards, ignoring comments and parsing out 'HIERACH' prefixes 296 match = re.match('^(HIERARCH )*([a-zA-Z0-9-_\.]+)\s*=\s+\'*([a-zA-Z0-9-_\.:\s@#]+)\'*\\/*', record) 102 297 if match: 103 298 104 # remove HIERARCH prefix 105 param = match.group(1).replace("HIERARCH", "") 106 param = param.strip() 107 108 value = match.group(2) 109 # remove trailing comment after / char, if there is one 110 index = value.find("/") 111 if index != -1: value = value[0:index] 112 113 # remove ' chars around content 114 value = value.replace("'", "") 115 116 # remove leading and trailing whitespace 117 value = value.strip() 118 119 # store in out dictionary object 120 self.header[param] = value 299 param = match.group(2) 300 value = match.group(3).strip() 301 if value == "NaN": value = "NULL" 302 header[param] = value 121 303 122 304 #print param + "|" + value + "|" 305 306 return header 123 307 124 308 ''' … … 129 313 self.pspsTables = stilts.treads(self.pspsVoTableFilePath) 130 314 for table in self.pspsTables: 131 self.log("Creating PSPS table: " + table.name) 132 table.write(self.url + '#' + table.name) 133 134 self.indexPspsTables() 315 self.logger.info("Creating PSPS table: " + table.name) 316 table.write(self.scratchDb.url + '#' + table.name) 317 self.tablesToExport.append(table.name) 318 319 self.alterPspsTables(); 135 320 136 321 ''' … … 138 323 ''' 139 324 def indexIppTables(self): 140 self.log("indexIppTables not implemented") 141 142 143 ''' 144 Adds an index to the supplied table and column 145 ''' 146 def createIndex(self, table, column): 147 148 self.log("Creating index on column '"+column+"' for table '"+table+"'") 149 150 sql = "CREATE INDEX "+table+"_index ON "+table+" ("+column+")" 151 try: 152 self.stmt.execute(sql) 153 except: 154 self.log("Index already in place on '" + column + "' for 
table '" + table + "'") 155 156 157 ''' 158 Subclass should implement this to index PSPS tables 325 self.logger.warn("indexIppTables not implemented") 326 327 328 ''' 329 Alter PSPS tables 159 330 ''' 160 def indexPspsTables(self):161 self.log ("indexPspsTables not implemented")331 def alterPspsTables(self): 332 self.logger.warn("alterPspsTables not implemented") 162 333 163 334 ''' 164 335 Imports IPP tables from FITS file 165 336 166 Accepts a regular expression filter so not all tabl s need to be imported337 Accepts a regular expression filter so not all tables need to be imported 167 338 ''' 168 339 def importIppTables(self, filter): 169 340 341 self.logger.info("Attempting to import tables from input FITS file") 170 342 tables = stilts.treads(self.inputFitsPath) 171 343 … … 175 347 match = re.match(filter, table.name) 176 348 if not match: continue 177 self.log ("Creating IPP table " + table.name)349 self.logger.info(" Reading IPP table " + table.name + " from FITS file") 178 350 table = stilts.tpipe(table, cmd='explodeall') 351 352 # drop any previous tables before import 353 self.scratchDb.dropTable(table.name) 354 355 # IPP FITS files are littered with infinities, so remove these 356 self.logger.info(" Removing Infinity values from all columns") 357 table = stilts.tpipe(table, cmd='replaceval -Infinity null *') 358 table = stilts.tpipe(table, cmd='replaceval Infinity null *') 359 179 360 try: 180 table.write(self. url + '#' + table.name)361 table.write(self.scratchDb.url + '#' + table.name) 181 362 except: 182 self.log("ERROR problem writing table '" + table.name + "' to the database") 183 363 self.logger.exception(" Problem writing table '" + table.name + "' to the database") 184 364 count = count + 1 185 365 186 self.log ("Imported %d tables from '%s' " % (count, self.inputFitsPath))366 self.logger.info("Done. 
Imported %d tables" % count) 187 367 188 368 self.indexIppTables() 189 369 190 370 ''' 191 Exports PSPS tables from the database to FITS format 371 Exports PSPS tables from the database to FITS format. Optional regex if you want to alter table names prior to export 192 372 ''' 193 def exportPspsTablesToFits(self ):194 195 self.log ("Exporting all PSPS tables to FITS")373 def exportPspsTablesToFits(self, regex="(.*)"): 374 375 self.logger.info("Replacing NULLs with -999 then exporting all PSPS tables to FITS") 196 376 _tables = [] 197 377 198 self.log(" Selecting database tables") 378 self.logger.info(" Selecting database tables") 379 for table in self.tablesToExport: 380 381 # check for an empty table 382 if self.scratchDb.getRowCount(table) < 1: continue 383 384 # get everything from table 385 _table = stilts.tread(self.scratchDb.url + '#SELECT * FROM ' + table) 386 387 # replace nulls and empty fields with weird PSPS -999 pseudo-null 388 _table = stilts.tpipe(_table, cmd='replaceval "" -999 *') 389 390 match = re.match(regex, table) 391 newTableName = match.group(1) 392 393 # change table names 394 _table = stilts.tpipe(_table, cmd='tablename ' + newTableName) 395 _tables.append(_table) 396 397 self.logger.info(" Writing to FITS file '" + self.outputFitsPath + "'...") 398 stilts.twrites(_tables, self.outputFitsPath, fmt='fits') 399 self.logger.info(" ...done") 400 self.ippToPspsDb.updateProcessed(self.batchID, 1) 401 402 ''' 403 Searches all PSPS tables and reports the columns that are either partially or completely populated with NULLs 404 ''' 405 def reportNullsInAllPspsTables(self, showPartials): 406 199 407 for table in self.pspsTables: 200 _table = stilts.tread(self.url + '#SELECT * FROM ' + table.name) 201 _table = stilts.tpipe(_table, cmd='tablename ' + table.name) 202 _tables.append(_table) 203 204 self.log(" Writing to FITS file " + self.outputFitsPath) 205 stilts.twrites(_tables, self.outputFitsPath, fmt='fits') 206 408 
self.scratchDb.reportNulls(table.name, showPartials) 409 410 ''' 411 Searches all PSPS tables and replaces all NULLs with the provided substitute 412 ''' 413 def replaceAllPspsNulls(self, sub): 414 415 self.logger.info("Replacing all NULL values in PSPS tables with '" + sub + "'...") 416 for table in self.pspsTables: 417 self.scratchDb.replaceNulls(table.name, sub) 418 self.logger.info("...done") 207 419 208 420 ''' … … 210 422 ''' 211 423 def populatePspsTables(self): 212 self.log("Not implemented yet") 213 424 self.logger.warn("Not implemented yet") 425 426 ''' 427 Calls DVO program to 'query' DVO database and populate results to local MySQL Db table 428 ''' 429 def getIDsFromDVO(self): 430 431 # TODO path to DVO prog hardcoded temporarily 432 cmd = "../src/dvograbber " + self.dvoLocation 433 self.logger.info("Running: '" + cmd + "'...") 434 p = Popen(cmd, shell=True, stdout=PIPE) 435 p.wait() 436 # out = p.stdout.read() 437 self.logger.info("...done") 438 439 if self.scratchDb.getRowCount("dvoDetection") < 1: 440 self.logger.error("No DVO IDs found") 441 return False 442 443 return True 444 445 ''' 446 Checks whether this batch has already been processed and published. To be implemented by all subclasses 447 ''' 448 def alreadyProcessed(self): 449 self.logger.info("Not implemented") 450 451 452
Note:
See TracChangeset for help on using the changeset viewer.
