"""
The .rpt file provided by Waters instrument is a text file
containing the output from each detector (UV, ELSD, ES+ and ES-),
described as "functions" sequentially. The functions are described for
each well in turn.
The following script extracts the data relevant to PyParse, and reformats
it into a common structure. It finds the appropriate sections of text by
looking for keywords that describe the start of a particular section.
"""
import math
import logging
import sys
def init(args):
global options
options = args
[docs]def getChromatogram(spectrum):
"""
Takes the region of text in rpt file corresponding to the chromatogram,
and extracts the data into two lists, corresponding to x-values and y-values.
:param spectrum: Section of rpt file as string
:return: A list of 2 lists, [x-values, y-values]
"""
x_val = []
y_val = []
lineData = spectrum.split("\n")
length = len(lineData)
n_val = math.ceil(length / options.points_per_trace)
for index, value in enumerate(lineData[1:]):
if value == "}": #stop for loop if end of data section is reached
break
elif index % n_val == 0:
if value != "{":
data = value.split("\t")
x_val.append(float(data[0]))
y_val.append(float(data[1]))
return [x_val, y_val]
[docs]def getMSData(spectrum):
"""
Takes the specific region of the rpt file pertaining to
m/z data for a specific peak in a specific well, and
returns a 2-D list containing all m/z peaks and their
normalised intensity.
:param spectrum: Section of rpt file as string
:return: 2-D list in following format:
[m/z value, normalised intensity of that value]
"""
masses = []
total = 0
lineData = spectrum.split(";Mass\t% BPI")[1].split("\n")
for line in lineData[1:]:
if line == "}": #stop the for loop if end of MSData section is reached
break
massData = line.split("\t")
if len(massData) == 2:
floatData = [float(i) for i in massData] #convert all data to float
masses.append(floatData)
total = total + floatData[1]
#Remove any masses which, as a percentage, round to 0
#to remove unnecessary baseline ions
refined_masses = []
for i in masses:
if math.floor((i[1]/total)*100) > 0:
refined_masses.append([i[0], i[1]])
return refined_masses
[docs]def getUVData(spectrum):
"""
Takes the specific region of the rpt file pertaining to
UV absorbance spectrum data for a specific peak in a specific
well, and returns a list containing all the maxima of that spectrum.
The height of the maxima must be greater than the min_uv_threshold
specified in options.
:param spectrum: Section of rpt file as string
:return: UV maxima as a list
"""
UVmaxima = []
lineData = spectrum.split(";Mass\t% BPI")[1].split("\n")
UVx = []
UVy = []
for line in lineData:
if line == "}":#stop for loop if end of UVData section is reached
break
UVdata = line.split("\t")
if len(UVdata) == 2:
UVx.append(float(UVdata[0]))
UVy.append(abs(float(UVdata[1])))
if UVy[0] > UVy[1] and UVy[0] > options.min_uv_threshold:
UVmaxima.append(UVx[0])
for i in range(1, len(UVy)-1):
if UVy[i] > UVy[i-1] and UVy[i] > UVy[i+1] and UVy[i] > options.min_uv_threshold:
UVmaxima.append(UVx[i])
if UVy[-1] > UVy[-2] and UVy[-1] > options.min_uv_threshold:
UVmaxima.append(UVx[-1])
return UVmaxima
[docs]def getData(filename):
"""
Converts the input .rpt file into a dictionary where each index corresponds to a well.
Each dictionary value contains a list of peaks, where each peak is represented as a dictionary
with the peak area, m/z and UV (if available) data present.
:param filename: Address of the input file
:return: List comprising: a dictionary of all peaks in all wells; a dictionary containing each chromatogram;
a dictionary of the sample IDs for each well; a dictionary of the total_abs_area of each well.
Each of these dictionaries are indexed by well.
"""
wellData = []
masterTable = {}
chroma = {}
sample_IDs = {}
total_area_abs = {}
with open(filename, errors = "ignore") as f:
fullText = f.read()
text_split = fullText.split("[SAMPLE]") #Split the file into individual wells
wellData = text_split[1:]
def findDataEntry(peaks, peakID):
"""
The program should match up equivalent peaks from different
detectors. Detectors may have subtly different retention times, but
the rpt file tags equivalent peaks with the same peakID. Use this value
to match up peaks from different detectors.
:param peaks: a dictionary of peaks found so far
:param peakID: a string; the peakID of a specific peak for a specific detector
:return: a float corresponding to an index for the peaks dictionary, or false if none was found.
"""
matches = [index for index, i in peaks.items() if i["peakID"] == peakID]
if len(matches) == 1:
return matches[0]
else:
return False
for i in range(len(wellData)):
well = wellData[i]
wellno = -1
functions = well.split("[FUNCTION]") #Split data by function (Prelude, MS+, MS- or UV)
#Get the plate dimensions from the first sample
#Data sample: #Plate 01TL,XY,SD,1: 8,2:12,3: 90.0...
#where number of rows is 8 and number of columns in 12
#Overwrite the default option, but ensure that the user retains control
#such that empty rows can be removed from the heatmap.
if i == 0 and (options.plate_row_no == 0 or options.plate_col_no == 0):
options.plate_row_no = int(functions[0].split("\n")[17].split(",")[3].split(":")[1])
options.plate_col_no = int(functions[0].split("\n")[17].split(",")[4].split(":")[1])
plate_cols_for_extraction = options.plate_col_no
elif i == 0:
plate_cols_for_extraction = int(functions[0].split("\n")[17].split(",")[4].split(":")[1])
#Find from rpt file how each well is specified
#Data sample: #Plate 01TL,XY,SD,1: 8,2:12,3: 90.0...
row_col_order = functions[0].split("\n")[17].split(",")[1]
well_type = functions[0].split("\n")[17].split(",")[2]
try:
#If the well type is just a Single Digit...
if well_type == "SD":
#If the well is simply an integer between 1 and infinity
#Single line function to trim full string to just the well number used
wellno = int(functions[0].split("Well")[1].split("\n")[0].split(":")[1].strip())
#If the column number specified by the user is different to that found in the rpt file,
#this is the result of the user looking to trim off blank columns. Only wells described by
#a single digit need to be modified to take this into account.
if options.plate_col_no != plate_cols_for_extraction:
wellno = math.floor(wellno / plate_cols_for_extraction)*options.plate_col_no + (wellno % plate_cols_for_extraction)
#If the well type is a combination of letters/numbers...
else:
#Find if the well
if row_col_order == "XY":
column = functions[0].split("Well")[1].split("\n")[0].split(":")[1].split(",")[0].strip()
row = functions[0].split("Well")[1].split("\n")[0].split(":")[1].split(",")[1].strip()
else:
column = functions[0].split("Well")[1].split("\n")[0].split(":")[1].split(",")[1].strip()
row = functions[0].split("Well")[1].split("\n")[0].split(":")[1].split(",")[0].strip()
#Convert the column to integer, either by direct
#conversion, or by finding position in the alphabet
try:
col_as_int = int(column)
except:
col_as_int = ord(column.capitalize()) - 64
#Convert the row to integer, either by direct
#conversion, or by finding position in the alphabet
try:
row_as_int = int(row)
except:
row_as_int = ord(row.capitalize()) - 64
#Calculate the wellno as a single integer
wellno = (row_as_int - 1) * options.plate_col_no + col_as_int
except:
logging.info("Unable to get well number. Terminating program.")
sys.exit(2)
#get the sample id, and load it into a list
#This list will be added as a column to the outputTable
well_ID = functions[0].split("SampleID")[1].split("\n")[0].strip()
sample_IDs[wellno] = well_ID
peaks = {}
for function in functions[1:]: #Ignore the prelude data
lines = function.split("\n")
spectra = function.split("[SPECTRUM]")[1:] #split by spectrum (i.e. each peak)
#get peakarea for this peak
if lines[4] == "Type\tDAD " and options.detector == "UV":
chromatograms = function.split("[CHROMATOGRAM]")[1:]
for chromatogram in chromatograms:
c_lines = chromatogram.split("\n")
if c_lines[3] == "Description\tDAD: TIC":
chroma[wellno] = getChromatogram(chromatogram.split("[TRACE]")[1]) #get chromatogram for this well
wellpeaks = chromatogram.split("[PEAK]")[1:]
for peak in wellpeaks:
retTime = float(peak.split("Time")[1].split("\n")[0].strip())
peakWidth = peak.split("Peak\t")[1].split("\n")[0].split("\t")
peakArea = float(peak.split("Area %Total")[1].split("\n")[0].strip())
peakAreaAbs = float(peak.split("AreaAbs")[1].split("\n")[0].strip())
peakID = int(peak.split("Peak ID")[1].split("\n")[0].strip())
peak_index = False
if len(list(peaks)) > 0:
peak_index = findDataEntry(peaks, peakID)
if peak_index == False:
peaks[retTime] = {
"MS+": [],
"MS-": [],
"area": 0,
"areaAbs": 0,
"UV": [],
"time": retTime,
"pStart": 0,
"pEnd": 0,
"peakID": peakID,
"well": wellno,
}
elif peak_index != retTime:
#If data has already been entered for this peak by a different
#function (i.e. MS+, MS-) using a slightly different retention time,
#copy this data over to a new entry
#which uses the retention time observed in the chromatogram
#i.e. we don't want some retention times from the MS+, and others
#from the chromatogram - we want a standard output
peaks[retTime] = {
"MS+": peaks[peak_index]["MS+"],
"MS-": peaks[peak_index]["MS-"],
"area": peaks[peak_index]["area"],
"areaAbs": peaks[peak_index]["areaAbs"],
"UV": peaks[peak_index]["UV"],
"time": retTime,
"pStart": peaks[peak_index]["pStart"],
"pEnd": peaks[peak_index]["pEnd"],
"peakID": peaks[peak_index]["peakID"],
"well": wellno,
}
#Delete the old entry now the data has been copied over
#to use the correct retention time.
del peaks[peak_index]
peaks[retTime]["area"] = peakArea
peaks[retTime]["areaAbs"] = peakAreaAbs
peaks[retTime]["pStart"] = float(peakWidth[0])
peaks[retTime]["pEnd"] = float(peakWidth[1])
#Add additional section to search for and use the ELSD data
#as opposed to the UV data. Note that the two detectors cannot currently
#be used in parallel.
if lines[3].strip() == "Description\tANALOG" and options.detector == "ELSD":
chromatograms = function.split("[CHROMATOGRAM]")[1:]
for chromatogram in chromatograms:
c_lines = chromatogram.split("\n")
if c_lines[3].strip() == "Description\t (3) ELSD Signal":
chroma[wellno] = getChromatogram(chromatogram.split("[TRACE]")[1]) #get chromatogram for this well
wellpeaks = chromatogram.split("[PEAK]")[1:]
for peak in wellpeaks:
retTime = float(peak.split("Time")[1].split("\n")[0].strip())
peakWidth = peak.split("Peak\t")[1].split("\n")[0].split("\t")
peakArea = float(peak.split("Area %Total")[1].split("\n")[0].strip())
peakAreaAbs = float(peak.split("AreaAbs")[1].split("\n")[0].strip())
peakID = int(peak.split("Peak ID")[1].split("\n")[0].strip())
peak_index = False
if len(list(peaks)) > 0:
peak_index = findDataEntry(peaks, peakID)
if peak_index == False:
peaks[retTime] = {
"MS+": [],
"MS-": [],
"area": 0,
"areaAbs": 0,
"UV": [],
"time": retTime,
"pStart": 0,
"pEnd": 0,
"peakID": peakID,
"well": wellno,
}
elif peak_index != retTime:
#If data has already been entered for this peak by a different
#function (i.e. MS+, MS-) using a slightly different retention time,
#copy this data over to a new entry
#which uses the retention time observed in the chromatogram
#i.e. we don't want some retention times from the MS+, and others
#from the chromatogram - we want a standard output
peaks[retTime] = {
"MS+": peaks[peak_index]["MS+"],
"MS-": peaks[peak_index]["MS-"],
"area": peaks[peak_index]["area"],
"areaAbs": peaks[peak_index]["areaAbs"],
"UV": peaks[peak_index]["UV"],
"time": retTime,
"pStart": peaks[peak_index]["pStart"],
"pEnd": peaks[peak_index]["pEnd"],
"peakID": peaks[peak_index]["peakID"],
"well": wellno,
}
#Delete the old entry now the data has been copied over
#to use the correct retention time.
del peaks[peak_index]
peaks[retTime]["area"] = peakArea
peaks[retTime]["areaAbs"] = peakAreaAbs
peaks[retTime]["pStart"] = float(peakWidth[0])
peaks[retTime]["pEnd"] = float(peakWidth[1])
for spectrum in spectra:
peakID = int(spectrum.split("Peak ID")[1].split("\n")[0].strip())
retTime = float(spectrum.split("Time")[1].split("\n")[0].strip())
peak_index = False
#Find matching peaks in other functions by reference to the peakID
if len(list(peaks)) > 0:
peak_index = findDataEntry(peaks, peakID)
if peak_index == False:
peaks[retTime] = {
"MS+": [],
"MS-": [],
"area": 0,
"areaAbs": 0,
"UV": [],
"time": retTime,
"pStart": 0,
"pEnd": 0,
"peakID": peakID,
"well": wellno,
}
else:
retTime = peak_index
#For each function, parse all the associated data and store
#as a float.
#For UV, take only the lambaMax
if lines[3] == "IonMode\tES+ ":
data = getMSData(spectrum)
peaks[retTime]["MS+"] = data
elif lines[3] == "IonMode\tES- ":
data = getMSData(spectrum)
peaks[retTime]["MS-"] = data
elif lines[4] == "Type\tDAD ":
data = getUVData(spectrum)
peaks[retTime]["UV"] = data
logging.debug(f'{len(peaks)} peaks found in well {wellno}.')
#Filter out any peaks within retention times specified by the user
#Expected uses: remove solvent peaks from consideration
#Expected format is a list of specified ranges, e.g.
#[min_time1-max_time1, min_time2-max_time2]
#where the start of the range appears first and is separated from
#the end of the range with a hyphen
if len(options.filter_by_rt) > 0:
to_remove = []
for filter_range in options.filter_by_rt:
min_time = float(filter_range.split("-")[0].strip())
max_time = float(filter_range.split("-")[1].strip())
for index in peaks.keys():
if float(index) > min_time and index < max_time:
to_remove.append(index)
for index in to_remove:
del peaks[index]
#If the reprocess_by_rt parameter was used, recalculate the percentage peak area for each peak
if len(options.filter_by_rt) > 0:
total_area_abs[wellno] = 0
for peak in peaks.values():
total_area_abs[wellno] = total_area_abs[wellno] + peak["areaAbs"]
#Recalculate the peak area percentages now that the total absolute area
#for that well has been calculated.
if len(options.filter_by_rt) > 0:
for index in peaks.keys():
peaks[index]["area"] = round((peaks[index]["areaAbs"]*100) / total_area_abs[wellno], 2)
masterTable[wellno] = peaks.values()
return [masterTable, chroma, sample_IDs, total_area_abs]