- Timestamp:
- May 4, 2011, 3:20:38 PM (15 years ago)
- Location:
- branches/czw_branch/20110406
- Files:
-
- 2 edited
-
. (modified) (1 prop)
-
ippToPsps/jython/stackbatch.py (modified) (25 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/czw_branch/20110406
- Property svn:mergeinfo changed
-
branches/czw_branch/20110406/ippToPsps/jython/stackbatch.py
r31253 r31434 1 1 #!/usr/bin/env jython 2 3 import os.path 4 import sys 2 5 3 6 import stilts … … 9 12 import logging.config 10 13 14 ''' 15 StackBatch class 16 ''' 11 17 class StackBatch(Batch): 12 18 … … 14 20 Constructor 15 21 ''' 16 def __init__(self, logger, skyID, inputFile ):22 def __init__(self, logger, skyID, inputFile, stackType, useFullTables=False): 17 23 super(StackBatch, self).__init__( 18 24 logger, 19 25 "stack", 20 26 inputFile, 21 "MD04") # TODO 22 23 self.logger.info("StackBatch constructor. Creating batch from: ''" + inputFile + "'") 24 25 # obs time makes no sense except for nightly stacks 26 if self.header['STK_TYPE'] != "NIGHTLY_STACK": self.header['MJD-OBS'] = "-999" 27 28 # determine skycell from header value 29 self.skycell = self.header['SKYCELL'] 30 self.skycell = self.skycell[8:] 27 "MD04", 28 useFullTables) # TODO 29 30 self.logger.info("StackBatch constructor. Creating batch from: '" + inputFile + "'") 31 32 self.skyID = skyID 31 33 32 34 # get filterID using init table … … 34 36 self.filter = self.filter[0:1] 35 37 38 self.stackType = stackType 39 meta = self.gpc1Db.getStackStageMeta(self.skyID, self.header['FPA.FILTER']) 40 if len(meta) < 1: return 41 self.stackID = meta[0]; 42 self.skycell = meta[1]; 43 44 # determine skycell from header value 45 #self.skycell = "skycell.34" #= self.header['SKYCELL'] 46 self.skycell = self.skycell[8:] 47 48 self.logger.info("Processing stack with ID: %d, type: %s and skycell: %s filter: %s" % (self.stackID, self.stackType, self.skycell, self.filter)) 49 50 51 # delete PSPS tables 52 self.scratchDb.dropTable("StackMeta") 53 self.scratchDb.dropTable("StackDetection") 54 self.scratchDb.dropTable("StackModelFit") 55 self.scratchDb.dropTable("StackApFlx") 56 self.scratchDb.dropTable("StackToImage") 57 self.scratchDb.dropTable("SkinnyObject") 58 self.scratchDb.dropTable("ObjectCalColor") 59 60 # delete IPP tables 61 #self.scratchDb.dropTable("SkyChip_psf") 62 #self.scratchDb.dropTable("SkyChip_xsrc") 63 #self.scratchDb.dropTable("SkyChip_xfit") 64 #self.scratchDb.dropTable("SkyChip_xrad") 65 66 self.logger.info("Stack type: " + self.safeDictionaryAccess(self.header, self.stackType)) 67 # obs time makes no sense except for nightly stacks 68 #if self.header['STK_TYPE'] != "NIGHTLY_STACK": self.header['MJD-OBS'] = "-999" 69 36 70 # create an output filename, which is {filterID}{skycellID}.FITS 37 self.outputFitsFile = "%s%07d.FITS" % (self.filter, int(self.skycell)) ;38 self.outputFitsPath = "%s/%s" % (self.localOutPath, self.outputFitsFile) ;71 self.outputFitsFile = "%s%07d.FITS" % (self.filter, int(self.skycell)) 72 self.outputFitsPath = "%s/%s" % (self.localOutPath, self.outputFitsFile) 39 73 40 74 # set some constants 41 75 self.dataRelease = "1" 42 self.stackVer = "1"43 76 self.historyModNum = "0" 44 77 45 46 ''' 47 Updates a table with data release number 48 ''' 49 def updateDataRelease(self, table): 50 51 sql = "UPDATE " + table + " SET dataRelease=" + self.dataRelease 52 self.localStmt.execute(sql) 78 # insert what we know about this stack batch into the stack table 79 self.ippToPspsDb.insertStackMeta(self.batchID, self.skyID, self.stackID, self.filter, self.stackType) 80 81 # insert sourceID/imageID combo so DVO can look it up 82 if not self.useFullTables: 83 self.scratchDb.insertNewDvoImage(self.header['SOURCEID'], self.header['IMAGEID']) 53 84 54 85 ''' … … 57 88 def updateStackMetaID(self, table): 58 89 59 sql = "UPDATE " + table + " SET stackMetaID=" + s elf.header['STK_ID']60 self. localStmt.execute(sql)90 sql = "UPDATE " + table + " SET stackMetaID=" + str(self.stackID) 91 self.scratchDb.stmt.execute(sql) 61 92 62 93 ''' … … 65 96 def updateStackTypeID(self, table): 66 97 67 sql = "UPDATE "+table+" AS a, StackType AS b SET a.stackTypeID=b.stackTypeID WHERE b.name = '" +self.header['STK_TYPE']+"'"68 self. localStmt.execute(sql)98 sql = "UPDATE "+table+" AS a, StackType AS b SET a.stackTypeID=b.stackTypeID WHERE b.name = '" + self.stackType + "'" 99 self.scratchDb.stmt.execute(sql) 69 100 70 101 … … 117 148 WHERE a.ippDetectID=b.IPP_IDET AND b.PSF_FWHM "+psfCondition 118 149 119 self. localStmt.execute(sql)150 self.scratchDb.stmt.execute(sql) 120 151 121 152 ''' … … 165 196 WHERE a.ippDetectID=b.IPP_IDET AND b.MODEL_TYPE = '"+ippModelType+"'" 166 197 167 self. localStmt.execute(sql)198 self.scratchDb.stmt.execute(sql) 168 199 169 200 # sersic fit has an extra parameter … … 177 208 "+modelLong+"Covar68=b.EXT_COVAR_05_07, \ 178 209 "+modelLong+"Covar78=b.EXT_COVAR_06_07, \ 179 "+modelLong+"Covar88=b.EXT_COVAR_07_07 \ 210 "+modelLong+"Covar88=b.EXT_COVAR_07_07, \ 211 "+"serNu=b.EXT_PAR_07, \ 212 "+"serNuErr=SQRT(EXT_COVAR_07_07) \ 180 213 WHERE a.ippDetectID=b.IPP_IDET AND b.MODEL_TYPE = '"+ippModelType+"'" 181 214 182 self. localStmt.execute(sql)215 self.scratchDb.stmt.execute(sql) 183 216 184 217 … … 192 225 stackMetaID \ 193 226 ,skycellID \ 227 ,photoCalID \ 194 228 ,photoZero \ 195 ,nP2Images \ 229 ,expTime \ 230 ,psfModelID \ 196 231 ,ctype1 \ 197 232 ,ctype2 \ … … 207 242 ,pc002002 \ 208 243 ) VALUES ( \ 209 " + s elf.header['STK_ID']+ " \244 " + str(self.stackID) + " \ 210 245 ," + self.skycell + " \ 246 ," + str(self.scratchDb.getPhotoCalID(self.header['SOURCEID'], self.header['IMAGEID'])) + " \ 211 247 ," + self.header['FPA.ZP'] + " \ 212 ," + self.header['NINPUTS'] + " \ 248 ," + self.header['EXPTIME'] + " \ 249 ,'" + self.safeDictionaryAccess(self.header, 'PSFMODEL') + "' \ 213 250 ,'" + self.header['CTYPE1'] + "' \ 214 251 ,'" + self.header['CTYPE2'] + "' \ … … 224 261 ," + self.header['PC002002'] + " \ 225 262 )" 226 self.localStmt.execute(sql) 227 228 self.updateSurveyID("StackMeta") 229 self.updateFilterID("StackMeta") 263 self.scratchDb.stmt.execute(sql) 264 265 self.scratchDb.updateAllRows("StackMeta", "surveyID", str(self.surveyID)) 266 self.scratchDb.updateFilterID("StackMeta", self.filter) 267 self.scratchDb.updateAllRows("StackMeta", "dataRelease", str(self.dataRelease)) 230 268 self.updateStackTypeID("StackMeta") 231 self.updateDataRelease("StackMeta")232 269 233 270 ''' … … 241 278 ippDetectID \ 242 279 ,skyCellID \ 243 ,stackVer \244 280 ,obsTime \ 245 281 ,xPos \ … … 258 294 ,infoFlag \ 259 295 ,psfCf \ 296 ,momentXX \ 297 ,momentXY \ 298 ,momentYY \ 299 ,momentM3C \ 300 ,momentM3S \ 301 ,momentM4C \ 302 ,momentM4S \ 303 ,momentR1 \ 304 ,momentRH \ 305 ,apMag \ 306 ,apMagErr \ 307 ,kronFlux \ 308 ,kronFluxErr \ 309 ,kronRad \ 310 ,kronRadErr \ 260 311 ,nFrames \ 261 312 ,assocDate \ … … 265 316 IPP_IDET \ 266 317 ," + self.skycell + " \ 267 ," + self.stackVer + " \268 318 ," + self.header['MJD-OBS'] + " \ 269 319 ,X_PSF \ … … 271 321 ,X_PSF_SIG \ 272 322 ,Y_PSF_SIG \ 273 ,P SF_INST_FLUX\274 , PSF_INST_FLUX_SIG\323 ,POW(10.0, (-0.4*PSF_INST_MAG)) / "+self.header['EXPTIME']+" \ 324 ,ABS((PSF_INST_MAG_SIG*(POW(10.0, (-0.4*PSF_INST_MAG)) / "+self.header['EXPTIME']+")) / 1.085736) \ 275 325 ,POW(10.0, (-0.4*PEAK_FLUX_AS_MAG)) / "+self.header['EXPTIME']+" \ 276 326 ,SKY \ … … 282 332 ,FLAGS << 32 | FLAGS2 \ 283 333 ,PSF_QF \ 334 ,MOMENTS_XX \ 335 ,MOMENTS_XY \ 336 ,MOMENTS_YY \ 337 ,MOMENTS_M3C \ 338 ,MOMENTS_M3S \ 339 ,MOMENTS_M4C \ 340 ,MOMENTS_M4S \ 341 ,MOMENTS_R1 \ 342 ,MOMENTS_RH \ 343 ,AP_MAG \ 344 , NULL \ 345 ,KRON_FLUX \ 346 ,KRON_FLUX_ERR \ 347 , NULL \ 348 , NULL \ 284 349 ,N_FRAMES \ 285 , " + self.dateStr + "\350 , '" + self.dateStr + "' \ 286 351 ," + self.historyModNum + " \ 287 352 FROM SkyChip_psf" 288 353 289 self.localStmt.execute(sql) 290 291 self.updateSurveyID("StackDetection") 292 self.updateFilterID("StackDetection") 354 self.scratchDb.stmt.execute(sql) 355 356 self.scratchDb.updateAllRows("StackDetection", "surveyID", str(self.surveyID)) 357 self.scratchDb.updateFilterID("StackDetection", self.filter) 358 self.scratchDb.updateAllRows("StackDetection", "dataRelease", str(self.dataRelease)) 359 self.scratchDb.updateAllRows("StackDetection", "primaryF", "0") 360 self.scratchDb.updateAllRows("StackDetection", "activeFlag", "0") 293 361 self.updateStackMetaID("StackDetection") 294 self.updateDataRelease("StackDetection")295 362 self.updateStackTypeID("StackDetection") 363 self.updateDvoIDs("StackDetection") 364 365 # now delete bad flux 366 self.scratchDb.reportAndDeleteRowsWithNULLS("StackDetection", "instFlux") 367 self.scratchDb.reportAndDeleteRowsWithNULLS("StackDetection", "objID") 368 296 369 297 370 ''' … … 306 379 DISTINCT IPP_IDET \ 307 380 FROM SkyChip_xrad" 308 self.localStmt.execute(sql) 381 382 try: 383 self.scratchDb.stmt.execute(sql) 384 except: return 309 385 310 386 # TODO temporarily loading 1st convolved fluxes into unconvolved fields … … 327 403 ,petR90Err=b.PETRO_RADIUS_90_ERR \ 328 404 WHERE a.ippDetectID=b.IPP_IDET" 329 self.localStmt.execute(sql) 330 331 self.updateSurveyID("StackApFlx") 332 self.updateFilterID("StackApFlx") 405 self.scratchDb.stmt.execute(sql) 406 407 self.scratchDb.updateAllRows("StackApFlx", "surveyID", str(self.surveyID)) 408 self.scratchDb.updateFilterID("StackApFlx", self.filter) 409 self.scratchDb.updateAllRows("StackApFlx", "dataRelease", str(self.dataRelease)) 410 self.scratchDb.updateAllRows("StackApFlx", "primaryF", "0") 411 self.scratchDb.updateAllRows("StackApFlx", "activeFlag", "0") 333 412 self.updateStackMetaID("StackApFlx") 334 self.updateDataRelease("StackApFlx")335 413 self.updateStackTypeID("StackApFlx") 414 self.updateDvoIDs("StackApFlx") 336 415 337 416 ''' … … 343 422 # insert all the detections 344 423 sql = "INSERT INTO StackModelFit (ippDetectID) SELECT DISTINCT IPP_IDET from SkyChip_xfit" 345 self.localStmt.execute(sql) 424 try: 425 self.scratchDb.stmt.execute(sql) 426 except: 427 return 428 346 429 347 430 # populate model parameters … … 353 436 self.updateModelFit("ser", "PS_MODEL_SERSIC") 354 437 355 self.updateSurveyID("StackModelFit") 356 self.updateFilterID("StackModelFit") 438 self.scratchDb.updateAllRows("StackModelFit", "surveyID", str(self.surveyID)) 439 self.scratchDb.updateFilterID("StackModelFit", self.filter) 440 self.scratchDb.updateAllRows("StackModelFit", "dataRelease", str(self.dataRelease)) 441 self.scratchDb.updateAllRows("StackModelFit", "primaryF", "0") 442 self.scratchDb.updateAllRows("StackModelFit", "activeFlag", "0") 357 443 self.updateStackMetaID("StackModelFit") 358 self.updateDataRelease("StackModelFit")359 444 self.updateStackTypeID("StackModelFit") 445 self.updateDvoIDs("StackModelFit") 360 446 361 447 ''' … … 365 451 self.logger.info("Procesing StackToImage table") 366 452 367 sql = "INSERT INTO StackToImage (stackMetaID) VALUES (" + self.header['STK_ID'] + ")" 368 self.localStmt.execute(sql) 453 imageIDs = self.gpc1Db.getImageIDsForThisStackID(self.stackID) 454 455 for imageID in imageIDs: 456 sql = "INSERT INTO StackToImage (stackMetaID, imageID) \ 457 VALUES (\ 458 " + str(self.stackID) + ", " + imageID + ")" 459 self.scratchDb.stmt.execute(sql) 460 461 # now update StackMeta with correct number of inputs 462 sql = "UPDATE StackMeta SET nP2Images = (SELECT COUNT(*) FROM StackToImage)" 463 self.scratchDb.stmt.execute(sql) 464 465 ''' 466 Populates the SkinnyObject table 467 ''' 468 def populateSkinnyObject(self): 469 self.logger.info("Procesing SkinnyObject table") 470 471 sql = "INSERT INTO SkinnyObject (\ 472 objID \ 473 ,ippObjID \ 474 ) \ 475 SELECT \ 476 objID \ 477 ,ippObjID \ 478 FROM StackDetection" 479 self.scratchDb.stmt.execute(sql) 480 481 self.scratchDb.updateAllRows("SkinnyObject", "surveyID", str(self.surveyID)) 482 self.scratchDb.updateAllRows("SkinnyObject", "dataRelease", str(self.dataRelease)) 483 484 ''' 485 Populates the ObjectCalColor table 486 ''' 487 def populateObjectCalColor(self): 488 self.logger.info("Procesing ObjectCalColor table") 489 490 sql = "INSERT INTO ObjectCalColor (\ 491 objID \ 492 ,ippObjID \ 493 ) \ 494 SELECT \ 495 objID \ 496 ,ippObjID \ 497 FROM StackDetection" 498 self.scratchDb.stmt.execute(sql) 499 500 self.scratchDb.updateFilterID("ObjectCalColor", self.filter) 501 self.scratchDb.updateAllRows("ObjectCalColor", "dataRelease", str(self.dataRelease)) 502 369 503 370 504 ''' 371 505 Applies indexes to the PSPS tables 372 506 ''' 373 def indexPspsTables(self): 374 375 self.logger.info("Creating indexes on PSPS tables") 376 self.createIndex("StackDetection", "ippDetectID") 377 self.createIndex("StackApFlx", "ippDetectID") 378 self.createIndex("StackModelFit", "ippDetectID") 507 def alterPspsTables(self): 508 509 self.logger.info("Altering PSPS tables") 510 self.scratchDb.makeColumnUnique("StackDetection", "objID") 511 self.scratchDb.createIndex("StackDetection", "ippDetectID") 512 self.scratchDb.createIndex("StackApFlx", "ippDetectID") 513 self.scratchDb.createIndex("StackModelFit", "ippDetectID") 379 514 380 515 ''' … … 384 519 385 520 self.logger.info("Creating indexes on IPP tables") 386 self.createIndex("SkyChip_psf", "IPP_IDET") 387 self.createIndex("SkyChip_xfit", "IPP_IDET") 388 self.createIndex("SkyChip_xrad", "IPP_IDET") 389 self.createIndex("SkyChip_xsrc", "IPP_IDET") 390 391 ''' 392 ''' 393 def fudgeIDs(self): 394 395 self.logger.info("Creating bogus IDs on all detections") 396 397 sql = "INSERT INTO dvo (ippDetectID) SELECT IPP_IDET FROM SkyChip_psf"; 398 self.localStmt.execute(sql) 399 400 sql = "SELECT ippDetectID FROM dvo" 401 rs = self.localStmt.executeQuery(sql) 402 403 ids = [] 404 while (rs.next()): 405 ids.append(rs.getInt(1)) 406 407 i = 1 408 o = 72010000000000001 409 410 for id in ids: 411 sql = "UPDATE dvo SET ippObjID = %d, objID = %d WHERE ippDetectID = %s" % (i, o, id) 412 self.localStmt.execute(sql) 413 i = i + 1 414 o = o + 1 415 416 self.updateDvoIDs("StackDetection") 417 self.updateDvoIDs("StackApFlx") 418 self.updateDvoIDs("StackModelFit") 419 420 self.setMinMaxObjID("StackDetection") 521 self.scratchDb.createIndex("SkyChip_psf", "IPP_IDET") 522 self.scratchDb.createIndex("SkyChip_xfit", "IPP_IDET") 523 self.scratchDb.createIndex("SkyChip_xrad", "IPP_IDET") 524 self.scratchDb.createIndex("SkyChip_xsrc", "IPP_IDET") 525 526 ''' 527 Updates provided table with DVO IDs from DVO table 528 ''' 529 def updateDvoIDs(self, table): 530 531 imageID = self.scratchDb.getImageIDFromExternID(self.header['SOURCEID'], self.header['IMAGEID']) 532 533 self.logger.info("Updating table '" + table + "' with DVO IDs...") 534 sql = "UPDATE IGNORE " + table + " AS a, dvoDetectionFull AS b SET \ 535 a.ippObjID = b.ippObjID, \ 536 a.stackDetectID = b.detectID, \ 537 a.objID = b.objID \ 538 WHERE a.ippDetectID = b.ippDetectID \ 539 AND b.sourceID = " + self.header['SOURCEID'] + "\ 540 AND b.imageID = " + str(imageID) 541 self.scratchDb.stmt.execute(sql) 542 421 543 422 544 ''' … … 424 546 ''' 425 547 def populatePspsTables(self): 548 549 if not self.useFullTables: 550 if not self.getIDsFromDVO(): 551 return False 426 552 427 553 self.populateStackMeta() … … 430 556 self.populateStackApFlx() 431 557 self.populateStackToImage() 432 558 self.populateSkinnyObject() 559 self.populateObjectCalColor() 560 561 self.setMinMaxObjID(["StackDetection"]) 562 563 return True 564 565 ''' 566 Checks whether this batch has already been processed and published 567 ''' 568 def alreadyProcessed(self): 569 570 return self.ippToPspsDb.alreadyProcessed("stack", "stack_id", self.stackID) 433 571 434 572 logging.config.fileConfig("logging.conf") 435 573 logger = logging.getLogger("stackbatch") 436 sky_id = 299 # TODO 574 logger.info("Starting") 437 575 gpc1Db = Gpc1Db(logger) 438 cmfFiles = gpc1Db.getStackStageCmfs(sky_id) 439 576 stackType = "NIGHTLY_STACK" 577 skyIDs = gpc1Db.getIDsInThisDVODbForThisStage("MD04.Staticsky", "staticsky") 578 #skyIDs = gpc1Db.getIDsInThisDVODbForThisStage("MD04.GENE.PSPSDEEP", "staticsky") 579 #stackType = "DEEP_STACK" 580 #skyIDs = [689] 581 #skyIDs = [299] 582 #skyIDs = [302] 583 #skyIDs = [8508] 440 584 i = 0 441 for file in cmfFiles: 442 443 stackBatch = StackBatch(logger, sky_id, file) 444 445 stackBatch.createEmptyPspsTables() 446 stackBatch.importIppTables("") 447 # stackBatch.populatePspsTables() 448 # stackBatch.fudgeIDs() 449 # stackBatch.reportNullsInAllPspsTables(False) 450 # stackBatch.replaceAllPspsNulls("-999") 451 # stackBatch.exportPspsTablesToFits() 452 # stackBatch.writeBatchManifest() 453 # stackBatch.createTarball() 454 # stackBatch.publishToDatastore() 455 456 i = i + 1 457 if i > 0: break # TODO just doing one filter for now 458 585 for skyID in skyIDs: 586 587 logger.info("-------------------------------------------------- sky ID: %d" % skyID) 588 589 cmfFiles = gpc1Db.getStackStageCmfs(skyID) 590 591 for file in cmfFiles: 592 593 if not os.path.isfile(file): 594 logger.error("Cannot read file at '" + file) 595 continue 596 597 stackBatch = StackBatch(logger, skyID, file, stackType, True) 598 599 if not stackBatch.alreadyProcessed(): 600 601 stackBatch.createEmptyPspsTables() 602 stackBatch.importIppTables("") 603 if stackBatch.populatePspsTables(): 604 605 #stackBatch.reportNullsInAllPspsTables(False) 606 stackBatch.exportPspsTablesToFits() 607 stackBatch.writeBatchManifest() 608 #stackBatch.createTarball() 609 #stackBatch.publishToDatastore() 610 611 i = i + 1 612 #if i > 0: sys.exit() 613 614 logger.info("Finished")
Note:
See TracChangeset
for help on using the changeset viewer.
