# implementation/primaries/ExtractMetadata/classes/MusicManager.py
import os
import shutil
import zipfile
from xml.sax._exceptions import *
import requests.exceptions
from implementation.primaries.ExtractMetadata.classes import MusicData, MetaParser, OnlineMetaParser
from implementation.primaries.ImportOnlineDBs.classes import ApiManager
from implementation.primaries.ExtractMetadata.classes.DataLayer.helpers import filter_dict
from implementation.primaries.ExtractMetadata.classes.helpers import get_set_of_dict_values
from MuseParse.classes.Output import LilypondOutput
from MuseParse.classes import Exceptions
from MuseParse.classes.Input import MxmlParser
from implementation.primaries.globals import LOG_NAME
import logging
from .DataLayer.exceptions import BadPieceException, BadTableException
# Module-level logger, looked up under the LOG_NAME imported from
# implementation.primaries.globals.
logger = logging.getLogger(LOG_NAME)
def col_or_none(data, col):
    """
    Pick column ``col`` out of the first row of ``data``.

    Return value: ``data[0][col]`` when ``data`` holds at least one row,
    otherwise None.
    """
    has_rows = len(data) > 0
    return data[0][col] if has_rows else None
class Unzipper(object):
    """
    Unzips a list of compressed MusicXML archives that live in one folder.

    Works only with mxl, which is the default zip type for music xml. Each
    archive is extracted into ``folder`` and its first non META-INF member is
    renamed to ``<archive name up to the first dot>.xml``.
    """

    def __init__(
            self,
            folder="/Users/charlottegodley/PycharmProjects/FYP",
            files=None):
        # path to the folder where the music collection is stored
        self.folder = folder
        # list of mxl files to unzip; a fresh list is created per instance so
        # that instances never share one default list object
        self.files = [] if files is None else files

    @staticmethod
    def _outputName(file):
        """Return the xml file name expected for one archive name."""
        # Everything after the FIRST dot is dropped; MusicManager.unzipApiFiles
        # derives its expected names the same way, so the two must stay in step.
        return file.split('.')[0] + ".xml"

    def _extractArchive(self, file):
        """
        Extract one archive into self.folder.

        Return value: name of the first member outside META-INF, or None when
        the archive is missing, unreadable or holds no such member.
        """
        path = os.path.join(self.folder, file)
        if not os.path.exists(path):
            return None
        try:
            # the context manager closes the archive even if extraction fails
            with zipfile.ZipFile(path) as zip_file:
                zip_file.extractall(path=self.folder)
                members = [info.filename for info in zip_file.filelist
                           if "META-INF" not in info.filename]
        except Exception as e:
            logger.error("file %s was skipped: %s", file, e)
            return None
        if not members:
            logger.error(
                "file %s was skipped: no member outside META-INF", file)
            return None
        return members[0]

    def createOutputList(self):
        """
        Takes the file list (self.files) and produces their outputs with the xml extension
        Return value: list of xml files
        """
        return [self._outputName(file) for file in self.files]

    def unzipInputFiles(self):
        """
        Extracts every archive in self.files into self.folder.

        Archives that do not exist or cannot be unzipped are skipped, so the
        result may be shorter than self.files.
        Return value: list of unzipped xml files
        """
        extracted = (self._extractArchive(file) for file in self.files)
        return [member for member in extracted if member is not None]

    def unzip(self):
        """
        Extracts each archive and renames its output to match the archive name,
        then removes the leftover META-INF folder.

        Each archive is renamed straight after it is extracted: pairing is done
        per archive, so a skipped archive cannot shift the names of later ones,
        and two archives holding a member of the same name do not overwrite
        each other.
        Return value: None
        """
        for file in self.files:
            member = self._extractArchive(file)
            if member is None:
                continue
            self.rename_output(
                os.path.join(self.folder, member),
                os.path.join(self.folder, self._outputName(file)))
        meta_inf = os.path.join(self.folder, 'META-INF')
        if os.path.exists(meta_inf):
            shutil.rmtree(meta_inf)

    def rename_output(self, input_path, output_path):
        """
        Moves input_path to output_path, replacing an existing output file.

        Nothing happens when both paths are equal or input_path does not exist;
        a failing move is logged rather than raised.
        """
        if input_path == output_path or not os.path.exists(input_path):
            return
        try:
            # os.replace overwrites an existing destination in one step
            os.replace(input_path, output_path)
        except OSError as e:
            logger.exception(
                "File %s was skipped from renaming: %s", input_path, e)
class FolderBrowser(object):
    """
    Class which takes a folder and a list of files in the database and produces 3 lists:
    - new files: files which aren't in the database but exist in the folder
    - old files: files which are in the database but no longer exist in the folder
    - zip files: files which end in the extension .mxl
    Works only with xml files and mxl files.
    """

    def __init__(
            self,
            db_files=None,
            folder='/Users/charlottegodley/PycharmProjects/FYP'):
        # A list of files in the database; a fresh list per instance so that
        # instances never share one default list object
        self.db_files = [] if db_files is None else db_files
        # the folder in which the collection is stored
        self.folder = folder

    def resetDbFileList(self, files):
        """Replace the list of files known to the database."""
        self.db_files = files

    def getFolderFiles(self, extensions=('xml', 'mxl')):
        """
        method to search the given folder (recursively) for all files whose
        extension is listed in ``extensions``
        Return value: dictionary indexed by extension; an extension only appears
        when at least one file of that type exists. File names are relative to
        self.folder.
        """
        folder_files = {}
        for root, _dirs, files in os.walk(self.folder):
            # relpath works whether or not self.folder ends with a separator;
            # slicing by len(self.folder) cut a character off in that case
            relative_root = os.path.relpath(root, self.folder)
            if relative_root == os.curdir:
                relative_root = ""
            for file in files:
                # splitext yields "" for extension-less names, so a file that is
                # merely called "xml" is no longer mistaken for an xml file
                ending = os.path.splitext(file)[1][1:]
                if ending in extensions:
                    folder_files.setdefault(ending, []).append(
                        os.path.join(relative_root, file))
        return folder_files

    def getZipFiles(self):
        """
        Method which takes the result of getFolderFiles and returns only the zip files
        Return value: list of mxl files, or None when the folder holds none
        """
        return self.getFolderFiles().get("mxl")

    def getNewFileList(self, files):
        """
        method to determine from a list of collected xml files from getFolderFiles which ones are new to the DB
        Return value: list of file names which aren't in the db
        """
        known = set(self.db_files)  # one set build instead of a list scan per file
        return [f for f in files.get("xml", []) if f not in known]

    def getOldRecords(self, files):
        """
        method to determine from a list of xml files from getFolderFiles which ones in the DB no longer exist in this
        folder.
        Return value: list of file names which are in the db but don't exist
        """
        # NOTE(review): when the folder holds no xml file at all this returns an
        # empty list rather than every db file - kept as is, confirm intent.
        if "xml" not in files:
            return []
        present = set(files["xml"])
        return [f for f in self.db_files if f not in present]

    def getNewAndOldFiles(self, files):
        """
        method which will do both of the above methods without calling self.getFolderFiles twice
        Return value: dict containing new and old files under the "new" and "old" indices
        """
        return {
            "new": self.getNewFileList(files),
            "old": self.getOldRecords(files)}
class SearchLayer(MusicData):
    """
    Query layer on top of MusicData: turns a search dictionary into grouped
    result lists and wraps the playlist calls of the data layer.

    Every ``handle...`` method returns a ``(results, all_matched)`` pair where
    ``results`` maps a display label to the matches found for it and
    ``all_matched`` is False as soon as one criterion produced nothing.

    NOTE(review): MusicData is imported from the ``classes`` package next to
    names that this file uses as modules (MetaParser, OnlineMetaParser) -
    confirm it resolves to the class rather than the module.
    """

    def __init__(self, folder, database):
        super().__init__(database)
        self.folder = folder

    def getPlaylistsFromPlaylistTable(self):
        """Return whatever getAllUserPlaylists yields."""
        return self.getAllUserPlaylists()

    def addPlaylist(self, data):
        """
        Store a playlist described by a dict with "name" and "pieces" entries.

        Delegates to the parent implementation: calling self.addPlaylist here
        re-entered this one-argument method with two arguments and raised
        TypeError.
        NOTE(review): assumes MusicData defines addPlaylist(name, pieces).
        """
        super().addPlaylist(data["name"], data["pieces"])

    def deletePlaylists(self, names):
        """Delete every playlist whose name is listed in ``names``."""
        for name in names:
            self.deletePlaylist(name)

    def handleTextQueries(self, search_data, online=False):
        """
        Match each free-text term against filename, title, composer, lyricist
        and the known instrument names.
        """
        results = {}
        all_matched = True
        instruments = self.get_instrument_names()
        instrument_list = []
        for value in search_data["text"]:
            combined = {
                "filename": self.query(
                    likedata={'filename': '%{}%'.format(value)},
                    table='pieces'),
                "Title": self.getPieceByTitle(value, online=online),
                "Composer": self.get_pieces_by_creator(
                    value, online=online),
                "Lyricist": self.get_pieces_by_creator(
                    value, online=online, creator_type='lyricist'),
            }
            if value in instruments:
                instrument_list.append(value)
            combined = filter_dict(combined)
            if len(combined) > 0:
                results.update(combined)
            else:
                all_matched = False
        # when every term is an instrument name, only the instrument search
        # below decides whether everything matched
        if len(search_data['text']) == len(instrument_list):
            all_matched = True
        if len(instrument_list) > 0:
            instrument_result = self.get_pieces_by_any_all_instruments(
                instrument_list, online=online)
            results.update(instrument_result)
            if "All Instruments" not in results:
                all_matched = False
        return results, all_matched

    def handleInstrumentQueries(self, search_data, online=False):
        """
        Search by instrument, leaving out instruments that also appear under
        the "key" or "clef" entries of search_data.

        NOTE(review): runQueries dispatches on "keys"/"clefs" while this method
        looks for "key"/"clef" - confirm which spelling callers send.
        """
        results = {}
        all_matched = True
        requested = search_data["instrument"]
        result_data = {}
        for instrument in requested:
            if "key" in search_data and instrument not in search_data["key"]:
                result_data[instrument] = requested[instrument]
            if "clef" in search_data and instrument not in search_data["clef"]:
                result_data[instrument] = requested[instrument]
        if "key" not in search_data and "clef" not in search_data:
            result_data = requested
        if len(result_data) > 0:
            instrument_data = self.get_pieces_by_instruments(
                result_data, online=online)
            results, all_matched = self.create_results(
                ["Instruments"], [instrument_data])
        return results, all_matched

    def handleTempoQueries(self, search_data, online=False):
        """Search by tempo; matches are reported under "Tempo"."""
        tempo_data = self.get_piece_by_tempo(
            search_data["tempo"], online=online)
        # wrapped in a list like the other handlers: create_results zips labels
        # with values, so the bare result paired "Tempo" with its first element
        # only and reported an empty result as a full match
        return self.create_results(["Tempo"], [tempo_data])

    def handleTimeQueries(self, search_data, online=False):
        """Search by meter; matches are reported under "Meter/Time signature"."""
        time_data = self.getPieceByMeter(
            search_data["time"], online=online)
        return self.create_results(["Meter/Time signature"], [time_data])

    def handle_clef_or_key_queries(
            self,
            search_data,
            online=False,
            query='keys'):
        """
        Search by clef or key (``query`` names the search_data entry and the
        table). The optional "other" entry is searched through
        get_piece_by_join, the remaining entries through piece_by_ins_in_.

        ``online`` is accepted for signature parity with the other handlers
        and is not used.
        """
        # work on a shallow copy so the caller's search_data keeps its "other"
        # entry and can be reused for another search
        query_data = dict(search_data[query])
        keys = []
        data = []
        if "other" in query_data:
            other = query_data.pop("other")
            data.append(self.get_piece_by_join(other, query))
            keys.append(query.capitalize())
        if len(query_data) > 0:
            data.append(self.piece_by_ins_in_(query_data, table=query))
            keys.append("Instruments in {}".format(query.capitalize()))
        return self.create_results(keys, data)

    def piece_by_ins_in_(self, data, table='clefs'):
        """
        Find pieces by (instrument, element) pairs, where ``data`` maps an
        instrument name to the elements to look up in ``table``.
        """
        queries = []
        for name in data:
            ins_id_row = self.query({"name": name}, table='instruments')
            ins_id = col_or_none(ins_id_row, 'id')
            for elem in data[name]:
                elem_id_row = self.query(elem, table=table)
                elem_id = col_or_none(elem_id_row, 'id')
                queries.append({'instruments.id': [ins_id],
                                '{}.id'.format(table): [elem_id]})
        row_ids = self.query_multiple(
            queries, table="{}_ins_piece".format(table))
        return self.get_pieces_by_row_id(row_ids)

    def handleTranspositionQueries(self, search_data, online=False):
        """Search by transposition, one result group per requested value."""
        # fetch_and_form_results hands the VALUES to create_results; passing
        # results.items() stored (label, files) tuples instead of the files
        return self.fetch_and_form_results(
            search_data["transposition"],
            "Instrument or transposition",
            self.getPieceByInstrumentsOrSimilar,
            online=online)

    def create_results(self, keys, values, method=lambda n: len(n) > 0):
        """
        Pair ``keys`` with ``values`` and keep the pairs whose value passes
        ``method`` (non-empty by default).

        Return value: (dict of kept pairs, False if any pair was dropped)
        """
        results = {}
        all_matched = True
        for key, value in zip(keys, values):
            if method(value):
                results[key] = value
            else:
                all_matched = False
        return results, all_matched

    def handleFilenameQueries(self, search_data, online=False):
        """Keep the requested file names that appear in get_file_list."""
        results = {}
        all_matched = True
        files = self.get_file_list(online=online)
        result_files = [filename for filename in search_data["filename"]
                        if filename in files]
        if len(result_files) > 0:
            results["Filename"] = result_files
        else:
            all_matched = False
        return results, all_matched

    def fetch_results(self, data, key, method, *args, **kwargs):
        """
        Call ``method(elem, *args, **kwargs)`` for each element of ``data``.

        Return value: dict mapping "<key>: <elem>" to the non-empty results
        """
        results = {}
        for elem in data:
            files = method(elem, *args, **kwargs)
            if len(files) > 0:
                results["{}: {}".format(key, elem)] = files
        return results

    def fetch_and_form_results(self, data, key, method, *args, **kwargs):
        """Run fetch_results and turn its output into a handler result pair."""
        files = self.fetch_results(data, key, method, *args, **kwargs)
        return self.create_results(files.keys(), files.values())

    def handle_bibliography_queries(self, data, query='creator', online=False):
        """
        Search by title or by creator (``query`` names both the entry of
        ``data`` to read and, for creators, the creator type).
        """
        if query == 'title':
            # handleTextQueries calls the title lookup without a creator type,
            # so none is forwarded here either
            return self.fetch_and_form_results(
                data[query],
                query.capitalize(),
                self.getPieceByTitle,
                online=online)
        # forward the caller's online flag; it used to be pinned to False
        return self.fetch_and_form_results(
            data[query],
            query.capitalize(),
            self.get_pieces_by_creator,
            creator_type=query,
            online=online)

    def getPieceSummary(self, file_list, sort_method="title", online=False):
        """
        Build a one-line summary per piece.

        ``sort_method`` is accepted but not used by this method.
        Return value: list of (summary string, filename) tuples
        """
        info = self.get_all_piece_info(file_list, online=online)
        ids = ["title", "composer", "lyricist", "filename"]
        summary_strings = []
        for elem in info:
            entry = " ".join(["{}: {}".format(key, elem[key])
                              for key in ids if key in elem and elem[key] != ''])
            summary_strings.append((entry, elem['filename']))
        return summary_strings

    def runQueries(self, search_data, online=False):
        """
        Dispatch every entry of search_data to its handler and summarise the
        matches of each result group.

        When every criterion matched, an extra "Exact Matches" group holds the
        intersection of all groups.
        Return value: dict mapping group label to getPieceSummary output
        Raises KeyError for a search_data key that has no handler.
        """
        results = {}
        all_matched = True
        method_table = {
            "text": self.handleTextQueries,
            "instrument": self.handleInstrumentQueries,
            "tempo": self.handleTempoQueries,
            "time": self.handleTimeQueries,
            "transposition": self.handleTranspositionQueries,
            "filename": self.handleFilenameQueries}
        # handlers shared between several keys; they are told which key
        # triggered them through ``query``
        simpler_method_table = {"title": self.handle_bibliography_queries,
                                "lyricist": self.handle_bibliography_queries,
                                "composer": self.handle_bibliography_queries,
                                "clefs": self.handle_clef_or_key_queries,
                                "keys": self.handle_clef_or_key_queries}
        for key in search_data:
            if key in simpler_method_table:
                key_result, key_matched = simpler_method_table[key](
                    search_data, query=key, online=online)
            else:
                key_result, key_matched = method_table[key](
                    search_data, online=online)
            results.update(key_result)
            # accumulate instead of overwrite: one failed criterion must not be
            # hidden by a later one that matched
            all_matched = all_matched and key_matched
        summaries = {}
        # set.intersection needs at least one argument, so skip it when there
        # are no result groups at all
        if all_matched and results:
            intersection = set.intersection(
                *get_set_of_dict_values(results))
            results["Exact Matches"] = intersection
        for key in results:
            summaries[key] = self.getPieceSummary(
                filter(None, results[key]), online=online)
        return summaries

    def getPlaylistFileInfo(self, playlist):
        """Return get_all_piece_info for the files of a playlist."""
        return self.get_all_piece_info(playlist)

    def getFileInfo(self, filename):
        """Return get_all_piece_info for a single file."""
        return self.get_all_piece_info([filename])

    def updatePlaylistTitle(self, new_title, old_title):
        """Rename the user playlist currently called ``old_title``."""
        row_id = self.getUserPlaylist(old_title)
        self.updateUserPlaylist(row_id, {"title": new_title})

    def getPlaylistByFilename(self, filename):
        """Return whatever getUserPlaylistsForFile yields for ``filename``."""
        return self.getUserPlaylistsForFile(filename)

    def getPlaylists(self, select_method="all"):
        """
        Build the automatic playlists.

        ``select_method`` is "all" or the name of a single grouping; the
        groupings "composers" and "lyricists" go through
        get_piece_by_all_creators, any other through get_piece_by_all_.
        Return value: filter_dict applied to the collected groupings
        """
        creator_groups = ["composers", "lyricists"]
        result_set = {}
        if select_method == "all":
            result_set["clefs"] = self.get_piece_by_all_(elem='clefs')
            result_set["keys"] = self.get_piece_by_all_(elem='keys')
            result_set["composers"] = self.get_piece_by_all_creators(
                elem='composers')
            result_set["lyricists"] = self.get_piece_by_all_creators(
                elem='lyricists')
            result_set["instruments"] = self.get_piece_by_all_('instruments')
            result_set["time_signatures"] = self.get_piece_by_all_(
                'time_signatures')
            result_set["tempos"] = self.get_piece_by_all_('tempos')
        elif select_method in creator_groups:
            result_set[select_method] = self.get_piece_by_all_creators(
                elem=select_method)
        else:
            result_set[select_method] = self.get_piece_by_all_(
                elem=select_method)
        return filter_dict(result_set)
class MusicManager(SearchLayer):
    """
    Grand master class which pulls together features from every other class. This class is instantiated by the Application
    class and should provide methods for the application to access everything else, from rendering to info extraction
    to API access.
    """

    # Folder searched by getLicense when no other folder is passed in.
    DEFAULT_LICENSE_FOLDER = (
        '/users/charlottegodley/PycharmProjects/FYP/implementation/primaries'
        '/ImportOnlineDBs/licenses')

    def __init__(
            self,
            parent,
            folder='/Users/charlottegodley/PycharmProjects/FYP',
            db='sqlite:///:memory:'):
        super(MusicManager, self).__init__(folder, db)
        # the application instance in which this manager resides (may be None,
        # see log_errors)
        self.parent = parent
        self.wifi = True
        self.apiManager = ApiManager.ApiManager(folder=self.folder)
        self.setupFolderBrowser()

    def updateWifi(self, wifi):
        """Switch online access on or off for this manager and its ApiManager."""
        # refresh() reads self.wifi, so it has to follow the ApiManager flag;
        # it used to stay True forever
        self.wifi = wifi
        self.apiManager.wifi = wifi

    def addInstruments(self, data):
        """
        Hand instrument data to the data layer.

        Delegates to the parent implementation: calling self.addInstruments
        here recursed until RecursionError.
        NOTE(review): assumes a parent class defines addInstruments(data).
        """
        super().addInstruments(data)

    def startRenderingTask(self, fname):
        """
        method which parses a piece, then runs the renderer class on it which takes the lilypond
        output, runs lilypond on it and gets the pdf. This is not generally called directly,
        but rather called by a thread class in thread_classes.py
        * Parameter fname: xml filename
        * Return value: list of problems encountered
        """
        errorList = []
        parser = MxmlParser.MxmlParser()
        piece_obj = None
        path_to_file = os.path.join(self.folder, fname)
        try:
            piece_obj = parser.parse(path_to_file)
        except Exceptions.DrumNotImplementedException as e:
            errorList.append(
                "Drum tab found in piece: this application does not handle drum tab.")
            logger.exception(
                "Drum tab found in piece:{} - {}".format(fname, str(e)))
        except Exceptions.TabNotImplementedException as e:
            errorList.append(
                "Guitar tab found in this piece: this application does not handle guitar tab.")
            logger.exception(
                "Guitar tab found in piece:{} - {}".format(fname, str(e)))
        except SAXParseException as e:
            errorList.append(
                "Sax parser had a problem with this file:" + str(e))
            logger.exception(
                "Exception SAX parsing file:{} - {}".format(fname, str(e)))
        if errorList:
            # parsing failed, so there is no piece to render; rendering anyway
            # only stacked a second, misleading error on top of the real one
            return errorList
        try:
            loader = LilypondOutput.LilypondRenderer(piece_obj, path_to_file)
            loader.run()
        except Exception as e:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit are not swallowed
            errorList.append(str(e))
            logger.exception(
                "Exception rendering lilypond with file:{} - {}".format(fname, str(e)))
        return errorList

    def unzipApiFiles(self, data_set):
        """
        method to download API files and unzip them as necessary
        :return: dictionary of results indexed by source name; empty when the
                 connection failed
        """
        results = {}
        try:
            file_set = self.apiManager.downloadFiles(data_set)
            self.handleZips()
            for source in file_set:
                results[source] = []
                for file in file_set[source]:
                    # the downloaded file is no longer needed once unzipped
                    file_path = os.path.join(self.folder, file)
                    if os.path.exists(file_path):
                        os.remove(file_path)
                    # same naming rule as Unzipper: cut at the first dot
                    n_filename = file.split(".")[0] + ".xml"
                    results[source].append(n_filename)
        except requests.exceptions.ConnectionError as e:
            logger.exception(
                "Exception connecting to api to download files:{}".format(
                    str(e)))
        return results

    def getNewFiles(self):
        """
        Fetch the API listing and drop every entry that already has an online
        record in the database.
        :return: the listing from apiManager.fetchAllData minus known entries
        """
        cleaned_set = self.apiManager.fetchAllData()
        for file in self.get_file_list(online=True):
            source = self.get_value_for_filename(file, "source")[0]
            file_id = file.split(".")[0]
            # a source missing from the listing is skipped rather than
            # raising KeyError
            if source in cleaned_set and file_id in cleaned_set[source]:
                cleaned_set[source].pop(file_id)
        return cleaned_set

    def parseApiFiles(self, debug=False):
        """
        method to extract data from apis and parse each created file for metadata
        ``debug`` is accepted but not used.
        :return: dictionary of data indexed by source, then by filename
        """
        parsing_errors = {}
        result_set = {}
        try:
            new_files = self.getNewFiles()
            file_set = self.unzipApiFiles(new_files)
            for source in file_set:
                result_set[source] = {}
                for file in file_set[source]:
                    ignore_list = self.apiManager.getSourceIgnoreList(source)
                    parser = OnlineMetaParser.OnlineMetaParser(
                        ignored=ignore_list, source=source)
                    data = self.parseXMLFile(file, parser=parser)
                    if isinstance(data, tuple):
                        # parseXMLFile reports failure as (message, filename)
                        parsing_errors[data[1]] = data[0]
                        continue
                    result_set[source][file] = data
                    file_id = file.split("/")[-1].split(".")[0]
                    result_set[source][file].update(
                        new_files[source][file_id])
        except requests.exceptions.ConnectionError:
            parsing_errors[
                "Connection"] = "error connecting to the internet. Sources not refreshed."
        self.log_errors(parsing_errors)
        return result_set

    def log_errors(self, errors):
        """Log each entry of ``errors`` and flag the problem in the status bar."""
        if len(errors) > 0:
            if self.parent is not None:
                self.parent.updateStatusBar(
                    "Errors updating database. Contact developer if problem persists")
            for error in errors:
                logger.error("Error {} : {}".format(error, errors[error]))

    def addApiFiles(self, data):
        """
        Store every parsed API file in the database.

        NOTE(review): this calls addPiece while parseNewFiles calls add_piece -
        confirm both exist on the data layer.
        """
        for source in data:
            for file in data[source]:
                self.addPiece(file, data[source][file])

    def cleanupApiFiles(self, data, extensions=('mxl', 'xml')):
        """Delete the files of ``data`` from the folder, in every listed extension."""
        for source in data:
            for file in data[source]:
                stem = file.split(".")[0]
                for ext in extensions:
                    candidate = os.path.join(self.folder, stem + "." + ext)
                    if os.path.exists(candidate):
                        os.remove(candidate)

    def downloadFile(self, filename):
        """
        Download the pdf of an online piece and mark the piece as local.
        :return: True when the download answered with status 200, else False
        """
        fname = filename.split(".")[0]
        source = self.get_value_for_filename(filename, 'source')
        if source is not None:
            source = source['source']
        secret = self.get_value_for_filename(filename, 'secret')
        if secret is not None:
            secret = secret['secret']
        try:
            status_code = self.apiManager.downloadFile(
                source=source, file=fname, secret=secret, extension='pdf')
            if status_code == 200:
                self.update_piece(filename, {'source': 'local'})
                return True
        except requests.exceptions.ConnectionError as e:
            logger.exception(
                "Error downloading file - {} exception {}".format(filename, str(e)))
        return False

    def runApiOperation(self):
        """
        method which gets all the data from the apis, unzips them,
        parses them for data, puts the data in the database and finally
        deletes the files
        :return: None
        """
        result_set = self.parseApiFiles()
        self.addApiFiles(result_set)
        self.cleanupApiFiles(result_set)

    def getPieceInfo(self, filenames):
        """Return get_all_piece_info for ``filenames``."""
        return self.get_all_piece_info(filenames)

    def setupFolderBrowser(self):
        """Create the FolderBrowser from the current database file list."""
        db_files = self.get_file_list()
        self.folder_browser = FolderBrowser(
            db_files=db_files,
            folder=self.folder)

    def handleZips(self):
        """Unzip every mxl file found in the folder."""
        zip_files = self.folder_browser.getZipFiles()
        if zip_files is not None:
            unzipper = Unzipper(folder=self.folder, files=zip_files)
            unzipper.unzip()

    def refresh(self):
        """
        Refresh the database, pulling from the APIs first when wifi is on.
        :return: errors reported by refreshWithoutDownload
        """
        if self.wifi:
            self.runApiOperation()
        return self.refreshWithoutDownload()

    def refreshWithoutDownload(self):
        """
        Re-scan the folder: unzip archives, add new files, archive missing ones.
        :return: errors reported by handleXMLFiles
        """
        db_files = self.get_file_list()
        self.folder_browser.resetDbFileList(db_files)
        self.handleZips()
        return self.handleXMLFiles()

    def getLicense(self, filename, license_folder=None):
        """
        Look up the license of a piece.

        :param filename: piece to look up
        :param license_folder: folder holding one text file per license name;
               defaults to DEFAULT_LICENSE_FOLDER
        :return: the text of the license file when one exists for the license
                 name, otherwise the license name itself; None when the piece
                 has no license record
        """
        # eventually we should open up a file and get the text based on the license name,
        # but for now we need to do this
        row = self.get_value_for_filename(filename, 'license')
        if row is None:
            return None
        result = row['license']
        if not result:
            # no license name: nothing to join onto the folder path
            return result
        if license_folder is None:
            license_folder = self.DEFAULT_LICENSE_FOLDER
        license_path = os.path.join(license_folder, result)
        if os.path.exists(license_path):
            # with-block closes the handle, which was left open before
            with open(license_path, 'r') as fob:
                result = "\n".join(fob.readlines())
        return result

    def getPieceSummaryStrings(self, sort_method="title"):
        """Return getPieceSummary for every file in the database file list."""
        file_list = self.get_file_list()
        return self.getPieceSummary(
            file_list,
            sort_method=sort_method)

    def parseOldFiles(self, file_list):
        """
        method to archive all the files in the list within the db
        :param file_list: files to archive
        :return: None
        """
        for elem in file_list:
            self.update_piece(elem, {'archived': True})

    def parseXMLFile(self, filename, parser=None):
        """
        Parse one file of the folder for metadata.

        :param filename: file name relative to self.folder
        :param parser: parser to use; a MetaParser is created when omitted
        :return: the parser output, or the tuple (error message, filename)
                 when parsing raised
        """
        if parser is None:
            parser = MetaParser.MetaParser()
        try:
            return parser.parse(os.path.join(self.folder, filename))
        except Exception as e:
            logger.exception(
                "Exception parsing {} - {}".format(filename, str(e)))
            return (str(e), filename)

    def parseError(self, exception):
        """Show ``exception`` in an error popup of the parent application."""
        string_val = str(exception)
        if self.parent is None:
            # log_errors already treats the parent as optional; without one
            # there is no popup to show, so log instead
            logger.error(string_val)
            return
        self.parent.errorPopup(string_val)

    def parseNewFiles(self, file_list):
        """
        method to call the sax parser on each of the new files then send the data to the data layer
        :param file_list: file names relative to self.folder
        :return: dict with the keys 'bad_piece', 'bad_table' and 'other', each
                 holding a list of (file, message) tuples
        """
        errors = {'bad_piece': [], 'bad_table': [], 'other': []}
        for file in file_list:
            data_set = self.parseXMLFile(file)
            if isinstance(data_set, tuple):
                # parseXMLFile failed and returned (message, filename); record
                # it instead of handing the tuple to add_piece as piece data
                errors['other'].append((file, data_set[0]))
                continue
            try:
                self.add_piece(file, data_set)
            except BadPieceException as e:
                errors['bad_piece'].append((file, str(e)))
            except BadTableException as e:
                errors['bad_table'].append((file, str(e)))
            except Exception as e:
                # Exception rather than BaseException, so KeyboardInterrupt
                # and SystemExit can still stop the loop
                errors['other'].append((file, str(e)))
        return errors

    def handleXMLFiles(self):
        """
        method to get all the new and old files from the folder browser and call parseNew and parseOld methods
        :return: the errors dict of parseNewFiles, or None when no "new" entry
                 was reported
        """
        files = self.folder_browser.getNewAndOldFiles(
            self.folder_browser.getFolderFiles())
        errors = None
        if "new" in files:
            errors = self.parseNewFiles(sorted(files["new"]))
        if "old" in files:
            self.parseOldFiles(sorted(files["old"]))
        return errors

    def copyFiles(self, filenames):
        """
        method to copy a list of files into self.folder
        :param filenames: list of files including extension and folder
        :return: none
        """
        for file in filenames:
            base_name = os.path.split(file)[-1]
            shutil.copyfile(file, os.path.join(self.folder, base_name))