# apel/db/backends/mysql.py
'''
Copyright (C) 2011,2012 STFC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Created on 27 Oct 2011
@author: Will Rogers, Konrad Jopek
'''
from apel.db import ApelDbException
from apel.db.records import (BlahdRecord,
CloudRecord,
CloudSummaryRecord,
EventRecord,
GroupAttributeRecord,
JobRecord,
NormalisedSummaryRecord,
ProcessedRecord,
StorageRecord,
SummaryRecord,
SyncRecord)
import MySQLdb.cursors
import datetime
import logging
import decimal
# Logger for this module, named after the module itself.
log = logging.getLogger(name=__name__)
# This module does not turn MySQL warnings into exceptions. To do so, one
# would enable (after importing the ``warnings`` module):
#     warnings.simplefilter("error", category=MySQLdb.Warning)
class ApelMysqlDb(object):
    '''
    MySQL implementation of the general ApelDb interface.

    The connection opened by the constructor is kept in ``self.db``. Methods
    that talk to the database first call ``_mysql_reconnect()``, which pings
    the server and opens a fresh connection if the old one has dropped.
    '''

    # Record class -> table or view that get_records() reads it from when no
    # explicit table name is passed.
    MYSQL_TABLES = {EventRecord : 'EventRecords',
                    JobRecord : 'VJobRecords',
                    BlahdRecord : 'BlahdRecords',
                    SyncRecord : 'SyncRecords',
                    CloudRecord : 'VCloudRecords',
                    CloudSummaryRecord : 'VCloudSummaries',
                    NormalisedSummaryRecord : 'VNormalisedSummaries',
                    ProcessedRecord : 'VProcessedFiles',
                    SummaryRecord : 'VSummaries',
                    StorageRecord: 'VStarRecords'}

    # These simply need to have the same number of arguments as the stored
    # procedures defined in the database schemas.
    # INSERT_PROCEDURES is used by load_records() when replace=False,
    # REPLACE_PROCEDURES when replace=True.
    INSERT_PROCEDURES = {
        EventRecord : 'CALL InsertEventRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
        BlahdRecord : "CALL InsertBlahdRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
    }

    REPLACE_PROCEDURES = {
        EventRecord : 'CALL ReplaceEventRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
        JobRecord : "CALL ReplaceJobRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        SummaryRecord: "CALL ReplaceSummary(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        NormalisedSummaryRecord: "CALL ReplaceNormalisedSummary(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        SyncRecord : "CALL ReplaceSyncRecord(%s, %s, %s, %s, %s, %s)",
        ProcessedRecord : "CALL ReplaceProcessedFile(%s, %s, %s, %s, %s)",
        CloudRecord : "CALL ReplaceCloudRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        CloudSummaryRecord : "CALL ReplaceCloudSummaryRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        StorageRecord: "CALL ReplaceStarRecord(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        GroupAttributeRecord: "CALL ReplaceGroupAttribute(%s, %s, %s)"
    }

    # Record types that may appear together in one list passed to
    # load_records(); every other type must be loaded in a homogeneous list.
    _MIXABLE_TYPES = (StorageRecord, GroupAttributeRecord)

    def __init__(self, host, port, username, pwd, db):
        '''
        Store the connection parameters, define the stored procedure names
        and open the database connection.

        :raises ApelDbException: if the connection cannot be established
        '''
        self._db_host = host
        self._db_port = port
        self._db_username = username
        self._db_pwd = pwd
        self._db_name = db

        self._summarise_jobs_proc = "SummariseJobs"
        self._normalise_summaries_proc = "NormaliseSummaries"
        self._summarise_vms_proc = "SummariseVMs"
        self._copy_summaries_proc = "CopyNormalisedSummaries"
        self._hep_spec_hist_proc = "CreateHepSpecHistory"
        self._join_records_proc = "JoinJobRecords"
        self._local_jobs_proc = "LocalJobs"
        self._spec_lookup_proc = "SpecLookup (%s, %s, %s, %s)"
        self._spec_update_proc = "CALL SpecUpdate (%s, %s, %s, %s, %s)"
        self._processed_clean = "CALL CleanProcessedFiles(%s)"

        try:
            self.db = self._connect()
        except MySQLdb.Error as e:
            log.error('Error connecting to database: %s', e)
            raise ApelDbException(e)

    def _connect(self):
        '''
        Open and return a new MySQLdb connection using the stored parameters.

        Any MySQLdb.Error raised by the driver propagates unchanged; callers
        decide how to wrap it.
        '''
        return MySQLdb.connect(host=self._db_host, port=self._db_port,
                               user=self._db_username, passwd=self._db_pwd,
                               db=self._db_name)

    def test_connection(self):
        '''
        Test that a connection to the database can be opened.

        A separate connection is opened, logged and closed again; ``self.db``
        is not touched.

        :raises ApelDbException: if a MySQLdb.OperationalError occurs
        '''
        try:
            db = self._connect()
            log.info('Connected to %s:%s', self._db_host, self._db_port)
            log.info('Database: %s; username: %s', self._db_name, self._db_username)
            db.close()
        except MySQLdb.OperationalError as e:
            raise ApelDbException("Failed to connect to database: " + str(e))

    def load_records(self, record_list, replace=True, source=None):
        '''
        Load the records in the list into the DB.

        This is transactional - either all or no records will be loaded.
        ``source`` (the DN of the sender) is passed to each record's
        get_db_tuple() so it is stored with the record.

        All records must be of the same type as the first one, except that
        StorageRecords and GroupAttributeRecords may be mixed with each other.

        :param record_list: records to load; an empty list is a no-op
        :param replace: use the Replace* procedures if True, else Insert*
        :param source: DN of the sender
        :raises ApelDbException: on mixed record types, a record type with no
            matching procedure, or a database error (after rollback)
        '''
        if not record_list:
            # no records to load
            return

        record_type = type(record_list[0])
        mixable = self._MIXABLE_TYPES
        for record in record_list:
            this_type = type(record)
            if this_type == record_type:
                continue
            # Storage and GroupAttribute records may only be mixed with each
            # other, never with records of any other type.
            if record_type in mixable and this_type in mixable:
                continue
            raise ApelDbException("Not all records in list are of type %s." % record_type)

        try:
            if replace:
                proc = self.REPLACE_PROCEDURES[record_type]
            else:
                proc = self.INSERT_PROCEDURES[record_type]
        except KeyError:
            raise ApelDbException('No procedure found for %s; replace = %s' % (record_type, replace))

        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor(cursorclass=MySQLdb.cursors.DictCursor)

            for record in record_list:
                values = record.get_db_tuple(source)
                log.debug('Values: %s', values)
                if type(record) in mixable:
                    # These types can be found in the same record list, so need
                    # to get the right procedure for each one.
                    proc = self.REPLACE_PROCEDURES[type(record)]
                c.execute(proc, values)
            self.db.commit()
        except (MySQLdb.Warning, MySQLdb.Error, KeyError) as err:
            log.error("Error loading records: %s", err)
            log.error("Transaction will be rolled back.")
            self.db.rollback()
            raise ApelDbException(err)

    def get_records(self, record_type, table_name=None, query=None, records_per_message=1000):
        '''
        Yield lists of records of the given type fetched from the database.

        This is used if the records are coming directly from a table or view.
        If table_name is None, the table is looked up in MYSQL_TABLES.
        If query is given, its get_where() output is appended to the SELECT.
        '''
        if table_name is None:
            table_name = self.MYSQL_TABLES[record_type]

        select_query = 'SELECT * FROM %s' % (table_name)

        if query is not None:
            select_query += query.get_where()

        log.debug(select_query)

        for batch in self._get_records(record_type, select_query, records_per_message):
            yield batch

    def get_sync_records(self, query=None, records_per_message=1000):
        """
        Get sync records from the SuperSummaries table.

        NumberOfJobs is summed per Site, SubmitHost, Month and Year, read
        from the VSuperSummaries view and filtered by the provided query.
        """
        if query is not None:
            where = query.get_where()
        else:
            where = ''

        select_query = ('SELECT Site, SubmitHost, sum(NumberOfJobs) '
                        'AS NumberOfJobs, Month, Year FROM VSuperSummaries %s '
                        'GROUP BY Site, SubmitHost, Month, Year ORDER BY NULL'
                        % where)

        log.debug(select_query)

        for batch in self._get_records(SyncRecord, select_query, records_per_message):
            yield batch

    def _get_records(self, record_type, query_string, records_per_message=1000):
        '''
        Execute query_string and yield the rows as lists of record_type
        instances, with at most records_per_message records per list.

        A server-side dict cursor (SSDictCursor) is used and rows are pulled
        with fetchmany(), so each row arrives as a {Field: Value} dict.

        On a MySQLdb.Error the transaction is rolled back and ApelDbException
        is raised. A MySQLdb.Warning is logged and ends the generator.
        '''
        record_list = []
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor(cursorclass=MySQLdb.cursors.SSDictCursor)
            c.execute(query_string)

            while True:
                for row in c.fetchmany(size=records_per_message):
                    record = record_type()
                    # row is a dictionary {Field: Value}
                    record.set_all(row)
                    record_list.append(record)

                if len(record_list) > 0:
                    yield record_list
                    record_list = []
                else:
                    break
        except MySQLdb.Error as err:
            log.error('Error during getting records: %s', err)
            log.error('Transaction will be rolled back.')
            self.db.rollback()
            raise ApelDbException(err)
        except MySQLdb.Warning as warning:
            log.warning('Warning from MySQL: %s', warning)

    def get_last_updated(self):
        '''
        Find the last time that messages were sent.

        Returns the UpdateTime of the "sent" row in LastUpdated, or None if
        there is no such row.
        '''
        # Reconnect if needed, like every other method that queries the DB.
        self._mysql_reconnect()
        query = 'select UpdateTime from LastUpdated where Type = "sent"'
        c = self.db.cursor()
        c.execute(query)
        result = c.fetchone()
        if result is None:
            return None
        else:
            return result[0]

    def set_updated(self):
        '''
        Set the current time as the last time messages were sent.

        Calls the UpdateTimestamp("sent") procedure, commits and returns True.
        '''
        log.info('Updating timestamp.')
        # Reconnect if needed, like every other method that queries the DB.
        self._mysql_reconnect()
        query = 'call UpdateTimestamp("sent")'
        c = self.db.cursor()
        c.execute(query)
        c.fetchone()
        self.db.commit()
        return True

    def check_duplicate_sites(self):
        '''
        Check that records from the same site are not in both the
        JobRecords table and the Summaries table.

        :raises ApelDbException: if any SiteID appears in both tables
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            c.execute("select count(*) from JobRecords j inner join Summaries s using (SiteID)")
            conflict = c.fetchone()[0] > 0

            if conflict:
                raise ApelDbException("Records exist in both job and summary tables for the same site.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            raise

    def summarise_jobs(self):
        '''
        Aggregate the data from the JobRecords table and put the results in the
        HybridSuperSummaries table. This method does this by calling the
        SummariseJobs stored procedure.

        Any failure will result in the entire transaction being rolled
        back.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Summarising job records...")
            c.callproc(self._summarise_jobs_proc, ())
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            raise

    def normalise_summaries(self):
        """
        Normalise data from Summaries and insert into HybridSuperSummaries.

        Normalise the data from the Summaries table (calculate the normalised
        wall clock and cpu duration values from the ServiceLevel fields) and put
        the results in the HybridSuperSummaries table. This is done by
        calling the NormaliseSummaries stored procedure.

        Any failure will result in the entire transaction being rolled
        back.
        """
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Normalising the Summary records...")
            c.callproc(self._normalise_summaries_proc, ())
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A MySQL error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            raise

    def copy_summaries(self):
        '''
        Copy summaries from the NormalisedSummaries table to the
        HybridSuperSummaries table, via the CopyNormalisedSummaries procedure.

        Any failure will result in the entire transaction being rolled
        back.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Copying summaries...")
            c.callproc(self._copy_summaries_proc)
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            raise

    def summarise_cloud(self):
        '''
        Aggregate the CloudRecords table and put the results in the
        CloudSummaries table. This method does this by calling the
        SummariseVMs() stored procedure.

        Any failure will result in the entire transaction being rolled
        back.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Summarising cloud records...")
            c.callproc(self._summarise_vms_proc, ())
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            raise

    def join_records(self):
        '''
        Execute the JoinJobRecords procedure in the database, which joins data
        from the BlahdRecords table and the EventRecords table into JobRecords.

        For the exact implementation please read the code of the JoinJobRecords
        procedure in client.sql in the apeldb/schema folder.

        Any failure will result in the entire transaction being rolled
        back and the error being re-raised.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Creating JobRecords")
            c.callproc(self._join_records_proc)
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            # Bare raise keeps the original traceback (``raise e`` loses it on
            # Python 2).
            raise

    def create_local_jobs(self):
        '''
        Call the LocalJobs stored procedure and commit.

        Any failure will result in the entire transaction being rolled
        back and the error being re-raised.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            log.info("Creating local jobs")
            c.callproc(self._local_jobs_proc)
            log.info("Done.")

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()
            # Bare raise keeps the original traceback (``raise e`` loses it on
            # Python 2).
            raise

    def update_spec(self, site, ce, spec_level_type, spec_level):
        '''
        Compare the stored spec level with the given value and update the
        SpecRecords table if it is necessary.

        The current value is looked up with the SpecLookup function. If there
        is none, SpecUpdate is called with the Unix epoch as its timestamp.
        If it differs from spec_level by more than 0.001, SpecUpdate is called
        with the current time.

        A MySQLdb.Error is logged and the transaction rolled back; the error
        is NOT re-raised.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()

            # Use one timestamp for both the lookup and any update.
            now = datetime.datetime.now()
            c.execute("SELECT %s" % self._spec_lookup_proc,
                      [site, ce, spec_level_type, now])

            # MySQL 5.1.xx (or MySQL-python 1.2.1) returns None
            # directly from c.fetchone(), so we must handle it in another way
            try:
                old_level = c.fetchone()[0]
            except TypeError:
                old_level = None

            if old_level is None:
                # NOTE(review): presumably the epoch makes the value apply to
                # all past records - confirm against SpecUpdate.
                c.execute(self._spec_update_proc,
                          [site, ce, spec_level_type,
                           datetime.datetime.fromtimestamp(0),
                           spec_level])
            else:
                # Compare as Decimals (via str, so float or Decimal results
                # from the driver both work). Precision limited to the third
                # decimal place is enough for our purposes.
                difference = (decimal.Decimal(str(old_level)) -
                              decimal.Decimal(str(spec_level)))
                if abs(difference) > decimal.Decimal('0.001'):
                    c.execute(self._spec_update_proc,
                              [site, ce, spec_level_type, now, spec_level])

            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()

    def clean_processed_files(self, hostname):
        '''
        Clean the ProcessedFiles table of records for the given host, via the
        CleanProcessedFiles procedure.

        A MySQLdb.Error is logged and the transaction rolled back; the error
        is NOT re-raised.
        '''
        try:
            # Reconnect if needed, to avoid MySQLdb's
            # 'MySQL server has gone away' error.
            self._mysql_reconnect()
            c = self.db.cursor()
            c.execute(self._processed_clean, [hostname])
            self.db.commit()
        except MySQLdb.Error as e:
            log.error("A mysql error occurred: %s", e)
            log.error("Any transaction will be rolled back.")

            if self.db is not None:
                self.db.rollback()

    def _mysql_reconnect(self):
        '''
        Check whether the current connection is active and reconnect if
        necessary.

        :raises ApelDbException: if a new connection cannot be opened
        '''
        try:
            self.db.ping()
        except MySQLdb.Error:
            try:
                self.db = self._connect()
            except MySQLdb.Error as e:
                log.error('Error connecting to database: %s', e)
                raise ApelDbException(e)