Changeset 37246 for trunk/ippToPsps/jython/ipptopspsdb.py
- Timestamp:
- Aug 12, 2014, 4:10:18 PM (12 years ago)
- File:
-
- 1 edited
-
trunk/ippToPsps/jython/ipptopspsdb.py (modified) (16 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/ippToPsps/jython/ipptopspsdb.py
r36815 r37246 166 166 167 167 return ids 168 169 ## '''170 ## XXX EAM : removed this function and made it more explicit (with an additional block)171 ## Returns a list of merged batch IDs that are merged but not yet deleted172 ## '''173 ## def getMergedButNotDeletedBatchIDs(self, batchType, column):174 ##175 ## sql = "SELECT DISTINCT batch_id \176 ## FROM batch \177 ## WHERE timestamp > '" + self.skychunk.epoch + "' \178 ## AND batch_type = '" + batchType + "' \179 ## AND dvo_db = '" + self.skychunk.dvoLabel + "' \180 ## AND merged = 1 \181 ## AND " + column + " = 0"182 ##183 ## ids = []184 ## try:185 ## rs = self.executeQuery(sql)186 ## while (rs.next()): ids.append(rs.getInt(1))187 ## rs.close()188 ## except:189 ## self.logger.exception("Can't query for merged batch ids in ipptopsps Db")190 ##191 ## self.logger.debug("Found %d merged but un-deleted items" % len(ids))192 ##193 ## return ids194 195 ## '''196 ## Returns a list of batch IDs marked as 'purged' but not yet deleted197 ## XXX EAM : removed this function and made it more explicit (with an additional block)198 ## '''199 ## def getPurgedButNotDeletedBatchIDs(self, batchType, column):200 ##201 ## sql = "SELECT DISTINCT batch_id \202 ## FROM batch \203 ## WHERE timestamp > '" + self.skychunk.epoch + "' \204 ## AND batch_type = '" + batchType + "' \205 ## AND dvo_db = '" + self.skychunk.dvoLabel + "' \206 ## AND purged = 1 \207 ## AND " + column + " = 0"208 ##209 ## ids = []210 ## try:211 ## rs = self.executeQuery(sql)212 ## while (rs.next()): ids.append(rs.getInt(1))213 ## rs.close()214 ## except:215 ## self.logger.exception("Can't query for merged batch ids in ipptopsps Db")216 ##217 ##218 ## self.logger.debug("Found %d merged but un-deleted items" % len(ids))219 ##220 ## return ids221 222 ## '''223 ## Returns a list of processed batch IDs that have been loaded to the ODM but not yet deleted224 ## '''225 ## def getLoadedToODMButNotDeletedBatchIDs(self, batchType, column):226 ##227 ## sql = "SELECT DISTINCT batch_id \228 ## FROM batch \229 ## WHERE timestamp > '" + self.skychunk.epoch + "' \230 ## AND batch_type = '" + batchType + "' \231 ## AND dvo_db = '" + self.skychunk.dvoLabel + "' \232 ## AND (loaded_to_ODM = -1 OR merge_worthy = 1) \233 ## AND " + column + " = 0"234 ##235 ## ids = []236 ## try:237 ## rs = self.executeQuery(sql)238 ## while (rs.next()): ids.append(rs.getInt(1))239 ## rs.close()240 ## except:241 ## self.logger.exception("Can't query for merged batch ids in ipptopsps Db")242 ##243 ## self.logger.debug("Found %d merged but un-deleted items" % len(ids))244 ##245 ## return ids246 247 # '''248 # Returns a list of merged batch IDs that are not deleted from local disk249 # '''250 # def getMergedButNotDeletedFromLocalDisk(self, batchType):251 # return self.getMergedButNotDeletedBatchIDs(batchType, "deleted_local")252 253 ## '''254 ## Returns a list of processed batch IDs that are loaded to the ODM, but not deleted from local disk255 ## '''256 ## def getLoadedToODMButNotDeletedFromLocalDisk(self, batchType):257 ## return self.getLoadedToODMButNotDeletedBatchIDs(batchType, "deleted_local")258 ##259 ## '''260 ## Returns a list of processed batch IDs that are loaded to the ODM, but not deleted from datastore261 ## '''262 ## def getLoadedToODMButNotDeletedFromDatastore(self, batchType):263 ## return self.getLoadedToODMButNotDeletedBatchIDs(batchType, "deleted_datastore")264 ##265 ## '''266 ## Returns a list of processed batch IDs that are loaded to the ODM, but not deleted from DXLayer267 ## '''268 ## def getLoadedToODMButNotDeletedFromDXLayer(self, batchType):269 ## return self.getLoadedToODMButNotDeletedBatchIDs(batchType, "deleted_dxlayer")270 271 ## '''272 ## Returns a list of purged batch IDs that not deleted from local disk273 ## '''274 ## def getPurgedButNotDeletedFromLocalDisk(self, batchType):275 ## return self.getPurgedButNotDeletedBatchIDs(batchType, "deleted_local")276 ##277 ## '''278 ## Returns a list of purged batch IDs that are not deleted from datastore279 ## '''280 ## def getPurgedButNotDeletedFromDatastore(self, batchType):281 ## return self.getPurgedButNotDeletedBatchIDs(batchType, "deleted_datastore")282 283 168 284 169 ''' … … 786 671 and not self.consistentlyFailed(batchType, stageID)): 787 672 788 self.logger.infoPair("heather:","passed logic")673 # self.logger.infoPair("heather:","passed logic") 789 674 790 675 sql = "INSERT INTO batch ( \ … … 803 688 '" + self.skychunk.name + "' \ 804 689 )" 805 self.logger.infoPair("heather:","sql") 690 691 # self.logger.infoPair("heather:",sql) 806 692 807 693 self.execute(sql) … … 816 702 except: 817 703 self.logger.exception("Unable to get batch ID") 704 self.logger.infoPair("sql:",sql) 705 818 706 batchID = -1 819 707 … … 1017 905 response = raw_input(" * Name for new skychunk, or existing skychunk to edit? ") 1018 906 if response == "": return 907 response = response.strip() 1019 908 self.skychunk.name = response 1020 909 … … 1056 945 question = question + "? " 1057 946 response = raw_input(question) 947 response = response.strip() 948 print "response: '" + response + "'" 1058 949 if response != "": 1059 950 if type.find("varchar") != -1 or type.find("timestamp") != -1: quotes = "'" 1060 951 else: quotes = "" 952 # print "UPDATE skychunk SET " + field + " = " + quotes + response + quotes + " WHERE name = '" + self.skychunk.name + "'" 1061 953 self.execute("UPDATE skychunk SET " + field + " = " + quotes + response + quotes + " WHERE name = '" + self.skychunk.name + "'") 1062 954 … … 1129 1021 1130 1022 self.skychunk.batchTypes = [] 1131 try: 1132 rs = self.executeQuery(sql) 1133 1134 rs.first() 1135 self.skychunk.datastoreProduct = rs.getString(1) 1136 self.skychunk.datastoreType = rs.getString(2) 1137 if rs.getInt(3) == 1: self.skychunk.datastorePublishing = True 1138 else: self.skychunk.datastorePublishing = False 1139 self.skychunk.dvoLabel = rs.getString(4) 1140 1141 # if dvoLabel is null is can break queries, so set to something 1142 if not self.skychunk.dvoLabel: self.skychunk.dvoLabel = "none" 1143 self.skychunk.dvoLocation = rs.getString(5) 1144 self.skychunk.minRa = rs.getDouble(6) 1145 self.skychunk.maxRa = rs.getDouble(7) 1146 self.skychunk.minDec = rs.getDouble(8) 1147 self.skychunk.maxDec = rs.getDouble(9) 1148 1149 self.skychunk.boxSize = rs.getDouble(10) 1150 # self.skychunk.halfBox = self.skychunk.boxSize/2.0 1151 # self.skychunk.boxSizeWithBorder = self.skychunk.boxSize + (self.skychunk.BORDER * 2) 1152 1153 self.skychunk.basePath = rs.getString(11) 1154 self.skychunk.dataRelease = rs.getInt(12) 1155 1156 self.skychunk.deleteLocal = rs.getInt(13) 1157 self.skychunk.deleteDatastore = rs.getInt(14) 1158 self.skychunk.deleteDxLayer = rs.getInt(15) 1159 1160 self.skychunk.epoch = rs.getString(16) 1161 1162 self.skychunk.survey = rs.getString(17) 1163 self.skychunk.pspsSurvey = rs.getString(18) 1164 1165 if rs.getInt(19) == 1: self.skychunk.batchTypes.append("P2") 1166 if rs.getInt(20) == 1: self.skychunk.batchTypes.append("ST") 1167 if rs.getInt(21) == 1: self.skychunk.batchTypes.append("OB") 1168 if rs.getInt(22) == 1: self.skychunk.batchTypes.append("DF") 1169 if rs.getInt(23) == 1: self.skychunk.batchTypes.append("DO") 1170 if rs.getInt(24) == 1: self.skychunk.batchTypes.append("FW") 1171 if rs.getInt(25) == 1: self.skychunk.batchTypes.append("FO") 1172 1173 self.skychunk.force = True # TODO 1174 self.skychunk.parallel = False # TODO 1175 1176 if rs.getInt(26) == 1: self.skychunk.parallel = True 1177 self.skychunk.P2_smf_version = rs.getString(27) 1178 self.skychunk.ST_cmf_version = rs.getString(28) 1179 self.skychunk.trange_start = rs.getString(29) 1180 self.skychunk.trange_end = rs.getString(30) 1181 1182 if self.skychunk.parallel: print "USING parallel" 1183 self.skychunk.isLoaded = True 1184 except: 1023 1024 try: 1025 rs = self.executeQuery("show tables") 1026 except: 1027 self.logger.errorPair("cannot even show tables?", "Boo hoo") 1028 sys.exit(1) 1029 1030 # while rs.next(): 1031 # print "table row" 1032 # print rs.getString(1) 1033 # 1034 # rs = self.executeQuery("select * from skychunk") 1035 # while rs.next(): 1036 # print "skychunk row" 1037 # print rs.getString(1) 1038 # 1039 # print "SQL (2): " + sql 1040 1041 try: 1042 rs = self.executeQuery(sql) 1043 1044 except: 1045 self.logger.errorPair("problem with sql query for skychunk", sql) 1046 sys.exit(1) 1047 1048 if not rs.next(): 1185 1049 self.logger.errorPair("Could not read skychunk with name", self.skychunk.name) 1186 1050 self.skychunk.isLoaded = False 1051 return False 1052 1053 self.skychunk.datastoreProduct = rs.getString(1) 1054 self.skychunk.datastoreType = rs.getString(2) 1055 if rs.getInt(3) == 1: self.skychunk.datastorePublishing = True 1056 else: self.skychunk.datastorePublishing = False 1057 self.skychunk.dvoLabel = rs.getString(4) 1058 1059 # if dvoLabel is null is can break queries, so set to something 1060 if not self.skychunk.dvoLabel: self.skychunk.dvoLabel = "none" 1061 self.skychunk.dvoLocation = rs.getString(5) 1062 self.skychunk.minRa = rs.getDouble(6) 1063 self.skychunk.maxRa = rs.getDouble(7) 1064 self.skychunk.minDec = rs.getDouble(8) 1065 self.skychunk.maxDec = rs.getDouble(9) 1066 1067 self.skychunk.boxSize = rs.getDouble(10) 1068 # self.skychunk.halfBox = self.skychunk.boxSize/2.0 1069 # self.skychunk.boxSizeWithBorder = self.skychunk.boxSize + (self.skychunk.BORDER * 2) 1070 1071 self.skychunk.basePath = rs.getString(11) 1072 self.skychunk.dataRelease = rs.getInt(12) 1073 1074 self.skychunk.deleteLocal = rs.getInt(13) 1075 self.skychunk.deleteDatastore = rs.getInt(14) 1076 self.skychunk.deleteDxLayer = rs.getInt(15) 1077 1078 self.skychunk.epoch = rs.getString(16) 1079 1080 self.skychunk.survey = rs.getString(17) 1081 self.skychunk.pspsSurvey = rs.getString(18) 1082 1083 if rs.getInt(19) == 1: self.skychunk.batchTypes.append("P2") 1084 if rs.getInt(20) == 1: self.skychunk.batchTypes.append("ST") 1085 if rs.getInt(21) == 1: self.skychunk.batchTypes.append("OB") 1086 if rs.getInt(22) == 1: self.skychunk.batchTypes.append("DF") 1087 if rs.getInt(23) == 1: self.skychunk.batchTypes.append("DO") 1088 if rs.getInt(24) == 1: self.skychunk.batchTypes.append("FW") 1089 if rs.getInt(25) == 1: self.skychunk.batchTypes.append("FO") 1090 1091 self.skychunk.force = True # TODO 1092 self.skychunk.parallel = False # TODO 1093 1094 if rs.getInt(26) == 1: self.skychunk.parallel = True 1095 self.skychunk.P2_smf_version = rs.getString(27) 1096 # options: "use_new", "not_reproc", "use_original" 1097 1098 self.skychunk.ST_cmf_version = rs.getString(28) 1099 # options: not used? 1100 1101 # NOTE : the rs (ResultSet) class is somewhat annoying and finicky esp about timestamp values 1102 # for example, rs.getTimestamp(N) fails silently if the value is not a valid time. this is 1103 # also true to getString() on the same column. Docs for the ResultSet class can be found here: 1104 # http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/ResultSet.html?is-external=true 1105 1106 # rsmd = rs.getMetaData() 1107 # print "got metadata" 1108 # print "number of columns: " + str(rsmd.getColumnCount()) 1109 # print "label: " + rsmd.getColumnLabel(25) 1110 # print "label: " + rsmd.getColumnLabel(29) 1111 # print "name: " + rsmd.getColumnName(25) 1112 # print "name: " + rsmd.getColumnName(29) 1113 # print "typename: " + rsmd.getColumnTypeName(25) 1114 # print "typename: " + rsmd.getColumnTypeName(29) 1115 1116 # epochStamp = rs.getTimestamp(16) 1117 # print "epoch: " + epochStamp.toString() 1118 1119 # print "label: " + rsmd.getColumnLabel(16) 1120 # print "name: " + rsmd.getColumnName(16) 1121 # print "typename: " + rsmd.getColumnTypeName(16) 1122 1123 self.skychunk.trange_start = rs.getString(29) 1124 # options: "YYYY-MM-DD,HH:MM:SS" or "0000-00-00 00:00:00" 1125 1126 self.skychunk.trange_end = rs.getString(30) 1127 # options: "YYYY-MM-DD,HH:MM:SS" or "0000-00-00 00:00:00" 1128 1129 if self.skychunk.parallel: print "USING parallel" 1130 self.skychunk.isLoaded = True 1187 1131 1188 1132 return self.skychunk.isLoaded … … 1308 1252 def insertPending(self, box_id, batchType, ids): 1309 1253 1310 #print "starting insert pending"1254 print "starting insert pending" 1311 1255 1312 1256 # first delete old pending items … … 1314 1258 WHERE box_id = " + str(box_id) + " \ 1315 1259 AND batch_type = '" + batchType + "'") 1316 1317 # print "deleted old items"1318 1260 1319 1261 for id in ids: … … 1323 1265 (" + str(box_id) + ", '" + batchType + "', " + str(id) + ")" 1324 1266 1325 # print "sql: ", sql 1326 self.execute(sql) 1267 try: self.execute(sql) 1268 except: 1269 print "failed to insert into pending" 1270 print "sql: " + sql 1271 raise 1327 1272 1328 1273 ''' … … 1348 1293 GROUP BY ra_center LIMIT 1" 1349 1294 1350 # XXX print "sql: ", sql 1351 1352 try: 1353 rs = self.executeQuery(sql) 1354 rs.first() 1355 raCenter = rs.getFloat(1) 1356 rs.close() 1357 except: 1358 self.logger.errorPair("No free stripes", "trying to steal boxes from other client...") 1359 sql = "SELECT ra_center, COUNT(*) AS numPending\ 1360 FROM box \ 1361 JOIN pending ON (id = box_id) \ 1362 WHERE skychunk = '" + self.skychunk.name + "' \ 1363 AND batch_type = '" + batchType + "' \ 1364 GROUP BY ra_center \ 1365 ORDER BY numPending \ 1366 DESC LIMIT 1" 1367 1368 try: 1369 rs = self.executeQuery(sql) 1370 rs.first() 1371 raCenter = rs.getFloat(1) 1372 rs.close() 1373 except: 1374 self.logger.errorPair("No outstanding boxes to steal", "no stripe obtained") 1295 try: rs = self.executeQuery(sql) 1296 except: 1297 self.logger.errorPair("Problem querying for outstanding boxes", "error in sql?") 1298 print "SQL: " + sql 1299 1300 if not rs.next(): 1301 self.logger.errorPair("No outstanding boxes", "nothing to do here") 1302 rs.close() 1303 self.unlockTables() 1304 return ids 1305 1306 raCenter = rs.getFloat(1) 1307 rs.close() 1308 1309 #except: 1310 # self.logger.errorPair("No free stripes", "trying to steal boxes from other client...") 1311 # sql = "SELECT ra_center, COUNT(*) AS numPending\ 1312 # FROM box \ 1313 # JOIN pending ON (id = box_id) \ 1314 # WHERE skychunk = '" + self.skychunk.name + "' \ 1315 # AND batch_type = '" + batchType + "' \ 1316 # GROUP BY ra_center \ 1317 # ORDER BY numPending \ 1318 # DESC LIMIT 1" 1319 # 1320 # try: 1321 # rs = self.executeQuery(sql) 1322 # rs.first() 1323 # raCenter = rs.getFloat(1) 1324 # rs.close() 1325 # except: 1326 # self.logger.errorPair("No outstanding boxes to steal", "no stripe obtained") 1375 1327 1376 1328 # tests against ra_center need to use a finite box … … 1421 1373 GROUP BY ra_center LIMIT 1" 1422 1374 1423 try: 1424 rs = self.executeQuery(sql) 1425 rs.first() 1426 raCenter = rs.getFloat(1) 1427 rs.close() 1428 except: 1429 self.logger.errorPair("No free stripes", "trying to steal boxes from other client...") 1430 sql = "SELECT ra_center, COUNT(*) AS numPending\ 1431 FROM box \ 1432 JOIN pending ON (id = box_id) \ 1433 WHERE skychunk = '" + self.skychunk.name + "' \ 1434 GROUP BY ra_center \ 1435 ORDER BY numPending \ 1436 DESC LIMIT 1" 1437 1438 try: 1439 rs = self.executeQuery(sql) 1440 rs.first() 1441 raCenter = rs.getFloat(1) 1442 rs.close() 1443 except: 1444 self.logger.errorPair("No outstanding boxes to steal", "no stripe obtained") 1375 try: rs = self.executeQuery(sql) 1376 except: 1377 self.logger.errorPair("Problem querying for outstanding boxes", "error in sql?") 1378 print "SQL: " + sql 1379 1380 if not rs.next(): 1381 self.logger.infoPair("No outstanding boxes", "nothing to do here") 1382 rs.close() 1383 self.unlockTables() 1384 return ids 1385 1386 raCenter = rs.getFloat(1) 1387 rs.close() 1388 1389 # except: 1390 # self.logger.errorPair("No free stripes", "trying to steal boxes from other client...") 1391 # sql = "SELECT ra_center, COUNT(*) AS numPending\ 1392 # FROM box \ 1393 # JOIN pending ON (id = box_id) \ 1394 # WHERE skychunk = '" + self.skychunk.name + "' \ 1395 # GROUP BY ra_center \ 1396 # ORDER BY numPending \ 1397 # DESC LIMIT 1" 1398 # 1399 # try: 1400 # rs = self.executeQuery(sql) 1401 # rs.first() 1402 # raCenter = rs.getFloat(1) 1403 # rs.close() 1404 # except: 1405 # self.logger.errorPair("No outstanding boxes to steal", "no stripe obtained") 1445 1406 1446 1407 # tests against ra_center need to use a finite box … … 1634 1595 self.logger.infoPair("Items written to Db", "%d" % count) 1635 1596 1636 # try:1637 # self.execute("DROP TABLE all_pending_alt")1638 # except:1639 # pass1640 #1641 # self.execute("CREATE TABLE all_pending_alt (stage_id bigint(20), ra_bore float, dec_bore float)")1642 # for row in rows:1643 # try:1644 # sql = "INSERT INTO all_pending_alt (stage_id, ra_bore, dec_bore) \1645 # VALUES (%d, %f, %f)" % (row[0], row[1], row[2])1646 # print "sql 2: ", sql1647 # self.execute(sql)1648 # except: continue1649 #1650 # count = self.getRowCount("all_pending_alt")1651 # self.logger.infoPair("Items written to alt Db", "%d" % count)1652 1653 1654 1597 ''' 1655 1598 Creates a temporary table and shoves it full of stage_ids and ra/dec coords … … 1670 1613 VALUES ( '" + dvo_db + "', '" + batchType + "'," + str(row[0]) + "," + str(row[1]) + " \ 1671 1614 , " + str(row[2]) + ", '"+uniq+"' ) " 1672 print "sql 1: ", sql1615 # print "sql 1: ", sql 1673 1616 self.execute(sql) 1674 1617 except: continue … … 1683 1626 ''' 1684 1627 def getItemsInThisBox(self, minRA, maxRA, minDEC, maxDEC): 1685 1686 # the old linear size is ill-conceived1687 ## XX halfSide = self.skychunk.boxSize/2.01688 ## XX minRa = ra-halfSide1689 ## XX maxRa = ra+halfSide1690 ## XX minDec = dec-halfSide1691 ## XX maxDec = dec+halfSide1692 1628 1693 1629 ids = [] … … 1698 1634 AND dec_bore BETWEEN " + str(minDEC) + " AND " + str(maxDEC) 1699 1635 1700 print "items: ", sql 1701 1702 try: 1703 rs = self.executeQuery(sql) 1704 while (rs.next()): ids.append(rs.getInt(1)) 1636 try: 1637 rs = self.executeQuery(sql) 1638 while (rs.next()): 1639 ids.append(rs.getInt(1)) 1705 1640 except: 1706 1641 self.logger.errorPair("Can't get items in this box", sql)
Note:
See TracChangeset
for help on using the changeset viewer.
