Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Commit

Permalink
Implement Control Results Report as new endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronstephenson committed May 20, 2019
1 parent 1da1428 commit ab86549
Showing 1 changed file with 143 additions and 63 deletions.
206 changes: 143 additions & 63 deletions lideservices/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,7 @@ def post(self, request):
pos = "Positive"
neg = "Negative"
nr = "No Result"
na = "Not Analyzed"

# PCRReplicateBatch-level controls
# Ext Neg
Expand Down Expand Up @@ -1711,71 +1712,150 @@ def post(self, request):
}
ext_neg_results[ext_neg['extraction_batch__id']] = data
# convert the dict of dicts into a list of dicts
ext_neg_results_list = ext_neg_results.values()

# # PCR Neg
# pcr_negs = PCRReplicateBatch.objects.all().annotate(
# result=Case(
# When(pcr_neg_cq_value__gt=0, then=Value(pos)),
# When(pcr_neg_cq_value__exact=0, then=Value(neg)),
# default=Value(nr), output_field=CharField()
# )).annotate(pcrreplicate_batch=F('id')).values(
# 'extraction_batch__analysis_batch', 'extraction_batch__extraction_number', 'replicate_number', 'target__name', 'id', 'result')
# # PCR Pos
# pcr_poss = PCRReplicateBatch.objects.all().annotate(
# result=Case(
# When(pcr_pos_cq_value__gt=0, then=Value(pos)),
# When(pcr_pos_cq_value__exact=0, then=Value(neg)),
# default=Value(nr), output_field=CharField()
# )).annotate(pcrreplicate_batch=F('id')).values(
# 'extraction_batch__analysis_batch', 'extraction_batch__extraction_number', 'replicate_number', 'target__name', 'id', 'result')
#
# # ExtractionBatch-level controls
# # Ext Pos
# ext_poss = ExtractionBatch.objects.all().annotate(
# ext_pos_rna_rt_cq_value=Max('reversetranscriptions__ext_pos_rna_rt_cq_value',
# filter=Q(reversetranscriptions__re_rt__isnull=True))
# ).annotate(
# result=Case(
# When(ext_pos_rna_rt_cq_value__gt=0, then=Value(pos)),
# When(ext_pos_dna_cq_value__gt=0, then=Value(pos)),
# When(ext_pos_dna_cq_value__exact=0, then=Value(neg)),
# default=Value(nr), output_field=CharField()
# )).values('analysis_batch', 'extraction_number', 'result')
#
# if target_ids:
#
# pcr_negs = pcr_negs.filter(target__in=target_ids)
# pcr_poss = pcr_poss.filter(target__in=target_ids)
#
# # Sample-level controls
# # PegNegs
# peg_negs = Sample.objects.filter(record_type=2)
# peg_neg_results = []
# for peg_neg in peg_negs:
# peg_neg_resp = {"id": peg_neg.id, "collection_start_date": peg_neg.collection_start_date}
# for target in targets:
# # only check for valid reps with the same target
# reps = PCRReplicate.objects.filter(
# sample_extraction__sample_id=peg_neg.id,
# pcrreplicate_batch__target_id__exact=target['id'], invalid=False)
# # if even a single one of the peg_neg reps is greater than zero,
# # the data rep result must be set to positive
# pos_result = [rep.cq_value for rep in reps if rep.cq_value is not None and rep.cq_value > 0]
# if pos_result:
# result = pos
# else:
# neg_result = [rep.cq_value for rep in reps if rep.cq_value is not None and rep.cq_value == 0]
# result = neg if neg_result else nr
# peg_neg_resp[target['name']] = result
# peg_neg_results.append(peg_neg_resp)
ext_neg_results_list = list(ext_neg_results.values())
# include targets not analyzed
for ext_neg_result in ext_neg_results_list:
for target in targets:
if target['name'] not in ext_neg_result:
ext_neg_result[target['name']] = na

# PCR Neg
pcr_negs = PCRReplicateBatch.objects.all().annotate(
result=Case(
When(pcr_neg_cq_value__gt=0, then=Value(pos)),
When(pcr_neg_cq_value__exact=0, then=Value(neg)),
default=Value(nr), output_field=CharField()
)).annotate(pcrreplicate_batch=F('id')).values(
'extraction_batch__id', 'extraction_batch__analysis_batch', 'extraction_batch__extraction_number',
'replicate_number', 'target__name', 'pcrreplicate_batch', 'result')
if target_ids:
pcr_negs = pcr_negs.filter(target__in=target_ids)
pcr_neg_results = {}
for pcr_neg in pcr_negs:
# if the EB is already included in our local dict, just append the current target to it
if pcr_neg_results.get(pcr_neg['extraction_batch__id'], None) is not None:
data = pcr_neg_results[pcr_neg['extraction_batch__id']]
data[pcr_neg['target__name']] = pcr_neg['result']
# otherwise, add the EB to our local dict and append the current target to it
else:
data = {
"analysis_batch": pcr_neg['extraction_batch__analysis_batch'],
"extraction_number": pcr_neg['extraction_batch__extraction_number'],
# "replicate_number": pcr_neg['replicate_number'],
pcr_neg['target__name']: pcr_neg['result']
}
pcr_neg_results[pcr_neg['extraction_batch__id']] = data
# convert the dict of dicts into a list of dicts
pcr_neg_results_list = list(pcr_neg_results.values())
# include targets not analyzed
for pcr_neg_result in pcr_neg_results_list:
for target in targets:
if target['name'] not in pcr_neg_result:
pcr_neg_result[target['name']] = na

# PCR Pos
pcr_poss = PCRReplicateBatch.objects.all().annotate(
result=Case(
When(pcr_pos_cq_value__gt=0, then=Value(pos)),
When(pcr_pos_cq_value__exact=0, then=Value(neg)),
default=Value(nr), output_field=CharField()
)).annotate(pcrreplicate_batch=F('id')).values(
'extraction_batch__id', 'extraction_batch__analysis_batch', 'extraction_batch__extraction_number',
'replicate_number', 'target__name', 'pcrreplicate_batch', 'result')
if target_ids:
pcr_poss = pcr_poss.filter(target__in=target_ids)
pcr_pos_results = {}
for pcr_pos in pcr_poss:
# if the EB is already included in our local dict, just append the current target to it
if pcr_pos_results.get(pcr_pos['extraction_batch__id'], None) is not None:
data = pcr_pos_results[pcr_pos['extraction_batch__id']]
data[pcr_pos['target__name']] = pcr_pos['result']
# otherwise, add the EB to our local dict and append the current target to it
else:
data = {
"analysis_batch": pcr_pos['extraction_batch__analysis_batch'],
"extraction_number": pcr_pos['extraction_batch__extraction_number'],
# "replicate_number": pcr_neg['replicate_number'],
pcr_pos['target__name']: pcr_pos['result']
}
pcr_pos_results[pcr_pos['extraction_batch__id']] = data
# convert the dict of dicts into a list of dicts
pcr_pos_results_list = list(pcr_pos_results.values())
# include targets not analyzed
for pcr_pos_result in pcr_pos_results_list:
for target in targets:
if target['name'] not in pcr_pos_result:
pcr_pos_result[target['name']] = na

ext_poss = PCRReplicateBatch.objects.all().annotate(
ext_pos_rna_rt_cq_value=Max('extraction_batch__reversetranscriptions__ext_pos_rna_rt_cq_value',
filter=Q(extraction_batch__reversetranscriptions__re_rt__isnull=True))
).annotate(
result=Case(
When(ext_pos_rna_rt_cq_value__gt=0, then=Value(pos)),
When(extraction_batch__ext_pos_dna_cq_value__gt=0, then=Value(pos)),
When(extraction_batch__ext_pos_dna_cq_value__exact=0, then=Value(neg)),
default=Value(nr), output_field=CharField()
)).annotate(pcrreplicate_batch=F('id')).values(
'extraction_batch__id', 'extraction_batch__analysis_batch', 'extraction_batch__extraction_number',
'replicate_number', 'target__name', 'pcrreplicate_batch', 'result')
if target_ids:
ext_poss = ext_poss.filter(target__in=target_ids)
ext_pos_results = {}
for ext_pos in ext_poss:
# if the EB is already included in our local dict, just append the current target to it
if ext_pos_results.get(ext_pos['extraction_batch__id'], None) is not None:
data = ext_pos_results[ext_pos['extraction_batch__id']]
data[ext_pos['target__name']] = ext_pos['result']
# otherwise, add the EB to our local dict and append the current target to it
else:
data = {
"analysis_batch": ext_pos['extraction_batch__analysis_batch'],
"extraction_number": ext_pos['extraction_batch__extraction_number'],
# "replicate_number": pcr_neg['replicate_number'],
ext_pos['target__name']: ext_pos['result']
}
ext_pos_results[ext_pos['extraction_batch__id']] = data
# convert the dict of dicts into a list of dicts
ext_pos_results_list = list(ext_pos_results.values())
# include targets not analyzed
for ext_pos_result in ext_pos_results_list:
for target in targets:
if target['name'] not in ext_pos_result:
ext_pos_result[target['name']] = na

# Sample-level controls
# PegNegs
peg_negs = Sample.objects.filter(record_type=2)
peg_neg_results_list = []
for peg_neg in peg_negs:
peg_neg_resp = {"id": peg_neg.id, "collection_start_date": peg_neg.collection_start_date}
for target in targets:
# only check for valid reps with the same target
reps = PCRReplicate.objects.filter(
sample_extraction__sample_id=peg_neg.id,
pcrreplicate_batch__target_id__exact=target['id'], invalid=False)
# if there are no reps, then this target was not analyzed
if len(reps) == 0:
result = na
else:
# if even a single one of the peg_neg reps is greater than zero,
# the data rep result must be set to positive
pos_result = [rep.cq_value for rep in reps if rep.cq_value is not None and rep.cq_value > 0]
if pos_result:
result = pos
else:
neg_result = [rep.cq_value for rep in reps if rep.cq_value is not None and rep.cq_value == 0]
result = neg if neg_result else nr
peg_neg_resp[target['name']] = result
peg_neg_results_list.append(peg_neg_resp)

resp = {
"ext_neg": ext_neg_results_list
# "pcr_neg": list(pcr_negs),
# "pcr_pos": list(pcr_poss),
# "ext_pos": list(ext_poss),
# "peg_neg": list(peg_neg_results),
"ext_neg": ext_neg_results_list,
"pcr_neg": pcr_neg_results_list,
"pcr_pos": pcr_pos_results_list,
"ext_pos": ext_pos_results_list,
"peg_neg": peg_neg_results_list
}

return Response(resp)

0 comments on commit ab86549

Please sign in to comment.