Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffbaumes committed Sep 4, 2023
1 parent d07d31a commit 8fca7e2
Show file tree
Hide file tree
Showing 14 changed files with 843 additions and 106 deletions.
168 changes: 110 additions & 58 deletions nmdc_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

router = APIRouter()

BIOSAMPLE_SEARCH_COLLECTION = "biosample_denormalized"
OMICS_PROCESSING_SEARCH_COLLECTION = "omics_processing_denormalized"


# get application settings
@router.get("/settings", name="Get application settings")
Expand Down Expand Up @@ -87,13 +90,13 @@ async def mongo_get_database_summary(db: Session = Depends(get_db)):
"gold_classification": {
"type": "sankey-tree",
},
"env_broad_scale.has_raw_value": {
"env_broad_scale.term.id": {
"type": "tree",
},
"env_local_scale.has_raw_value": {
"env_local_scale.term.id": {
"type": "tree",
},
"env_medium.has_raw_value": {
"env_medium.term.id": {
"type": "tree",
},
"lat_lon.latitude": {
Expand Down Expand Up @@ -164,6 +167,55 @@ async def get_environmental_sankey(
return crud.get_environmental_sankey(db, query)


@router.post(
"/environment/mongo_sankey",
tags=["aggregation"],
)
async def mongo_get_environmental_sankey(
query: query.BiosampleQuerySchema = query.BiosampleQuerySchema(),
db: Session = Depends(get_db),
):
settings = Settings()
client = MongoClient(
host=settings.mongo_host,
username=settings.mongo_user,
password=settings.mongo_password,
port=settings.mongo_port,
directConnection=True,
)
mongo_filter = conditions_to_mongo_filter(query.conditions)
results = client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([
{
"$match": mongo_filter,
},
{
"$group": {
"_id": {
"ecosystem": "$ecosystem",
"ecosystem_category": "$ecosystem_category",
"ecosystem_subtype": "$ecosystem_subtype",
"ecosystem_type": "$ecosystem_type",
"specific_ecosystem": "$specific_ecosystem",
},
"count": {
"$count": {},
},
},
},
{
"$set": {
"_id.count": "$count",
},
},
{
"$replaceRoot": {
"newRoot": "$_id",
},
},
])
return json.loads(bson.json_util.dumps(results))


@router.post(
"/environment/geospatial",
response_model=List[schemas.EnvironmentGeospatialAggregation],
Expand Down Expand Up @@ -191,7 +243,7 @@ async def mongo_get_environmental_geospatial(
directConnection=True,
)
mongo_filter = conditions_to_mongo_filter(query.conditions)
results = client.nmdc.denormalized.aggregate([
results = client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([
{
"$match": mongo_filter,
},
Expand Down Expand Up @@ -317,6 +369,9 @@ async def mongo_search_biosample(
{
"$unset": "gene_function",
},
{
"$sort": {"multiomics_count": -1},
},
]
print("mongo_search_biosample:")
print(aggregation)
Expand All @@ -334,8 +389,8 @@ def add_data_object_selection(sample):
return sample

return json.loads(bson.json_util.dumps({
"count": list(client.nmdc.denormalized.aggregate([*aggregation, {"$count": "count"}]))[0]["count"],
"results": [add_data_object_selection(doc) for doc in client.nmdc.denormalized.aggregate([*aggregation, {"$skip": pagination.offset}, {"$limit": pagination.limit}])],
"count": list(client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([*aggregation, {"$count": "count"}]))[0]["count"],
"results": [add_data_object_selection(doc) for doc in client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([*aggregation, {"$skip": pagination.offset}, {"$limit": pagination.limit}])],
}))


Expand Down Expand Up @@ -376,7 +431,7 @@ async def mongo_facet_biosample(query: query.FacetQuery):
print(aggregation)

return json.loads(bson.json_util.dumps({
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc.denormalized.aggregate(aggregation) },
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate(aggregation) },
}))


Expand Down Expand Up @@ -419,7 +474,7 @@ async def mongo_binned_facet_biosample(query: query.FacetQuery):
print(aggregation)

date_string = lambda d: f"{d['_id']['year']}-{str(d['_id']['month']).zfill(2)}-01"
binned_data = list(client.nmdc.denormalized.aggregate(aggregation))
binned_data = list(client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate(aggregation))
binned_data.sort(key=date_string)
binned_data = [d for d in binned_data if d["_id"]["year"] is not None]

Expand Down Expand Up @@ -457,6 +512,27 @@ async def get_biosample(biosample_id: str, db: Session = Depends(get_db)):
return db_biosample


@router.get(
"/mongo_biosample/{biosample_id}",
tags=["biosample"],
)
async def mongo_get_biosample(biosample_id: str, db: Session = Depends(get_db)):
settings = Settings()
client = MongoClient(
host=settings.mongo_host,
username=settings.mongo_user,
password=settings.mongo_password,
port=settings.mongo_port,
directConnection=True,
)

biosamples = list(client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].find({"id": biosample_id}))
if len(biosamples) == 0:
raise HTTPException(status_code=404, detail="Biosample not found")

return json.loads(bson.json_util.dumps(biosamples[0]))


@router.get(
"/envo/tree",
response_model=schemas.EnvoTreeResponse,
Expand Down Expand Up @@ -542,61 +618,16 @@ async def mongo_search_study(
"$group": {
"_id": "$study.id",
"study": { "$first": "$study" },
"sample_count": { "$count": {} },
"omics_processing": { "$push": "$omics_processing" },
},
},
{
"$set": {
"study.sample_count": "$sample_count",
"study.omics_processing": {
"$map": {
"input": {
# Transform array of arrays of omics_processing into flat array of omics_processing
"$reduce": {
"input": "$omics_processing",
"initialValue": [],
"in": { "$concatArrays": ["$$this", "$$value"] },
},
},
"in": "$$this",
},
},
},
},
{
"$replaceRoot": { "newRoot": "$study" },
},
]

results = [doc for doc in client.nmdc.denormalized.aggregate([*aggregation, {"$skip": pagination.offset}, {"$limit": pagination.limit}])]

# Count number of each omics_type
for result in results:
filtered_omics_processing = [{ "omics_processing": doc } for doc in result["omics_processing"]]

# This is a query per study to only match specific omics_processing when an omics_processing search is active.
# Can it instead be done all at once in the above aggregation pipeline?
omics_processing_conditions = [cond for cond in query.conditions if cond.table.name == "omics_processing"]
if len(omics_processing_conditions) > 0:
filtered_omics_processing = [doc for doc in client.nmdc.aggregate([
{
"$documents": filtered_omics_processing,
},
{
"$match": conditions_to_mongo_filter(omics_processing_conditions),
},
])]

omics_processing_counts = Counter([doc["omics_processing"]["omics_type"]["has_raw_value"] for doc in filtered_omics_processing])
count_list = [{"type": key, "count": value} for key, value in omics_processing_counts.items()]
count_list.sort(key=lambda x: x["type"])
result["omics_processing_counts"] = count_list if len(count_list) > 0 else None
del result["omics_processing"]

return json.loads(bson.json_util.dumps({
"count": list(client.nmdc.denormalized.aggregate([*aggregation, {"$count": "count"}]))[0]["count"],
"results": results,
"count": list(client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([*aggregation, {"$count": "count"}]))[0]["count"],
"results": [doc for doc in client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate([*aggregation, {"$skip": pagination.offset}, {"$limit": pagination.limit}])],
}))


Expand Down Expand Up @@ -646,7 +677,7 @@ async def mongo_facet_study(query: query.FacetQuery):
]

return json.loads(bson.json_util.dumps({
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc.denormalized.aggregate(aggregation) },
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate(aggregation) },
}))


Expand All @@ -672,6 +703,27 @@ async def get_study(study_id: str, db: Session = Depends(get_db)):
return db_study


@router.get(
"/mongo_study/{study_id}",
tags=["study"],
)
async def mongo_get_study(study_id: str, db: Session = Depends(get_db)):
settings = Settings()
client = MongoClient(
host=settings.mongo_host,
username=settings.mongo_user,
password=settings.mongo_password,
port=settings.mongo_port,
directConnection=True,
)

studies = list(client.nmdc.study_transformed.find({"id": study_id}))
if len(studies) == 0:
raise HTTPException(status_code=404, detail="Study not found")

return json.loads(bson.json_util.dumps(studies[0]))


@router.get("/study/{study_id}/image", tags=["study"])
async def get_study_image(study_id: str, db: Session = Depends(get_db)):
image = crud.get_study_image(db, study_id)
Expand Down Expand Up @@ -737,7 +789,7 @@ async def mongo_facet_omics_processing(query: query.FacetQuery):
print(aggregation)

return json.loads(bson.json_util.dumps({
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc.omics_processing_denormalized.aggregate(aggregation) },
"facets": { facet_value_to_key(facet["_id"]): facet["count"] for facet in client.nmdc[OMICS_PROCESSING_SEARCH_COLLECTION].aggregate(aggregation) },
}))


Expand Down Expand Up @@ -878,7 +930,7 @@ def mongo_data_object_aggregation(
]

result = dict()
for facet in client.nmdc.denormalized.aggregate(aggregation):
for facet in client.nmdc[BIOSAMPLE_SEARCH_COLLECTION].aggregate(aggregation):
if "data_object_type" not in facet["_id"]:
# We are not considering data_objects without a data_object_type
continue
Expand Down
Loading

0 comments on commit 8fca7e2

Please sign in to comment.