The .daml files provided by Shimadzu come in an xml format
where key data pieces (chromatogram, observed m/z ratios
with their corresponding intensity and UV traces) are
encoded in base64 format.
The following script extracts the data relevant to PyParse, and reformats
it into a common structure.
import xml.etree.ElementTree as ET
import base64
import glob
import math
[docs]def init(args):
Initialise global options.
global options
options = args
[docs]def getData(input_dir):
Search the directory specified by the user to acquire any and all input
LC-MS data that is present. Format this data into the preferred PyParse format.
:param input_dir: A string specifying the directory to search for .daml files
:return: A list comprising: a dictionary of all peaks in all wells (indexed by well);
a dictionary of all chromatograms; a dictionary of all sample IDs; a dictionary
total_area_abs of each well. Each of these dictionaries are indexed by well
masterTable = {}
chroma = {}
sample_IDs = {}
total_area_abs = {}
input_files = glob.glob(input_dir + '*.daml')
for filename in input_files:
#Open the .daml file, which is structured as an xml
tree = ET.parse(filename)
root = tree.getroot()
#If the number of rows or number of columns has not yet been
#specified, determine these values from the .daml file
if options.plate_col_no == 0 or options.plate_row_no == 0:
vessels = root.find("datafile_attribs").find("Plate").find("Vessels")
#determine the number of columns/rows by how many times
#a well has the y-value = 0 or x-val = 0 respectively
for vessel in vessels.findall("Vessel"):
if int(vessel.get("x")) == 0:
options.plate_row_no += 1
if int(vessel.get("y")) == 0:
options.plate_col_no += 1
#get the well number
wellno = int(root.find("datafile_attribs").find("vial").text)
#get the sampleID
sample_IDs[wellno] = str(root.find("datafile_attribs").find("desc").text)
#Get to the results secton of the .daml file
results = root.find("results")
peaks = {}
#start by getting the peaks observed in the UV spectrum
raw_UV = results.find("lc_chros").find("chro")
for peak in raw_UV.findall("peak"):
#Fetch all data about a peak from the details section
new_entry = {}
for detail in peak.get("details").split(","):
name = detail.strip().split("=")[0]
value = detail.strip().split("=")[1]
if name == "Peak#":
new_entry["peakID"] = int(value)
elif name == "RT":
new_entry["time"] = float(value.split("mins")[0].strip())
new_entry["pStart"] = float(value.split("(")[1].split("-")[0])
new_entry["pEnd"] = float(value.split("-")[1].split("mins")[0])
elif name == "Area":
new_entry["areaAbs"] = float(value)
elif name == "Area%":
new_entry["area"] = float(value)
#Add placeholders for the correct data structure
new_entry["MS+"] = [[]]
new_entry["MS-"] = [[]]
#Add well number to the new entry
new_entry["well"] = wellno
#Add the new entry to the peaks dictionary
peaks[new_entry["peakID"]] = new_entry
#get the chromatographic trace for this well
coded_trace = raw_UV.find("data").text
decoded_trace = base64.b64decode(coded_trace)
trace_list = str(decoded_trace).split(";;;")[1].split("'")[0].split(";")
xval = []
yval = []
for i in range(0, len(trace_list), 2):
#Normalise the heights of the peaks so that they plot correctly
max_y_val = max(yval)
normalised_yval = [100*i/max_y_val for i in yval]
chroma[wellno] = [xval, normalised_yval]
#next, get the mass spec data for each peak, and match it to the UV peaks
raw_ms = results.find("spectra")
for spec in raw_ms.findall("spec"):
peakID = int(spec.find("peak_num").text)
if peakID in peaks.keys():
msdata = spec.find("data").text
decoded = base64.b64decode(msdata)
#determine whether this data is for MS+ or MS-
ms_type = "MS+" if str(decoded).find("MS(+)") > -1 else "MS-"
#Get the list of masses with their intensities, and group them
#pairwise ([massion, intensity])
decoded_list = str(decoded).split(";;;")[1].split("'")[0].split(";")
masses = []
total = 0
for i in range(0, len(decoded_list), 2):
datapair = [float(decoded_list[i])/10000, float(decoded_list[i+1])]
#normalise the intensities
max_intensity = max(masses, key = lambda x: x[1])[1]
normalised_masses = [[i[0], round(i[1]*100/max_intensity, 3)] for i in masses]
#Remove any masses which are so small as to be negligable
total = sum([i[1] for i in normalised_masses])
refined_masses = [i for i in normalised_masses if math.floor(i[1]*100/total) > 0]
if ms_type == "MS+":
peaks[peakID]["MS+"] = refined_masses
peaks[peakID]["MS-"] = refined_masses
#finally, fetch the UV absorption maxima values
raw_UV_ads = results.find("pda_spectra")
for pda_spec in raw_UV_ads.findall("pda_spec"):
#Get the peakID so the pda data is matched to the correct peaks
peakID = int(pda_spec.find("peak_num").text)
if peakID in peaks.keys():
UV_maxima = []
#Decode the base64 data, and group the data into pairs of x/y values
UV_pairs = []
pda_data = base64.b64decode(pda_spec.find("data").text)
decoded_list = str(pda_data).split(";;;")[1].split("'")[0].split(";")
for i in range(0, len(decoded_list), 2):
datapair = [float(decoded_list[i])/100, float(decoded_list[i+1])]
#Tag the very first value as a maxima if required
if UV_pairs[0][1] > UV_pairs[1][1] and UV_pairs[0][1] > options.min_uv_threshold:
#Iterate through all other n-1/n/n+1 comparisons to find all other maxima
for i in range(1, len(UV_pairs)-1):
if (UV_pairs[i][1] > UV_pairs[i-1][1] and UV_pairs[i][1] > UV_pairs[i+1][1]
and UV_pairs[i][1] > options.min_uv_threshold):
if UV_pairs[-1][1] > UV_pairs[-2][1] and UV_pairs[-1][1] > options.min_uv_threshold:
#Store these maxima values with the peak
peaks[peakID]["UV"] = UV_maxima
#Filter out any peaks within retention times specified by the user
#Expected uses: remove solvent peaks from consideration
#Expected format is a list of specified ranges, e.g.
#[min_time1-max_time1, min_time2-max_time2]
#where the start of the range appears first and is separated from
#the end of the range with a hyphen
if len(options.filter_by_rt) > 0:
to_remove = []
for filter_range in options.filter_by_rt:
min_time = float(filter_range.split("-")[0].strip())
max_time = float(filter_range.split("-")[1].strip())
for index in peaks.keys():
if float(index) > min_time and index < max_time:
for index in to_remove:
del peaks[index]
#If the reprocess_by_rt parameter was used, recalculate the percentage peak area for each peak
if len(options.filter_by_rt) > 0:
total_area_abs[wellno] = 0
for peak in peaks.values():
total_area_abs[wellno] = total_area_abs[wellno] + peak["areaAbs"]
#Recalculate the peak area percentages now that the total absolute area
#for that well has been calculated.
if len(options.filter_by_rt) > 0:
for index in peaks.keys():
peaks[index]["area"] = round((peaks[index]["areaAbs"]*100) / total_area_abs[wellno], 2)
#Append the peaks to the specific well
masterTable[wellno] = peaks.values()
return [masterTable, chroma, sample_IDs, total_area_abs]