Commit 36ab854 (0 parents)
Showing 6 changed files with 347 additions and 0 deletions.
@@ -0,0 +1,139 @@
import sys
import json
import typing
import logging
import argparse
import numpy as np
import pandas as pd
from pathlib import Path


plate_params = {
    # standard plate geometries: 96-well is 8 rows x 12 columns,
    # 384-well is 16 rows x 24 columns
    96: {"width": 12, "height": 8},
    384: {"width": 24, "height": 16},
}


def log_assert(assertion, message):
    """Assert `assertion`, logging `message` with a traceback on failure."""
    try:
        assert assertion
    except AssertionError as err:
        logger.exception(message)
        raise err


def parse_description(excel_file: pd.ExcelFile) -> dict:
    raw_description = excel_file.parse("Description", index_col=0)

    # keep only the first column and drop rows whose index is empty
    description = raw_description.iloc[:, :1].squeeze()
    new_index = description.index.dropna()
    description = description.loc[new_index]
    return description.to_dict()


def parse_plates(excel_file: pd.ExcelFile, n_variables: int) -> list:
    plate_size = int(excel_file.sheet_names[-1].split()[0])
    width, height = plate_params[plate_size].values()

    # each plate block spans `height` rows plus 4 extra rows
    # (name + spacer + top/bottom column indexes), so block k starts
    # at row (2 + 4 * k) + k * height
    plate_sheet = excel_file.parse(excel_file.sheet_names[-1], header=None)
    plates = [
        plate_sheet.iloc[
            (2 + 4 * k) + k * height : (2 + 4 * k) + (k + 1) * height, 1 : 1 + width + 1
        ]
        for k in range(n_variables)
    ]
    for plate in plates:
        plate.set_index(1, inplace=True, drop=True)
        plate.index.name = None
        plate.columns = range(1, width + 1)
    return plates
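
# Worked example: on a 96-well sheet (height 8), block k starts at row
# 2 + 4*k + 8*k, i.e. rows 2, 14, 26, ... -- one plate block every 12 rows
# (8 plate rows + the 4 header rows noted above).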


def flatten_plate(plate: pd.DataFrame, title: typing.Optional[str] = None) -> pd.Series:
    flat = plate.stack(dropna=False)
    # join (row, column) index pairs into well names, e.g. ("A", 1) -> "A1"
    flat.index = flat.index.map("{0[0]}{0[1]}".format)
    if title:
        flat.name = title
    return flat
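
# Sketch of the flattening (assuming the sheet's index column holds row
# letters A-H): an 8x12 plate becomes a 96-element Series indexed
# "A1", "A2", ..., "H12".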


def parse_phenix_metadata(fname: str) -> typing.Tuple[dict, pd.DataFrame]:

    valid_suffixes = (".xls", ".xlsx", ".xltx")
    fpath = Path(fname)
    log_assert(fpath.exists(), f"File {fpath} doesn't exist!")
    log_assert(
        fpath.suffix in valid_suffixes,
        f"File {fpath} doesn't have one of the following file extensions "
        f"[{' '.join(valid_suffixes)}]",
    )

    excel_file = pd.ExcelFile(fname)
    log_assert(
        len(excel_file.sheet_names) == 3, "Excel file malformed. Expected 3 sheets"
    )

    description = parse_description(excel_file)

    _variable_names = excel_file.parse("Variables", index_col=0, header=0)[
        "Variable Name"
    ]
    _valid_names = ~_variable_names.isnull()
    n_variables = np.sum(_valid_names)
    variable_names = _variable_names.loc[_valid_names].values
    if n_variables == 0:
        logger.error("No variables defined in variable sheet!")
        sys.exit(1)

    plates = parse_plates(excel_file, n_variables)
    flat_plates = [
        flatten_plate(plate, variable_name)
        for plate, variable_name in zip(plates, variable_names)
    ]
    plate_map = pd.concat(flat_plates, axis=1)

    return description, plate_map


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description=(
            "Parse a 96-well or 384-well plate map for use with the Single Cell Biology "
            "Lab Opera Phenix"
        )
    )

    parser.add_argument("plate_map_path", help="Path to the Excel file")
    parser.add_argument(
        "output_metadata_path", help="Path where to write metadata.json"
    )

    return parser.parse_args()


def main(args: argparse.Namespace) -> None:
    json_path = Path(args.output_metadata_path)
    log_assert(
        json_path.parent.exists(), f"Directory containing {json_path} doesn't exist!"
    )

    metadata, plate_dataframe = parse_phenix_metadata(args.plate_map_path)
    metadata.update(plate_dataframe.to_dict(orient="index"))

    with open(str(json_path), "w") as fout:
        json.dump(metadata, fout)

    logger.info(f"Successfully wrote metadata to {json_path}")


if __name__ == "__main__":
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s: %(funcName)s - %(levelname)s: %(message)s",
    )
    logger = logging.getLogger(__file__)

    args = parse_args()
    logger.debug(f"Parsed arguments: {vars(args)}")
    main(args)
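
A minimal usage sketch of the parser above, with a hypothetical input path:

    description, plate_map = parse_phenix_metadata("plate_map.xlsx")
    # description: plain dict from the "Description" sheet
    # plate_map: one row per well ("A1" ... "H12"), one column per variable
    metadata = {**description, **plate_map.to_dict(orient="index")}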
@@ -0,0 +1,11 @@
# Phenix Service

This repository contains a suite of scripts, tools, and templates used to manage
the Opera Phenix high-content screening platform at JAXGM.

## Contents

- `templates` - plate map and other data input templates
- `post_processing` - scripts for post-processing data coming off the Phenix
- `utils` - utility scripts
- `windows_scripts` - scripts for management of the Harmony-PC
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,76 @@
#!/usr/bin/env python
"""Extract selected metadata fields from the XML files in a Harmony archive."""
import sys
import pathlib
import logging
import argparse

import xml.etree.ElementTree as ET


VALID_DATATYPES = {
    "measurement": {
        "fields": [
            "PlateName",
            "UserName",
            "MeasurementID",
            "TargetTemperature",
            "TargetCO2",
        ]
    },
    "analysissequence": {"fields": []},
    "experiment": {"fields": []},
    "assaylayout": {"fields": []},
}
# ElementTree stores namespaced tags in Clark notation: "{namespace-uri}Tag"
TAG_PREFIX = "{http://www.perkinelmer.com/PEHH/HarmonyV5}"

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


def search_xml_for_key(xml_path, key):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    found_tag = root.find(f".//{TAG_PREFIX}{key}")
    value = None
    if found_tag is not None:
        value = found_tag.text
    return value
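
# Example: key="PlateName" issues the XPath query
#   .//{http://www.perkinelmer.com/PEHH/HarmonyV5}PlateName
# and returns the text of the first matching element, or None if absent.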


parser = argparse.ArgumentParser()
parser.add_argument("archive_path", type=pathlib.Path)
args = parser.parse_args()

if not args.archive_path.exists():
    logger.error(f"{args.archive_path} doesn't exist!")
    sys.exit(1)

xml_dir = args.archive_path / "XML"
if not xml_dir.exists():
    logger.critical(f"{args.archive_path} has no XML directory! Skipping.")
    sys.exit(1)

archive_metadata = {}
for datatype_dir in xml_dir.iterdir():
    datatype = datatype_dir.name.lower()

    datatype_obj = VALID_DATATYPES.get(datatype, None)
    if not datatype_obj:
        continue

    keys_to_search = datatype_obj["fields"]
    for search_key in keys_to_search:
        for xml_file in datatype_dir.glob("*.xml"):
            # skip attachment XML files
            if xml_file.stem.endswith("attmt"):
                continue
            found_value = search_xml_for_key(xml_file, search_key)
            existing_value = archive_metadata.get(search_key, None)
            if existing_value is None:
                archive_metadata[search_key] = found_value
            elif isinstance(existing_value, str):
                # second occurrence: promote the stored value to a list
                archive_metadata[search_key] = [existing_value, found_value]
            else:
                try:
                    archive_metadata[search_key].append(found_value)
                except Exception as e:
                    logger.error(e)
                    raise e

print(archive_metadata)
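
# Illustrative output shape (values are hypothetical):
#   {'PlateName': 'MyPlate', 'UserName': '...', 'MeasurementID': '...',
#    'TargetTemperature': '37', 'TargetCO2': '5'}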
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
import os
import json
import shutil
import sqlite3
import argparse
from pathlib import Path

import xml.etree.ElementTree as ET


def construct_argparser():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="command", title="subcommands")
    subparsers.required = True
    parent = argparse.ArgumentParser(add_help=False)
    parent.add_argument(
        "archive_root", type=Path,
        help="Path to 'Harmony-Archive' directory"
    )
    parent.add_argument(
        "output_location", type=Path,
        help="Output directory to store images and/or metadata"
    )

    convert = subparsers.add_parser(
        "convert",
        parents=[parent],
        help=(
            "Convert Harmony-Archive format to human-readable tiffs "
            "and metadata"
        )
    )
    convert.set_defaults(func=HarmonyArchive.convert_to_human_readable)
    metadata = subparsers.add_parser(
        "metadata",
        parents=[parent],
        help="Only generate metadata from a Harmony-Archive"
    )
    metadata.set_defaults(func=HarmonyArchive.generate_metadata_json)
    return parser


class HarmonyArchive(object):
    human_readable_format = (
        "r{Row:02g}c{Col:02g}f{Field:02g}p{Plane:02g}-"
        "ch{Channel}sk{SlowKin}fk{FastKin}fl{Flim}.tiff"
    )

    def __init__(self, location):
        self.location = location
        self.exists = self.location.exists()
        if not self.exists:
            raise Exception(f"Harmony Archive directory {self.location} doesn't exist")
        self._validate_archive()

    def _validate_archive(self):
        # expects /IMAGES/IMAGES.sqlite
        # expects /XML/MEASUREMENT/*/*.xml
        self.image_db_locations = self.location.glob("IMAGES/*/IMAGES.sqlite")
        # assert self.image_db_location.exists(), \
        #     f"Image DB {self.image_db_location} not found!"
        self.measurement_xml_locations = self.location.glob("XML/*/*.xml")

    def load_image_database(self):
        image_data = {}
        for database_location in self.image_db_locations:
            measurement_key = database_location.parent.name

            with sqlite3.connect(str(database_location)) as image_db:
                image_db.row_factory = sqlite3.Row
                select_all = "SELECT * FROM Image"

                image_data[measurement_key] = [
                    dict(row) for row in image_db.execute(select_all)
                ]

        self.image_data = image_data

    def convert_to_human_readable(self, output_location):
        if "image_data" not in self.__dict__:
            self.load_image_database()

        for key, records in self.image_data.items():
            for record in records:
                human_readable_image_name = self.human_readable_format.format(**record)
                record["human_readable"] = human_readable_image_name

                src_path = self.location / "IMAGES" / key / record["Url"]
                dest_path = output_location / key / human_readable_image_name

                if not dest_path.parent.exists():
                    os.makedirs(dest_path.parent)
                shutil.copyfile(src_path, dest_path)
                break  # NOTE: only the first record of each measurement is copied

    def generate_metadata_json(self, output_location):
        # stub: currently prints the first element tag of each measurement XML
        for xml_location in self.measurement_xml_locations:
            tree = ET.parse(xml_location)
            root = tree.getroot()
            for child in root.iter():
                # TODO: filter, e.g. child.tag.endswith("Measurement")
                print(child.tag)
                break


def main():
    argparser = construct_argparser()
    args = argparser.parse_args()

    archive = HarmonyArchive(args.archive_root)
    archive.load_image_database()

    args.func(archive, args.output_location)


if __name__ == "__main__":
    main()
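
A minimal usage sketch of the class above, outside the CLI (paths are hypothetical):

    archive = HarmonyArchive(Path("/data/Harmony-Archive"))
    archive.load_image_database()
    archive.convert_to_human_readable(Path("/data/export"))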