IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Ignore:
Timestamp:
May 5, 2011, 10:05:10 AM (15 years ago)
Author:
eugene
Message:

merging updates from trunk

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/eam_branches/ipp-20110404/ippToPsps/jython/batch.py

    r31117 r31439  
    44import datetime
    55import re
     6import sys
     7import os
     8import md5
     9import shutil
     10import logging
     11from subprocess import call, PIPE, Popen
     12
     13from datastore import Datastore
     14from scratchdb import ScratchDb
     15from gpc1db import Gpc1Db
     16from ipptopspsdb import IppToPspsDb
    617
    718from java.lang import *
    819from java.sql import *
    9 
    10 
     20from xml.etree.ElementTree import ElementTree, Element, tostring
     21
     22'''
     23Base class of all batch types.
     24'''
    1125class Batch(object):
    1226
    13     driverName="com.mysql.jdbc.Driver"
    14 
    1527    '''
    1628    Constructor
    17     '''
    18     def __init__(self, batchType, inputFitsPath, outputFitsPath, dbHost, dbName, dbUser, dbPass, survey=""):
     29
     30    >>> batch = Batch(1,2,3,4,5,6,7)
     31    >>> print batch.pspsVoTableFilePath
     32    "../config/2/tables.vot"
     33    '''
     34    def __init__(self, logger, batchType, inputFitsPath="", survey="", useFullTables=False):
     35
     36        # set up logging
     37        self.logger = logger
     38        self.logger.info("-------------------------------------------------------------------------------")
     39        self.logger.debug("Batch class constructor")
    1940
    2041        # set up class variables
     42        self.batchType = batchType;
    2143        self.pspsVoTableFilePath = "../config/" + batchType + "/tables.vot"
    2244        self.inputFitsPath = inputFitsPath
    23         self.outputFitsPath = outputFitsPath
    24         self.dbHost = dbHost
    25         self.dbName = dbName
    26         self.dbUser = dbUser
    27         self.dbPass = dbPass
    2845        self.survey = survey
    29 
    30         # set up JDBC connection
    31         self.url = "jdbc:mysql://"+self.dbHost+"/"+self.dbName+"?user="+self.dbUser+"&password="+self.dbPass
    32         self.con = DriverManager.getConnection(self.url)
    33         self.stmt = self.con.createStatement()
    34 
    35         # get survey ID from init table
    36         sql = "SELECT surveyID from Survey WHERE name = '" + survey + "'"
    37         try:
    38             rs = self.stmt.executeQuery(sql) 
    39             rs.first()
    40             self.surveyID = rs.getInt(1)
    41         except:
    42             self.log("No survey ID found for this survey: '" + survey + "'")
     46        self.useFullTables = useFullTables
     47
     48        # TODO
     49        self.tablesToExport = []
     50
     51        # open config
     52        doc = ElementTree(file="config.xml")
     53
     54        # create Gpc1Db object
     55        self.gpc1Db = Gpc1Db(self.logger)
     56        self.ippToPspsDb = IppToPspsDb(logger)
     57        self.scratchDb = ScratchDb(logger, self.useFullTables)
     58
     59        if self.survey != "":
     60            self.surveyID = self.scratchDb.getSurveyID(self.survey)
     61   
     62            # get dvo info from config
     63            dvoName = doc.find("dvo_" + self.survey + "/name").text
     64            self.dvoLocation = doc.find("dvo_" + self.survey + "/location").text
     65        else:
     66            dvoName = ""
     67            self.dvoLocation = ""
    4368            self.surveyID = -1;
    44    
     69         
     70        # get datastore info from config
     71        self.datastore = Datastore(self.logger)
     72
     73        # create a new batch
     74        self.batchID = self.ippToPspsDb.createNewBatch(
     75                self.getPspsBatchType(),
     76                survey,
     77                dvoName,
     78                self.datastore.product)
     79
     80        # get local storage location from config
     81        self.batchName = "B%08d" % self.batchID
     82        self.subDir = doc.find("localOutPath").text + "/" + self.getPspsBatchType() + "/" + dvoName
     83        self.localOutPath = self.subDir + "/" + self.batchName
     84        if not os.path.exists(self.localOutPath): os.makedirs(self.localOutPath)
     85
    4586        # store today's date
    4687        now = datetime.datetime.now();
     
    4889
    4990        if self.inputFitsPath != "":
    50             self.parseFitsHeader()
     91            file = open(self.inputFitsPath)
     92            self.header = self.parseFitsHeader(file)
     93            self.logger.info("Read primary and found " + str(len(self.header)) + " header cards")
     94            # TODO close file?
     95
     96        # create DVO tables if accessing DVO directly
     97        if not self.useFullTables: self.scratchDb.createDvoTables()
    5198
    5299    '''
     
    55102    def __del__(self):
    56103
    57         self.log("Batch destructor")
    58         self.stmt.close()
    59         self.con.close()
    60 
    61     '''
    62     Prints a log message with the current time
    63     '''
    64     def log(self, msg):
    65 
    66         print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " | " + msg
    67 
    68     '''
    69     Updates a table with surveyID
    70     '''
    71     def updateSurveyID(self, table):
    72 
    73         sql = "UPDATE " + table + "  SET surveyID=%d" % self.surveyID
    74         self.stmt.execute(sql)
    75 
    76     '''
    77     Updates a table with filterID grabbed from Filter init table
    78     '''
    79     def updateFilterID(self, table):
    80 
    81         sql = "UPDATE "+table+" AS a, Filter AS b SET a.filterID=b.filterID WHERE b.filterType = '" + self.filter + "'"
    82         self.stmt.execute(sql)
     104        self.logger.debug("Batch destructor")
     105
     106
     107    '''
     108    Returns the value from this dictionary or else NULL
     109    '''
     110    def safeDictionaryAccess(self, header, key):
     111
     112         if key in header: return header[key]
     113         else: return "NULL"
     114
     115    '''
     116    Finds and reads a header extension
     117    '''
     118    def findAndReadFITSHeader(self, name, file):
     119
     120        found = False
     121       
     122        while True:
     123           
     124            index = file.tell()
     125
     126            record = file.read(80)
     127            if not record: break;
     128
     129            # quit when we reach 'END'
     130            if record.startswith("XTENSION= 'IMAGE"):
     131
     132                header = self.parseFitsHeader(file)
     133                if header['EXTNAME'] == name:
     134                    found = True
     135                    file.seek(index + 2880, 0)
     136                    break
     137
     138            file.seek(index + 2880, 0)
     139           
     140        if found != True: self.logger.error("...could not find extension '" + name + "'")
     141        else: self.logger.info("...read header at '" + name + "' and found " + str(len(header)) + " header cards")
     142
     143        return header
     144
     145
     146    '''
     147    Writes the batch manifest file
     148    '''
     149    def writeBatchManifest(self):
     150
     151        outPath = self.localOutPath + "/BatchManifest.xml"
     152        tmpPath = "./tmp.xml"
     153        self.logger.info("Creating batch manifest file here: " + outPath)
     154        root = Element('manifest')
     155
     156        # batch information
     157        root.attrib['name'] = self.batchName
     158        root.attrib['type'] = self.getPspsBatchType()
     159        root.attrib['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
     160        if self.survey != "":
     161            root.attrib['survey'] = self.getBatchFriendlySurveyType()
     162        try: self.minObjID
     163        except: pass
     164        else: root.attrib['minObjId'] = str(self.minObjID)
     165        try: self.maxObjID
     166        except: pass
     167        else: root.attrib['maxObjId'] = str(self.maxObjID)
     168
     169        # get md5sum
     170        p = Popen("md5sum " + self.outputFitsPath, shell=True, stdout=PIPE)
     171        p.wait()
     172        out = p.stdout.read()
     173        md5sum = out[0:out.rfind(" ")]
     174
     175        # get file size
     176        fileSize = os.path.getsize(self.outputFitsPath)
     177
     178        # file information
     179        child = Element('file')
     180        root.append(child)
     181        child.attrib['name'] = self.outputFitsFile
     182        child.attrib['bytes'] = str(fileSize)
     183        child.attrib['md5'] = md5sum
     184
     185        # now create doc and write to file
     186        file = open(tmpPath, 'w')
     187        ElementTree(root).write(file)
     188        file.close()
     189
     190        # clunky way to prettify XML
     191        p = Popen("xmllint --format " + tmpPath + " > " + outPath, shell=True, stdout=PIPE)
     192        p.wait()
     193        os.remove(tmpPath)
     194
     195
     196    '''
     197    tar and zips batch directory
     198    '''
     199    def createTarball(self):
     200     
     201        # set up filenames and paths
     202        tarFile = self.batchName + ".tar"
     203        tarPath = self.subDir + "/" + tarFile
     204
     205        self.tarballFile = tarFile + ".gz"
     206        tarballPath = self.subDir + "/" + self.tarballFile
     207
     208        # tar directory
     209        p = Popen("tar -cvf " + tarPath + "\
     210                -C " + self.subDir + " \
     211                " + self.batchName, shell=True, stdout=PIPE)
     212        p.wait()
     213
     214        # zip tar archive
     215        p = Popen("gzip -c " + tarPath + " > " + tarballPath, shell=True, stdout=PIPE)
     216        p.wait()
     217
     218        # delete tar file and original directory
     219        os.remove(tarPath)
     220        shutil.rmtree(self.localOutPath)
     221
     222    '''
     223    Publishes this batch to the datastore
     224    '''
     225    def publishToDatastore(self):
     226
     227        if self.datastore.publish(self.batchName, self.subDir, self.tarballFile, "tgz"):
     228            self.ippToPspsDb.updateLoadedToDatastore(self.batchID, 1)
     229
     230    '''
     231    Gets PSPS-friendly survey type
     232    '''
     233    def getBatchFriendlySurveyType(self):
     234
     235        return "SCR" # TODO
     236
     237        try:
     238            self.survey
     239        except:
     240            return "NA"
     241
     242        if self.survey == "3PI": return "3PI"
     243        elif self.survey == "MD04": return "MD4"
     244        else:
     245            self.logger.error("Don't know this survey: '" + self.survey + "'")
     246            return "NA"
     247
     248    '''
     249    Gets PSPS friendly batch type
     250    '''
     251    def getPspsBatchType(self):
     252
     253        if self.batchType == "init": return "IN"
     254        elif self.batchType == "detection": return "P2"
     255        elif self.batchType == "stack": return "ST"
     256        else: self.logger.error("Don't know this batch type: " + self.survey)
     257
     258    '''
     259    Sets min and max obj ID using the provided table, or list of tables
     260    '''
     261    def setMinMaxObjID(self, tables):
     262
     263        first = True
     264        for table in tables:
     265
     266            sql = "SELECT MIN(objID), MAX(objID) FROM " + table
     267            rs = self.scratchDb.stmt.executeQuery(sql)
     268            rs.first()
     269
     270            if first:
     271                self.minObjID = rs.getLong(1)
     272                self.maxObjID = rs.getLong(2)
     273            else:
     274                if rs.getLong(1) < self.minObjID: self.minObjID = rs.getLong(1)
     275                if rs.getLong(2) > self.maxObjID: self.maxObjID = rs.getLong(2)
     276
     277            first = False
     278
     279        self.ippToPspsDb.updateMinMaxObjID(self.batchID, self.minObjID, self.maxObjID)
    83280
    84281    '''
    85282    Reads FITS header and stores all fields in a dictionary object
    86283    '''
    87     def parseFitsHeader(self):
    88 
    89         fitsFile = open(self.inputFitsPath)
    90 
    91         self.header = {}
     284    def parseFitsHeader(self, fitsFile):
     285
     286        header = {}
    92287
    93288        while (True):
     289
    94290           record = fitsFile.read(80)
    95291
    96292           # quit when we reach 'END'
    97            if record.startswith("END"): break
    98 
    99            # ignore comments
    100            if record.startswith("COMMENT"): continue
    101            match = re.match('(.*)=(.*)', record)
     293           if re.match('END\s+', record): break
     294
     295           # this regex will get param/value pairs for all header cards, ignoring comments and parsing out 'HIERARCH' prefixes
     296           match = re.match('^(HIERARCH )*([a-zA-Z0-9-_\.]+)\s*=\s+\'*([a-zA-Z0-9-_\.:\s@#]+)\'*\\/*', record)
    102297           if match:
    103298
    104                # remove HIERARCH prefix
    105                param = match.group(1).replace("HIERARCH", "")
    106                param = param.strip()
    107 
    108                value = match.group(2)
    109                # remove trailing comment after / char, if there is one
    110                index = value.find("/")
    111                if index != -1: value = value[0:index]
    112 
    113                # remove ' chars around content
    114                value = value.replace("'", "")
    115 
    116                # remove leading and trailing whitespace
    117                value = value.strip()
    118 
    119                # store in out dictionary object
    120                self.header[param] = value
     299               param = match.group(2)
     300               value = match.group(3).strip()
     301               if value == "NaN": value = "NULL"
     302               header[param] = value
    121303
    122304               #print param + "|" + value + "|"
     305
     306        return header
    123307
    124308    '''
     
    129313         self.pspsTables = stilts.treads(self.pspsVoTableFilePath)
    130314         for table in self.pspsTables:
    131              self.log("Creating PSPS table: " + table.name)
    132              table.write(self.url + '#' + table.name)
    133 
    134          self.indexPspsTables()
     315             self.logger.info("Creating PSPS table: " + table.name)
     316             table.write(self.scratchDb.url + '#' + table.name)
     317             self.tablesToExport.append(table.name)
     318
     319         self.alterPspsTables();
    135320
    136321    '''
     
    138323    '''   
    139324    def indexIppTables(self):
    140         self.log("indexIppTables not implemented")
    141 
    142 
    143     '''
    144     Adds an index to the supplied table and column
    145     '''
    146     def createIndex(self, table, column):
    147 
    148         self.log("Creating index on column '"+column+"' for table '"+table+"'")
    149 
    150         sql = "CREATE INDEX "+table+"_index ON "+table+" ("+column+")"
    151         try:
    152             self.stmt.execute(sql)
    153         except:
    154             self.log("Index already in place on '" + column + "' for table '" + table + "'")
    155 
    156 
    157     '''
    158     Subclass should implement this to index PSPS tables
     325        self.logger.warn("indexIppTables not implemented")
     326
     327
     328    '''
     329    Alter PSPS tables
    159330    '''   
    160     def indexPspsTables(self):
    161         self.log("indexPspsTables not implemented")
     331    def alterPspsTables(self):
     332        self.logger.warn("alterPspsTables not implemented")
    162333
    163334    '''
    164335    Imports IPP tables from FITS file
    165336
    166     Accepts a regular expression filter so not all tabls need to be imported
     337    Accepts a regular expression filter so not all tables need to be imported
    167338    '''
    168339    def importIppTables(self, filter):
    169340
     341      self.logger.info("Attempting to import tables from input FITS file")
    170342      tables = stilts.treads(self.inputFitsPath)
    171343
     
    175347          match = re.match(filter, table.name)
    176348          if not match: continue
    177           self.log("Creating IPP table " + table.name)
     349          self.logger.info("   Reading IPP table " + table.name + " from FITS file")
    178350          table = stilts.tpipe(table, cmd='explodeall')
     351
     352          # drop any previous tables before import
     353          self.scratchDb.dropTable(table.name)
     354
     355          # IPP FITS files are littered with infinities, so remove these
     356          self.logger.info("   Removing Infinity values from all columns")
     357          table = stilts.tpipe(table, cmd='replaceval -Infinity null *')
     358          table = stilts.tpipe(table, cmd='replaceval Infinity null *')
     359
    179360          try:
    180               table.write(self.url + '#' + table.name)
     361              table.write(self.scratchDb.url + '#' + table.name)
    181362          except:
    182               self.log("ERROR problem writing table '" + table.name + "' to the database")
    183 
     363              self.logger.exception("   Problem writing table '" + table.name + "' to the database")
    184364          count = count + 1
    185365
    186       self.log("Imported %d tables from '%s' " % (count, self.inputFitsPath))
     366      self.logger.info("Done. Imported %d tables" % count)
    187367
    188368      self.indexIppTables()
    189369
    190370    '''
    191     Exports PSPS tables from the database to FITS format
     371    Exports PSPS tables from the database to FITS format. Optional regex if you want to alter table names prior to export
    192372    '''   
    193     def exportPspsTablesToFits(self):
    194 
    195         self.log("Exporting all PSPS tables to FITS")
     373    def exportPspsTablesToFits(self, regex="(.*)"):
     374
     375        self.logger.info("Replacing NULLs with -999 then exporting all PSPS tables to FITS")
    196376        _tables = []
    197377
    198         self.log("    Selecting database tables")
     378        self.logger.info("    Selecting database tables")
     379        for table in self.tablesToExport:
     380
     381           # check for an empty table
     382           if self.scratchDb.getRowCount(table) < 1: continue
     383
     384           # get everything from table
     385           _table = stilts.tread(self.scratchDb.url + '#SELECT * FROM ' + table)
     386
     387           # replace nulls and empty fields with weird PSPS -999 pseudo-null
     388           _table = stilts.tpipe(_table, cmd='replaceval "" -999 *')
     389
     390           match = re.match(regex, table)
     391           newTableName = match.group(1)
     392
     393           # change table names
     394           _table = stilts.tpipe(_table, cmd='tablename ' + newTableName)
     395           _tables.append(_table)
     396
     397        self.logger.info("    Writing to FITS file '" + self.outputFitsPath + "'...")
     398        stilts.twrites(_tables, self.outputFitsPath, fmt='fits')
     399        self.logger.info("    ...done")
     400        self.ippToPspsDb.updateProcessed(self.batchID, 1)
     401
     402    '''
     403    Searches all PSPS tables and reports the columns that are either partially or completely populated with NULLs
     404    '''
     405    def reportNullsInAllPspsTables(self, showPartials):
     406
    199407        for table in self.pspsTables:
    200            _table = stilts.tread(self.url + '#SELECT * FROM ' + table.name)
    201            _table = stilts.tpipe(_table, cmd='tablename ' + table.name)
    202            _tables.append(_table)
    203 
    204         self.log("    Writing to FITS file " + self.outputFitsPath)
    205         stilts.twrites(_tables, self.outputFitsPath, fmt='fits')
    206 
     408            self.scratchDb.reportNulls(table.name, showPartials)
     409
     410    '''
     411    Searches all PSPS tables and replaces all NULLs with the provided substitute
     412    '''
     413    def replaceAllPspsNulls(self, sub):
     414
     415        self.logger.info("Replacing all NULL values in PSPS tables with '" + sub + "'...")
     416        for table in self.pspsTables:
     417            self.scratchDb.replaceNulls(table.name, sub)
     418        self.logger.info("...done")
    207419
    208420    '''
     
    210422    '''
    211423    def populatePspsTables(self):
    212         self.log("Not implemented yet")
    213 
     424        self.logger.warn("Not implemented yet")
     425
     426    '''
     427    Calls DVO program to 'query' DVO database and populate results to local MySQL Db table
     428    '''
     429    def getIDsFromDVO(self):
     430
     431        # TODO path to DVO prog hardcoded temporarily
     432        cmd = "../src/dvograbber " + self.dvoLocation
     433        self.logger.info("Running: '" + cmd + "'...")
     434        p = Popen(cmd, shell=True, stdout=PIPE)
     435        p.wait()
     436        #        out = p.stdout.read()
     437        self.logger.info("...done")
     438
     439        if self.scratchDb.getRowCount("dvoDetection") < 1:
     440            self.logger.error("No DVO IDs found")
     441            return False
     442           
     443        return True
     444
     445    '''
     446    Checks whether this batch has already been processed and published. To be implemented by all subclasses
     447    '''
     448    def alreadyProcessed(self):
     449           self.logger.info("Not implemented")
     450
     451
     452
Note: See TracChangeset for help on using the changeset viewer.