Skip to content

Commit

Permalink
changes black file_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurDeclercq committed Dec 3, 2024
1 parent 9606568 commit 999c95d
Showing 1 changed file with 56 additions and 43 deletions.
99 changes: 56 additions & 43 deletions mumble/file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class _SpectrumFileHandler:
This class uses the pyteomics library for parsing MGF and mzML files.
Parsed spectra are stored as `rustyms.RawSpectrum` objects.
"""

def __init__(self, spectrum_file: str):
self.spectrum_file = spectrum_file
self.spectra = {} # Initialize an empty dictionary to hold the spectra
Expand All @@ -46,7 +46,6 @@ def __init__(self, spectrum_file: str):
else:
raise ValueError("Unsupported file format. Only MGF and mzML are supported.")


def _parse_mgf(self):
"""
Parse an MGF (Mascot Generic Format) file and store each spectrum as a RawSpectrum object.
Expand All @@ -61,31 +60,37 @@ def _parse_mgf(self):
try:
with mgf.MGF(self.spectrum_file) as spectra:
for spectrum in spectra:
spectrum_id = spectrum['params'].get('title', 'Unknown') # Extract spectrum ID from the MGF params
precursor_mass = spectrum['params'].get('pepmass', [None])[0] # Extract precursor mass

spectrum_id = spectrum["params"].get(
"title", "Unknown"
) # Extract spectrum ID from the MGF params
precursor_mass = spectrum["params"].get("pepmass", [None])[
0
] # Extract precursor mass

# Extract retention time
rt = 0.0
if 'rtinseconds' in spectrum['params']:
rt = float(spectrum['params']['rtinseconds'])
elif 'retention time' in spectrum['params']:
rt = float(spectrum['params']['retention time'])
if "rtinseconds" in spectrum["params"]:
rt = float(spectrum["params"]["rtinseconds"])
elif "retention time" in spectrum["params"]:
rt = float(spectrum["params"]["retention time"])

# Extract precursor charge
precursor_charge = 0
if 'charge' in spectrum['params']:
charge_str = spectrum['params']['charge']
precursor_charge = int(charge_str.strip('+')) # Remove '+' and convert to int
if "charge" in spectrum["params"]:
charge_str = spectrum["params"]["charge"]
precursor_charge = int(
charge_str.strip("+")
) # Remove '+' and convert to int

# Create a RawSpectrum object using required fields and additional attributes
self.spectra[spectrum_id] = RawSpectrum(
title=spectrum_id,
num_scans=len(spectrum['m/z array']),
title=spectrum_id,
num_scans=len(spectrum["m/z array"]),
rt=rt,
precursor_charge=precursor_charge,
mz_array=np.array(spectrum['m/z array']),
intensity_array=np.array(spectrum['intensity array']),
precursor_mass=precursor_mass
mz_array=np.array(spectrum["m/z array"]),
intensity_array=np.array(spectrum["intensity array"]),
precursor_mass=precursor_mass,
)
logging.info(f"Parsed {len(self.spectra)} spectra from {self.spectrum_file}")
except Exception as e:
Expand All @@ -105,36 +110,40 @@ def _parse_mzml(self):
try:
with mzml.MzML(self.spectrum_file) as spectra:
for spectrum in spectra:
spectrum_id = spectrum.get('id', None) # Get the spectrum ID from the mzML spectrum
spectrum_id = spectrum.get(
"id", None
) # Get the spectrum ID from the mzML spectrum
precursor_mass = 0.0
precursor_charge = 0
rt = 0.0

# Extract precursor mass and charge if available
if 'precursorList' in spectrum and spectrum['precursorList']:
precursor = spectrum['precursorList']['precursor'][0]
if 'selectedIonList' in precursor:
selected_ion = precursor['selectedIonList']['selectedIon'][0]
precursor_mass = selected_ion.get('selected ion m/z', 0.0)
precursor_charge = int(selected_ion.get('charge state', 0))
if "precursorList" in spectrum and spectrum["precursorList"]:
precursor = spectrum["precursorList"]["precursor"][0]
if "selectedIonList" in precursor:
selected_ion = precursor["selectedIonList"]["selectedIon"][0]
precursor_mass = selected_ion.get("selected ion m/z", 0.0)
precursor_charge = int(selected_ion.get("charge state", 0))

# Extract retention time
if 'scanList' in spectrum and spectrum['scanList']:
scan = spectrum['scanList']['scan'][0]
for cv_param in scan.get('cvParam', []):
if cv_param.get('accession') == 'MS:1000016': # accession for scan start time
rt = float(cv_param.get('value', 0.0))
if "scanList" in spectrum and spectrum["scanList"]:
scan = spectrum["scanList"]["scan"][0]
for cv_param in scan.get("cvParam", []):
if (
cv_param.get("accession") == "MS:1000016"
): # accession for scan start time
rt = float(cv_param.get("value", 0.0))
break

# Create a RawSpectrum object using required fields and additional attributes
self.spectra[spectrum_id] = RawSpectrum(
title=spectrum_id,
num_scans=len(spectrum['m/z array']),
num_scans=len(spectrum["m/z array"]),
rt=rt,
precursor_charge=precursor_charge,
mz_array=np.array(spectrum['m/z array']),
intensity_array=np.array(spectrum['intensity array']),
precursor_mass=precursor_mass
mz_array=np.array(spectrum["m/z array"]),
intensity_array=np.array(spectrum["intensity array"]),
precursor_mass=precursor_mass,
)
logging.info(f"Parsed {len(self.spectra)} spectra from {self.spectrum_file}")
except Exception as e:
Expand All @@ -143,10 +152,10 @@ def _parse_mzml(self):
def get_spectrum_from_psm(self, psm: PSM):
"""
Retrieve a RawSpectrum for a PSM by its ID.
Args:
psm (PSM): psm object
Returns:
RawSpectrum: The retrieved spectrum or None if not found.
"""
Expand All @@ -155,10 +164,10 @@ def get_spectrum_from_psm(self, psm: PSM):
def get_spectra_from_psm_list(self, psmList: PSMList):
"""
Retrieve all spectra for a PSMList.
Args:
psmList (PSMList): A list of PSM objects.
Returns:
list: A list of RawSpectrum objects corresponding to the PSMs.
None is included for any spectra not found.
Expand All @@ -168,7 +177,7 @@ def get_spectra_from_psm_list(self, psmList: PSMList):
def get_all_spectra(self):
"""
Retrieve all parsed spectra.
Returns:
dict: A dictionary of all parsed spectra, where keys are spectrum IDs
and values are RawSpectrum objects.
Expand All @@ -180,12 +189,12 @@ class _MetadataParser:
"""
Class to parse metadata files (CSV/TSV) containing PSM information.
"""

@staticmethod
def parse_csv_file(file_name: str, delimiter: str = "\t") -> list:
"""
Parse a CSV or TSV file containing PSM information and create PSM objects.
Args:
file_name (str): Path to the CSV or TSV file.
delimiter (str, optional): Delimiter used in the file. Defaults to "\t".
Expand All @@ -200,11 +209,11 @@ def parse_csv_file(file_name: str, delimiter: str = "\t") -> list:
pd.errors.ParserError: If there's an error parsing the file.
Notes:
The file must contain at least the following columns:
The file must contain at least the following columns:
'peptidoform', 'spectrum_id', and 'precursor_mz'.
If any of these columns are missing, an error is logged and an empty list is returned.
"""

try:
df = pd.read_csv(file_name, delimiter=delimiter)
except FileNotFoundError as e:
Expand All @@ -228,7 +237,11 @@ def parse_csv_file(file_name: str, delimiter: str = "\t") -> list:

# Create a list of PSM objects from the DataFrame rows
peptidoforms = [
PSM(peptidoform=row["peptidoform"], spectrum_id=row["spectrum_id"], precursor_mz=row["precursor_mz"])
PSM(
peptidoform=row["peptidoform"],
spectrum_id=row["spectrum_id"],
precursor_mz=row["precursor_mz"],
)
for _, row in df.iterrows()
]

Expand Down

0 comments on commit 999c95d

Please sign in to comment.