Changeset 31348
- Timestamp:
- Apr 22, 2011, 1:17:20 PM (15 years ago)
- File:
-
- 1 edited
-
trunk/ippToPsps/jython/batch.py (modified) (22 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/ippToPsps/jython/batch.py
r31322 r31348 12 12 13 13 from datastore import Datastore 14 from scratchdb import ScratchDb 14 15 from gpc1db import Gpc1Db 15 16 from ipptopspsdb import IppToPspsDb … … 23 24 ''' 24 25 class Batch(object): 25 26 driverName="com.mysql.jdbc.Driver"27 26 28 27 ''' … … 45 44 self.survey = survey 46 45 46 # TODO 47 self.tablesToExport = [] 48 47 49 # open config 48 50 doc = ElementTree(file="config.xml") 49 51 50 # set up JDBC connection to local Db51 dbName = doc.find("localdatabase/name").text52 dbHost = doc.find("localdatabase/host").text53 dbUser = doc.find("localdatabase/user").text54 dbPass = doc.find("localdatabase/password").text55 self.localUrl = "jdbc:mysql://"+dbHost+"/"+dbName+"?user="+dbUser+"&password="+dbPass56 self.localCon = DriverManager.getConnection(self.localUrl)57 self.localStmt = self.localCon.createStatement()58 59 52 # create Gpc1Db object 60 53 self.gpc1Db = Gpc1Db(self.logger) 54 self.ippToPspsDb = IppToPspsDb(logger) 55 self.scratchDb = ScratchDb(logger) 61 56 62 57 if self.survey != "": 63 64 # get survey ID from init table 65 sql = "SELECT surveyID FROM Survey WHERE name = '" + self.survey + "'" 66 try: 67 rs = self.localStmt.executeQuery(sql) 68 rs.first() 69 self.surveyID = rs.getInt(1) 70 except: 71 self.logger.exception("No survey ID found for this survey: '" + self.survey + "'") 72 self.surveyID = -1; 58 self.surveyID = self.scratchDb.getSurveyID(self.survey) 73 59 74 60 # get dvo info from config 75 dvoName = doc.find("dvo_" +survey+"/name").text76 self.dvoLocation = doc.find("dvo_" +survey+"/location").text61 dvoName = doc.find("dvo_" + self.survey + "/name").text 62 self.dvoLocation = doc.find("dvo_" + self.survey + "/location").text 77 63 else: 78 64 dvoName = "" … … 83 69 self.datastore = Datastore(self.logger) 84 70 85 # create IppToPspsDb object and create a new batch 86 self.ippToPspsDb = IppToPspsDb(logger) 87 self.batchID = self.ippToPspsDb.createNewBatch(66, 88 survey, 71 # create a new batch 72 self.batchID = self.ippToPspsDb.createNewBatch( 89 73 self.getPspsBatchType(), 74 survey, 90 75 dvoName, 91 76 self.datastore.product) … … 97 82 if not os.path.exists(self.localOutPath): os.makedirs(self.localOutPath) 98 83 99 100 84 # store today's date 101 85 now = datetime.datetime.now(); … … 109 93 110 94 # create DVO table 111 self.createDvoTables() 112 113 114 ''' 115 Gets photcode (aka photoCalID from dvo table) 116 ''' 117 def getPhotoCalID(self): 118 119 photcode = -1 120 121 sql = "SELECT photcode FROM dvoMeta" 122 try: 123 rs = self.localStmt.executeQuery(sql) 124 rs.first() 125 photcode = rs.getInt(1) 126 except: 127 self.logger.exception("Unable to get photcode from dvo table") 128 129 130 return photcode 95 self.scratchDb.createDvoTables() 131 96 132 97 ''' … … 136 101 137 102 self.logger.debug("Batch destructor") 138 self.localStmt.close() 139 self.localCon.close() 103 104 105 ''' 106 Returns the value from this dictinary or else NULL 107 ''' 108 def safeDictionaryAccess(self, header, key): 109 110 if key in header: return header[key] 111 else: return "NULL" 140 112 141 113 ''' 142 114 Finds and reads a header extension 143 115 ''' 144 def findAndReadFITSHeader(self, name): 145 146 self.logger.info("Searching for header extension: '" + name + "'...") 147 148 file = open(self.inputFitsPath, 'r') 149 150 index = 0 116 def findAndReadFITSHeader(self, name, file): 117 151 118 found = False 119 152 120 while True: 153 154 file.seek(index, 0)121 122 index = file.tell() 155 123 156 124 record = file.read(80) 157 125 if not record: break; 158 159 header = {}160 126 161 127 # quit when we reach 'END' … … 165 131 if header['EXTNAME'] == name: 166 132 found = True 167 break 133 file.seek(index + 2880, 0) 134 break 135 136 file.seek(index + 2880, 0) 168 137 169 index = index + 2880170 171 138 if found != True: self.logger.error("...could not find extension '" + name + "'") 172 139 else: self.logger.info("...read header at '" + name + "' and found " + str(len(header)) + " header cards") 173 140 174 # TODO close file?175 141 return header 176 142 … … 257 223 def publishToDatastore(self): 258 224 259 self.datastore.publish(self.batchName, self.subDir, self.tarballFile, "tgz")260 # TODO update ippToPsps Db here225 if self.datastore.publish(self.batchName, self.subDir, self.tarballFile, "tgz"): 226 self.ippToPspsDb.updateLoadedToDatastore(self.batchID, 1) 261 227 262 228 ''' … … 264 230 ''' 265 231 def getBatchFriendlySurveyType(self): 232 233 return "SCR" # TODO 266 234 267 235 try: … … 286 254 else: self.logger.error("Don't know this batch type: " + self.survey) 287 255 288 289 ''' 290 Prints a log message with the current time 291 ''' 292 def log(self, msg): 293 294 print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " | " + msg 295 296 ''' 297 Sets min and max obj ID using the provided table 298 ''' 299 def setMinMaxObjID(self, table): 300 301 sql = "SELECT MIN(objID), MAX(objID) FROM " + table 302 print 303 rs = self.localStmt.executeQuery(sql) 304 rs.first() 305 self.minObjID = rs.getLong(1) 306 self.maxObjID = rs.getLong(2) 307 308 ''' 309 Drops a table 310 ''' 311 def dropTable(self, table): 312 313 sql = "DROP TABLE " + table 314 try: self.localStmt.execute(sql) 315 except: return 316 317 ''' 318 Updates a table with surveyID 319 ''' 320 def updateSurveyID(self, table): 321 322 sql = "UPDATE " + table + " SET surveyID=%d" % self.surveyID 323 self.localStmt.execute(sql) 324 325 ''' 326 Updates a table with filterID grabbed from Filter init table 327 ''' 328 def updateFilterID(self, table): 329 330 sql = "UPDATE "+table+" AS a, Filter AS b SET a.filterID=b.filterID WHERE b.filterType = '" + self.filter + "'" 331 self.localStmt.execute(sql) 256 ''' 257 Sets min and max obj ID using the provided table, or list of tables 258 ''' 259 def setMinMaxObjID(self, tables): 260 261 first = True 262 for table in tables: 263 264 sql = "SELECT MIN(objID), MAX(objID) FROM " + table 265 rs = self.scratchDb.stmt.executeQuery(sql) 266 rs.first() 267 268 if first: 269 self.minObjID = rs.getLong(1) 270 self.maxObjID = rs.getLong(2) 271 else: 272 if rs.getLong(1) < self.minObjID: self.minObjID = rs.getLong(1) 273 if rs.getLong(2) > self.maxObjID: self.maxObjID = rs.getLong(2) 274 275 first = False 276 277 self.ippToPspsDb.updateMinMaxObjID(self.batchID, self.minObjID, self.maxObjID) 332 278 333 279 ''' … … 351 297 param = match.group(2) 352 298 value = match.group(3).strip() 299 if value == "NaN": value = "NULL" 353 300 header[param] = value 354 301 … … 365 312 for table in self.pspsTables: 366 313 self.logger.info("Creating PSPS table: " + table.name) 367 table.write(self.localUrl + '#' + table.name) 314 table.write(self.scratchDb.url + '#' + table.name) 315 self.tablesToExport.append(table.name) 368 316 369 317 self.indexPspsTables() … … 375 323 self.logger.warn("indexIppTables not implemented") 376 324 377 378 '''379 Adds an index to the supplied table and column380 '''381 def createIndex(self, table, column):382 383 self.logger.info("Creating index on column '"+column+"' for table '"+table+"'")384 385 sql = "CREATE INDEX "+table+"_index ON "+table+" ("+column+")"386 try:387 self.localStmt.execute(sql)388 except: pass389 #self.logger.warn("Index already in place on '" + column + "' for table '" + table + "'")390 391 392 325 ''' 393 326 Subclass should implement this to index PSPS tables … … 420 353 421 354 try: 422 table.write(self. localUrl + '#' + table.name)355 table.write(self.scratchDb.url + '#' + table.name) 423 356 except: 424 357 self.logger.exception(" Problem writing table '" + table.name + "' to the database") 425 426 358 count = count + 1 427 359 … … 439 371 440 372 self.logger.info(" Selecting database tables") 441 for table in self. pspsTables:373 for table in self.tablesToExport: 442 374 443 375 # check for an empty table 444 if self. getRowCount(table.name) < 1: continue376 if self.scratchDb.getRowCount(table) < 1: continue 445 377 446 378 # get everything from table 447 _table = stilts.tread(self. localUrl + '#SELECT * FROM ' + table.name)448 449 self.logger.info(" Replacing NULLs with weird PSPS -999 constant for " + table .name)379 _table = stilts.tread(self.scratchDb.url + '#SELECT * FROM ' + table) 380 381 self.logger.info(" Replacing NULLs with weird PSPS -999 constant for " + table) 450 382 451 383 # replace nulls and empty fields with weird PSPS -999 pseudo-null … … 453 385 454 386 # change table names 455 _table = stilts.tpipe(_table, cmd='tablename ' + table .name)387 _table = stilts.tpipe(_table, cmd='tablename ' + table) 456 388 _tables.append(_table) 457 389 … … 459 391 stilts.twrites(_tables, self.outputFitsPath, fmt='fits') 460 392 self.logger.info(" ...done") 461 462 463 ''' 464 Returns a list of column names for this table 465 ''' 466 def getColumnNames(self, tableName): 467 468 sql = "SHOW COLUMNS FROM " + tableName 469 rs = self.localStmt.executeQuery(sql) 470 columns = [] 471 while (rs.next()): columns.append(rs.getString(1)) 472 rs.close() 473 474 return columns 475 476 ''' 477 Replaces all NULL values in the provided table with the prvoded substitute 478 ''' 479 def replaceNulls(self, tableName, sub): 480 481 # get list of columns 482 columns = self.getColumnNames(tableName) 483 484 # now loop through all columns and replace all NULLs with sub 485 for column in columns: 486 487 sql = "UPDATE " + tableName + " SET " + column + " = " + sub + " WHERE " + column + " IS NULL" 488 self.localStmt.execute(sql) 489 490 491 ''' 492 Searches a table and reports the columns that are either partially or completely populated with NULLs 493 ''' 494 def reportNulls(self, tableName, showPartials): 495 496 # first, count rows 497 sql = "SELECT COUNT(*) FROM " + tableName 498 rs = self.localStmt.executeQuery(sql) 499 rs.first() 500 numRows = rs.getInt(1) 501 502 # get list of columns 503 columns = self.getColumnNames(tableName) 504 505 print "+----------------------+---------------+" 506 print "| %25s |" % tableName 507 print "+----------------------+---------------+" 508 509 # now see which columns are full of NULLS, with are partially NULL 510 for column in columns: 511 512 sql = "SELECT COUNT(*) FROM " + tableName + " WHERE " + column + " IS NULL" 513 rs = self.localStmt.executeQuery(sql) 514 rs.first() 515 if rs.getInt(1) == numRows: 516 print "| %20s | all NULL |" % column 517 elif showPartials and rs.getInt(1) > 0: 518 print "| %20s | partial NULL |" % column 519 rs.close() 520 print "+----------------------+---------------+" 521 393 self.ippToPspsDb.updateProcessed(self.batchID, 1) 522 394 523 395 ''' … … 527 399 528 400 for table in self.pspsTables: 529 self. reportNulls(table.name, showPartials)401 self.scratchDb.reportNulls(table.name, showPartials) 530 402 531 403 ''' … … 536 408 self.logger.info("Replacing all NULL values in PSPS tables with '" + sub + "'...") 537 409 for table in self.pspsTables: 538 self. replaceNulls(table.name, sub)410 self.scratchDb.replaceNulls(table.name, sub) 539 411 self.logger.info("...done") 540 412 … … 546 418 547 419 ''' 548 Updates provided table with DVO IDs from DVO table549 '''550 def updateDvoIDs(self, table):551 552 self.logger.info("Not implemented in base-class")553 554 '''555 Creates a table for for ID matching556 '''557 def createDvoTables(self):558 559 self.logger.info("Creating DVO meta and detection tables")560 sql = "DROP TABLE dvoMeta"561 try: self.localStmt.execute(sql)562 except: pass563 564 sql = "DROP TABLE dvoDetection"565 try: self.localStmt.execute(sql)566 except: pass567 568 sql = "CREATE TABLE dvoMeta ( \569 flags INT, \570 photcode INT \571 )"572 573 try: self.localStmt.execute(sql)574 except:575 self.logger.error("Unable to create DVO meta-data database tablei")576 577 sql = "CREATE TABLE dvoDetection ( \578 ippDetectID BIGINT PRIMARY KEY, \579 detectID BIGINT, \580 ippObjID BIGINT, \581 objID BIGINT \582 )"583 584 try: self.localStmt.execute(sql)585 except:586 self.logger.error("Unable to create DVO detection database table")587 588 '''589 Returns a row count for this table590 '''591 def getRowCount(self, table):592 593 sql = "SELECT COUNT(*) FROM " + table594 try:595 rs = self.localStmt.executeQuery(sql)596 rs.first()597 return rs.getInt(1)598 except:599 self.logger.exception("Could not count rows for table: '" + table + "'")600 return -1601 602 603 '''604 420 Calls DVO program to 'query' DVO database and populate results to local MySQL Db table 605 421 ''' 606 def getIDsFromDVO(self , sourceID, imageID):422 def getIDsFromDVO(self): 607 423 608 424 # TODO path to DVO prog hardcoded temporarily 609 cmd = "../src/dvo %s %s %s" % (self.dvoLocation, sourceID, imageID)425 cmd = "../src/dvograbber " + self.dvoLocation 610 426 self.logger.info("Running: '" + cmd + "'...") 611 427 p = Popen(cmd, shell=True, stdout=PIPE) … … 614 430 self.logger.info("...done") 615 431 616 if self. getRowCount("dvoDetection") < 1:432 if self.scratchDb.getRowCount("dvoDetection") < 1: 617 433 self.logger.error("No DVO IDs found") 618 434 return False
Note:
See TracChangeset
for help on using the changeset viewer.
