- Timestamp:
- May 5, 2011, 10:05:10 AM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/eam_branches/ipp-20110404/ippToPsps/jython/stackbatch.py
r31118 r31439 1 1 #!/usr/bin/env jython 2 3 import os.path 4 import sys 2 5 3 6 import stilts 4 7 from java.lang import * 5 8 from java.sql import * 9 10 from gpc1db import Gpc1Db 6 11 from batch import Batch 7 12 import logging.config 13 14 ''' 15 StackBatch class 16 ''' 8 17 class StackBatch(Batch): 9 18 … … 11 20 Constructor 12 21 ''' 13 def __init__(self ):22 def __init__(self, logger, skyID, inputFile, stackType, useFullTables=False): 14 23 super(StackBatch, self).__init__( 24 logger, 15 25 "stack", 16 #"/data/ipp053.0/eugene/md04.20110320/staticsky/MD04.V2/skycell.087/MD04.V2.skycell.087.stk.280.000.cmf", 17 "demo.fits", 18 "stack.fits", 19 "localhost", 20 "ipptopsps_scratch", 21 "ipp", 22 "ipp", 23 "MD04") # TODO 24 25 26 inputFile, 27 "MD04", 28 useFullTables) # TODO 29 30 self.logger.info("StackBatch constructor. Creating batch from: '" + inputFile + "'") 31 32 self.skyID = skyID 33 34 # get filterID using init table 35 self.filter = self.header['FPA.FILTER'] 36 self.filter = self.filter[0:1] 37 38 self.stackType = stackType 39 meta = self.gpc1Db.getStackStageMeta(self.skyID, self.header['FPA.FILTER']) 40 if len(meta) < 1: return 41 self.stackID = meta[0]; 42 self.skycell = meta[1]; 43 44 # determine skycell from header value 45 #self.skycell = "skycell.34" #= self.header['SKYCELL'] 46 self.skycell = self.skycell[8:] 47 48 self.logger.info("Processing stack with ID: %d, type: %s and skycell: %s filter: %s" % (self.stackID, self.stackType, self.skycell, self.filter)) 49 50 51 # delete PSPS tables 52 self.scratchDb.dropTable("StackMeta") 53 self.scratchDb.dropTable("StackDetection") 54 self.scratchDb.dropTable("StackModelFit") 55 self.scratchDb.dropTable("StackApFlx") 56 self.scratchDb.dropTable("StackToImage") 57 self.scratchDb.dropTable("SkinnyObject") 58 self.scratchDb.dropTable("ObjectCalColor") 59 60 # delete IPP tables 61 #self.scratchDb.dropTable("SkyChip_psf") 62 #self.scratchDb.dropTable("SkyChip_xsrc") 63 #self.scratchDb.dropTable("SkyChip_xfit") 64 #self.scratchDb.dropTable("SkyChip_xrad") 65 66 self.logger.info("Stack type: " + self.safeDictionaryAccess(self.header, self.stackType)) 67 # obs time makes no sense except for nightly stacks 68 #if self.header['STK_TYPE'] != "NIGHTLY_STACK": self.header['MJD-OBS'] = "-999" 69 70 # create an output filename, which is {filterID}{skycellID}.FITS 71 self.outputFitsFile = "%s%07d.FITS" % (self.filter, int(self.skycell)) 72 self.outputFitsPath = "%s/%s" % (self.localOutPath, self.outputFitsFile) 73 74 # set some constants 75 self.dataRelease = "1" 76 self.historyModNum = "0" 77 78 # insert what we know about this stack batch into the stack table 79 self.ippToPspsDb.insertStackMeta(self.batchID, self.skyID, self.stackID, self.filter, self.stackType) 80 81 # insert sourceID/imageID combo so DVO can look it up 82 if not self.useFullTables: 83 self.scratchDb.insertNewDvoImage(self.header['SOURCEID'], self.header['IMAGEID']) 26 84 27 85 ''' … … 30 88 def updateStackMetaID(self, table): 31 89 32 sql = "UPDATE " + table + " SET stackMetaID=" + s elf.header['STK_ID']33 self.s tmt.execute(sql)90 sql = "UPDATE " + table + " SET stackMetaID=" + str(self.stackID) 91 self.scratchDb.stmt.execute(sql) 34 92 35 93 ''' … … 38 96 def updateStackTypeID(self, table): 39 97 40 sql = "UPDATE "+table+" AS a, StackType AS b SET a.stackTypeID=b.stackTypeID WHERE b.name = '" +self.header['STK_TYPE']+"'"41 self.s tmt.execute(sql)98 sql = "UPDATE "+table+" AS a, StackType AS b SET a.stackTypeID=b.stackTypeID WHERE b.name = '" + self.stackType + "'" 99 self.scratchDb.stmt.execute(sql) 42 100 43 101 … … 90 148 WHERE a.ippDetectID=b.IPP_IDET AND b.PSF_FWHM "+psfCondition 91 149 92 self.s tmt.execute(sql)150 self.scratchDb.stmt.execute(sql) 93 151 94 152 ''' … … 138 196 WHERE a.ippDetectID=b.IPP_IDET AND b.MODEL_TYPE = '"+ippModelType+"'" 139 197 140 self.s tmt.execute(sql)198 self.scratchDb.stmt.execute(sql) 141 199 142 200 # sersic fit has an extra parameter … … 150 208 "+modelLong+"Covar68=b.EXT_COVAR_05_07, \ 151 209 "+modelLong+"Covar78=b.EXT_COVAR_06_07, \ 152 "+modelLong+"Covar88=b.EXT_COVAR_07_07 \ 210 "+modelLong+"Covar88=b.EXT_COVAR_07_07, \ 211 "+"serNu=b.EXT_PAR_07, \ 212 "+"serNuErr=SQRT(EXT_COVAR_07_07) \ 153 213 WHERE a.ippDetectID=b.IPP_IDET AND b.MODEL_TYPE = '"+ippModelType+"'" 154 214 155 self.s tmt.execute(sql)215 self.scratchDb.stmt.execute(sql) 156 216 157 217 … … 160 220 ''' 161 221 def populateStackMeta(self): 162 self.log ("Procesing StackMeta table")222 self.logger.info("Procesing StackMeta table") 163 223 164 224 sql = "INSERT INTO StackMeta (\ 165 225 stackMetaID \ 166 226 ,skycellID \ 227 ,photoCalID \ 167 228 ,photoZero \ 168 ,nP2Images \ 229 ,expTime \ 230 ,psfModelID \ 169 231 ,ctype1 \ 170 232 ,ctype2 \ … … 180 242 ,pc002002 \ 181 243 ) VALUES ( \ 182 " + s elf.header['STK_ID']+ " \244 " + str(self.stackID) + " \ 183 245 ," + self.skycell + " \ 246 ," + str(self.scratchDb.getPhotoCalID(self.header['SOURCEID'], self.header['IMAGEID'])) + " \ 184 247 ," + self.header['FPA.ZP'] + " \ 185 ," + self.header['NINPUTS'] + " \ 248 ," + self.header['EXPTIME'] + " \ 249 ,'" + self.safeDictionaryAccess(self.header, 'PSFMODEL') + "' \ 186 250 ,'" + self.header['CTYPE1'] + "' \ 187 251 ,'" + self.header['CTYPE2'] + "' \ … … 197 261 ," + self.header['PC002002'] + " \ 198 262 )" 199 self.stmt.execute(sql) 200 201 self.updateSurveyID("StackMeta") 202 self.updateFilterID("StackMeta") 263 self.scratchDb.stmt.execute(sql) 264 265 self.scratchDb.updateAllRows("StackMeta", "surveyID", str(self.surveyID)) 266 self.scratchDb.updateFilterID("StackMeta", self.filter) 267 self.scratchDb.updateAllRows("StackMeta", "dataRelease", str(self.dataRelease)) 203 268 self.updateStackTypeID("StackMeta") 204 269 … … 207 272 ''' 208 273 def populateStackDetection(self): 209 self.log ("Procesing StackDetection table")274 self.logger.info("Procesing StackDetection table") 210 275 211 276 # insert all the detections 212 277 sql = "INSERT INTO StackDetection (\ 213 278 ippDetectID \ 279 ,skyCellID \ 280 ,obsTime \ 214 281 ,xPos \ 215 282 ,yPos \ … … 225 292 ,psfWidMinor \ 226 293 ,psfTheta \ 294 ,infoFlag \ 227 295 ,psfCf \ 296 ,momentXX \ 297 ,momentXY \ 298 ,momentYY \ 299 ,momentM3C \ 300 ,momentM3S \ 301 ,momentM4C \ 302 ,momentM4S \ 303 ,momentR1 \ 304 ,momentRH \ 305 ,apMag \ 306 ,apMagErr \ 307 ,kronFlux \ 308 ,kronFluxErr \ 309 ,kronRad \ 310 ,kronRadErr \ 228 311 ,nFrames \ 312 ,assocDate \ 313 ,historyModNum \ 229 314 ) \ 230 315 SELECT \ 231 316 IPP_IDET \ 317 ," + self.skycell + " \ 318 ," + self.header['MJD-OBS'] + " \ 232 319 ,X_PSF \ 233 320 ,Y_PSF \ 234 321 ,X_PSF_SIG \ 235 322 ,Y_PSF_SIG \ 236 ,P SF_INST_FLUX\237 , PSF_INST_FLUX_SIG\323 ,POW(10.0, (-0.4*PSF_INST_MAG)) / "+self.header['EXPTIME']+" \ 324 ,ABS((PSF_INST_MAG_SIG*(POW(10.0, (-0.4*PSF_INST_MAG)) / "+self.header['EXPTIME']+")) / 1.085736) \ 238 325 ,POW(10.0, (-0.4*PEAK_FLUX_AS_MAG)) / "+self.header['EXPTIME']+" \ 239 326 ,SKY \ … … 243 330 ,PSF_MINOR \ 244 331 ,PSF_THETA \ 332 ,FLAGS << 32 | FLAGS2 \ 245 333 ,PSF_QF \ 334 ,MOMENTS_XX \ 335 ,MOMENTS_XY \ 336 ,MOMENTS_YY \ 337 ,MOMENTS_M3C \ 338 ,MOMENTS_M3S \ 339 ,MOMENTS_M4C \ 340 ,MOMENTS_M4S \ 341 ,MOMENTS_R1 \ 342 ,MOMENTS_RH \ 343 ,AP_MAG \ 344 , NULL \ 345 ,KRON_FLUX \ 346 ,KRON_FLUX_ERR \ 347 , NULL \ 348 , NULL \ 246 349 ,N_FRAMES \ 350 , '" + self.dateStr + "' \ 351 ," + self.historyModNum + " \ 247 352 FROM SkyChip_psf" 248 353 249 self.stmt.execute(sql) 250 251 if self.header['STK_TYPE'] != "NIGHTLY_STACK": 252 sql = "UPDATE StackDetection SET obsTime = " + self.header['MJD-OBS'] 253 self.stmt.execute(sql) 254 255 256 sql = "UPDATE StackDetection SET skycellID = " + self.skycell + ", assocDate = '"+self.dateStr+"'" 257 self.stmt.execute(sql) 258 self.updateSurveyID("StackDetection") 259 self.updateFilterID("StackDetection") 354 self.scratchDb.stmt.execute(sql) 355 356 self.scratchDb.updateAllRows("StackDetection", "surveyID", str(self.surveyID)) 357 self.scratchDb.updateFilterID("StackDetection", self.filter) 358 self.scratchDb.updateAllRows("StackDetection", "dataRelease", str(self.dataRelease)) 359 self.scratchDb.updateAllRows("StackDetection", "primaryF", "0") 360 self.scratchDb.updateAllRows("StackDetection", "activeFlag", "0") 260 361 self.updateStackMetaID("StackDetection") 261 362 self.updateStackTypeID("StackDetection") 363 self.updateDvoIDs("StackDetection") 364 365 # now delete bad flux 366 self.scratchDb.reportAndDeleteRowsWithNULLS("StackDetection", "instFlux") 367 self.scratchDb.reportAndDeleteRowsWithNULLS("StackDetection", "objID") 368 262 369 263 370 ''' … … 265 372 ''' 266 373 def populateStackApFlx(self): 267 self.log ("Procesing StackApFlx table")374 self.logger.info("Procesing StackApFlx table") 268 375 269 376 sql = "INSERT INTO StackApFlx \ … … 272 379 DISTINCT IPP_IDET \ 273 380 FROM SkyChip_xrad" 274 self.stmt.execute(sql) 275 276 self.log(" Adding 1st convolved fluxes") 381 382 try: 383 self.scratchDb.stmt.execute(sql) 384 except: return 385 386 # TODO temporarily loading 1st convolved fluxes into unconvolved fields 387 self.logger.info(" Adding un-convolved fluxes") 388 self.updateApFlxs("", "< 7.0") 389 self.logger.info(" Adding 1st convolved fluxes") 277 390 self.updateApFlxs("c1", "< 7.0") 278 self.log (" Adding 2nd convolved fluxes")391 self.logger.info(" Adding 2nd convolved fluxes") 279 392 self.updateApFlxs("c2", "> 7.0") 280 393 281 self.log (" Adding petrosian stufffor extended sources")394 self.logger.info(" Adding petrosians for extended sources") 282 395 sql = "UPDATE StackApFlx AS a, SkyChip_xsrc AS b SET \ 283 396 petRadius=b.PETRO_RADIUS \ … … 290 403 ,petR90Err=b.PETRO_RADIUS_90_ERR \ 291 404 WHERE a.ippDetectID=b.IPP_IDET" 292 self.stmt.execute(sql) 293 294 self.updateSurveyID("StackApFlx") 295 self.updateFilterID("StackApFlx") 405 self.scratchDb.stmt.execute(sql) 406 407 self.scratchDb.updateAllRows("StackApFlx", "surveyID", str(self.surveyID)) 408 self.scratchDb.updateFilterID("StackApFlx", self.filter) 409 self.scratchDb.updateAllRows("StackApFlx", "dataRelease", str(self.dataRelease)) 410 self.scratchDb.updateAllRows("StackApFlx", "primaryF", "0") 411 self.scratchDb.updateAllRows("StackApFlx", "activeFlag", "0") 296 412 self.updateStackMetaID("StackApFlx") 297 413 self.updateStackTypeID("StackApFlx") 298 299 #rs = stmt.executeQuery(sql) 300 # list = [] 301 302 #while (rs.next()): 303 304 # print rs.getString(1) 305 306 #rs.close() 414 self.updateDvoIDs("StackApFlx") 307 415 308 416 ''' … … 310 418 ''' 311 419 def populateStackModelFit(self): 312 self.log ("Procesing StackModelFit table")420 self.logger.info("Procesing StackModelFit table") 313 421 314 422 # insert all the detections 315 423 sql = "INSERT INTO StackModelFit (ippDetectID) SELECT DISTINCT IPP_IDET from SkyChip_xfit" 316 self.stmt.execute(sql) 424 try: 425 self.scratchDb.stmt.execute(sql) 426 except: 427 return 428 317 429 318 430 # populate model parameters 319 self.log (" Adding deVaucouleurs fit")431 self.logger.info(" Adding deVaucouleurs fit") 320 432 self.updateModelFit("deV", "PS_MODEL_DEV") 321 self.log (" Adding exponential fit")433 self.logger.info(" Adding exponential fit") 322 434 self.updateModelFit("exp", "PS_MODEL_EXP") 323 self.log (" Adding sersic fit")435 self.logger.info(" Adding sersic fit") 324 436 self.updateModelFit("ser", "PS_MODEL_SERSIC") 325 437 326 self.updateSurveyID("StackModelFit") 327 self.updateFilterID("StackModelFit") 438 self.scratchDb.updateAllRows("StackModelFit", "surveyID", str(self.surveyID)) 439 self.scratchDb.updateFilterID("StackModelFit", self.filter) 440 self.scratchDb.updateAllRows("StackModelFit", "dataRelease", str(self.dataRelease)) 441 self.scratchDb.updateAllRows("StackModelFit", "primaryF", "0") 442 self.scratchDb.updateAllRows("StackModelFit", "activeFlag", "0") 328 443 self.updateStackMetaID("StackModelFit") 329 444 self.updateStackTypeID("StackModelFit") 445 self.updateDvoIDs("StackModelFit") 446 447 ''' 448 Populates the StackToImage table 449 ''' 450 def populateStackToImage(self): 451 self.logger.info("Procesing StackToImage table") 452 453 imageIDs = self.gpc1Db.getImageIDsForThisStackID(self.stackID) 454 455 for imageID in imageIDs: 456 sql = "INSERT INTO StackToImage (stackMetaID, imageID) \ 457 VALUES (\ 458 " + str(self.stackID) + ", " + imageID + ")" 459 self.scratchDb.stmt.execute(sql) 460 461 # now update StackMeta with correct number of inputs 462 sql = "UPDATE StackMeta SET nP2Images = (SELECT COUNT(*) FROM StackToImage)" 463 self.scratchDb.stmt.execute(sql) 464 465 ''' 466 Populates the SkinnyObject table 467 ''' 468 def populateSkinnyObject(self): 469 self.logger.info("Procesing SkinnyObject table") 470 471 sql = "INSERT INTO SkinnyObject (\ 472 objID \ 473 ,ippObjID \ 474 ) \ 475 SELECT \ 476 objID \ 477 ,ippObjID \ 478 FROM StackDetection" 479 self.scratchDb.stmt.execute(sql) 480 481 self.scratchDb.updateAllRows("SkinnyObject", "surveyID", str(self.surveyID)) 482 self.scratchDb.updateAllRows("SkinnyObject", "dataRelease", str(self.dataRelease)) 483 484 ''' 485 Populates the ObjectCalColor table 486 ''' 487 def populateObjectCalColor(self): 488 self.logger.info("Procesing ObjectCalColor table") 489 490 sql = "INSERT INTO ObjectCalColor (\ 491 objID \ 492 ,ippObjID \ 493 ) \ 494 SELECT \ 495 objID \ 496 ,ippObjID \ 497 FROM StackDetection" 498 self.scratchDb.stmt.execute(sql) 499 500 self.scratchDb.updateFilterID("ObjectCalColor", self.filter) 501 self.scratchDb.updateAllRows("ObjectCalColor", "dataRelease", str(self.dataRelease)) 330 502 331 503 … … 333 505 Applies indexes to the PSPS tables 334 506 ''' 335 def indexPspsTables(self): 336 337 self.log("Creating indexes on PSPS tables") 338 self.createIndex("StackDetection", "ippDetectID") 339 self.createIndex("StackApFlx", "ippDetectID") 340 self.createIndex("StackModelFit", "ippDetectID") 507 def alterPspsTables(self): 508 509 self.logger.info("Altering PSPS tables") 510 self.scratchDb.makeColumnUnique("StackDetection", "objID") 511 self.scratchDb.createIndex("StackDetection", "ippDetectID") 512 self.scratchDb.createIndex("StackApFlx", "ippDetectID") 513 self.scratchDb.createIndex("StackModelFit", "ippDetectID") 341 514 342 515 ''' … … 345 518 def indexIppTables(self): 346 519 347 self.log("Creating indexes on IPP tables") 348 self.createIndex("SkyChip_psf", "IPP_IDET") 349 self.createIndex("SkyChip_xfit", "IPP_IDET") 350 self.createIndex("SkyChip_xrad", "IPP_IDET") 351 self.createIndex("SkyChip_xsrc", "IPP_IDET") 520 self.logger.info("Creating indexes on IPP tables") 521 self.scratchDb.createIndex("SkyChip_psf", "IPP_IDET") 522 self.scratchDb.createIndex("SkyChip_xfit", "IPP_IDET") 523 self.scratchDb.createIndex("SkyChip_xrad", "IPP_IDET") 524 self.scratchDb.createIndex("SkyChip_xsrc", "IPP_IDET") 525 526 ''' 527 Updates provided table with DVO IDs from DVO table 528 ''' 529 def updateDvoIDs(self, table): 530 531 imageID = self.scratchDb.getImageIDFromExternID(self.header['SOURCEID'], self.header['IMAGEID']) 532 533 self.logger.info("Updating table '" + table + "' with DVO IDs...") 534 sql = "UPDATE IGNORE " + table + " AS a, dvoDetectionFull AS b SET \ 535 a.ippObjID = b.ippObjID, \ 536 a.stackDetectID = b.detectID, \ 537 a.objID = b.objID \ 538 WHERE a.ippDetectID = b.ippDetectID \ 539 AND b.sourceID = " + self.header['SOURCEID'] + "\ 540 AND b.imageID = " + str(imageID) 541 self.scratchDb.stmt.execute(sql) 542 352 543 353 544 ''' … … 356 547 def populatePspsTables(self): 357 548 358 # determine skycell from header value 359 self.skycell = self.header['SKYCELL'] 360 self.skycell = self.skycell[8:] 361 362 # get filterID using init table 363 self.filter = self.header['FPA.FILTER'] 364 self.filter = self.filter[0:1] 549 if not self.useFullTables: 550 if not self.getIDsFromDVO(): 551 return False 365 552 366 553 self.populateStackMeta() … … 368 555 self.populateStackModelFit() 369 556 self.populateStackApFlx() 370 371 372 stackBatch = StackBatch() 373 stackBatch.createEmptyPspsTables() 374 stackBatch.importIppTables(".*psf") 375 stackBatch.populatePspsTables() 376 stackBatch.exportPspsTablesToFits() 557 self.populateStackToImage() 558 self.populateSkinnyObject() 559 self.populateObjectCalColor() 560 561 self.setMinMaxObjID(["StackDetection"]) 562 563 return True 564 565 ''' 566 Checks whether this batch has already been processed and published 567 ''' 568 def alreadyProcessed(self): 569 570 return self.ippToPspsDb.alreadyProcessed("stack", "stack_id", self.stackID) 571 572 logging.config.fileConfig("logging.conf") 573 logger = logging.getLogger("stackbatch") 574 logger.info("Starting") 575 gpc1Db = Gpc1Db(logger) 576 stackType = "NIGHTLY_STACK" 577 skyIDs = gpc1Db.getIDsInThisDVODbForThisStage("MD04.Staticsky", "staticsky") 578 #skyIDs = gpc1Db.getIDsInThisDVODbForThisStage("MD04.GENE.PSPSDEEP", "staticsky") 579 #stackType = "DEEP_STACK" 580 #skyIDs = [689] 581 #skyIDs = [299] 582 #skyIDs = [302] 583 #skyIDs = [8508] 584 i = 0 585 for skyID in skyIDs: 586 587 logger.info("-------------------------------------------------- sky ID: %d" % skyID) 588 589 cmfFiles = gpc1Db.getStackStageCmfs(skyID) 590 591 for file in cmfFiles: 592 593 if not os.path.isfile(file): 594 logger.error("Cannot read file at '" + file) 595 continue 596 597 stackBatch = StackBatch(logger, skyID, file, stackType, True) 598 599 if not stackBatch.alreadyProcessed(): 600 601 stackBatch.createEmptyPspsTables() 602 stackBatch.importIppTables("") 603 if stackBatch.populatePspsTables(): 604 605 #stackBatch.reportNullsInAllPspsTables(False) 606 stackBatch.exportPspsTablesToFits() 607 stackBatch.writeBatchManifest() 608 #stackBatch.createTarball() 609 #stackBatch.publishToDatastore() 610 611 i = i + 1 612 #if i > 0: sys.exit() 613 614 logger.info("Finished")
Note:
See TracChangeset
for help on using the changeset viewer.
