-
Notifications
You must be signed in to change notification settings - Fork 17
/
convert_VCF_info_fields.py
executable file
·192 lines (167 loc) · 8.12 KB
/
convert_VCF_info_fields.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
# -----------------------------------------------------------------------------
# The MIT License (MIT)
# Copyright (c) 2014 galaxy-iuc
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
# Takes in VCF file annotated with medaka tools annotate and converts
#
# Usage statement:
# python convert_VCF_info_fields.py in_vcf.vcf out_vcf.vcf
# 10/21/2020 - Nathan P. Roach, [email protected]
# adapted June 2024 - Marie Lataretu, [email protected]
# source for this scripts:
# https://github.com/galaxyproject/tools-iuc/blob/fd148a124034b44d0d61db3eec32ff991d8c152c/tools/medaka/convert_VCF_info_fields.py
import re
import sys
from collections import OrderedDict
from math import log10
import scipy.stats
def pval_to_phredqual(pval):
try:
ret = round(-10 * log10(pval))
except ValueError:
ret = 2147483647 # transform pval of 0.0 to max signed 32 bit int
return ret
def parseInfoField(info):
info_fields = info.split(";")
info_dict = OrderedDict()
for info_field in info_fields:
code, val = info_field.split("=")
info_dict[code] = val
return info_dict
def annotateVCF(in_vcf_filepath, out_vcf_filepath):
"""Postprocess output of medaka tools annotate.
Splits multiallelic sites into separate records.
Replaces medaka INFO fields that might represent information of the ref
and multiple alternate alleles with simple ref, alt allele counterparts.
"""
in_vcf = open(in_vcf_filepath, "r")
# medaka INFO fields that do not make sense after splitting of
# multi-allelic records
# DP will be overwritten with the value of DPSP because medaka tools
# annotate currently only calculates the latter correctly
# (https://github.com/nanoporetech/medaka/issues/192).
# DPS, which is as unreliable as DP, gets skipped and the code
# calculates the spanning reads equivalent DPSPS instead.
to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"}
struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>")
header_lines = []
contig_ids = set()
contig_ids_simple = set()
# parse the metadata lines of the input VCF and drop:
# - duplicate lines
# - INFO lines declaring keys we are not going to write
# - redundant contig information
while True:
line = in_vcf.readline()
if line[:2] != "##":
assert line.startswith("#CHROM")
break
if line in header_lines:
# the annotate tool may generate lines already written by
# medaka variant again (example: medaka version line)
continue
match = struct_meta_pat.match(line)
if match:
match_type, match_id, match_misc = match.groups()
if match_type == "INFO":
if match_id == "DPSP":
line = line.replace("DPSP", "DP")
elif match_id in to_skip:
continue
elif match_type == "contig":
contig_ids.add(match_id)
if not match_misc:
# the annotate tools writes its own contig info,
# which is redundant with contig info generated by
# medaka variant, but lacks a length value.
# We don't need the incomplete line.
contig_ids_simple.add(match_id)
continue
header_lines.append(line)
# Lets check the above assumption about each ID-only contig line
# having a more complete counterpart.
assert not (contig_ids_simple - contig_ids)
header_lines.insert(1, "##convert_VCF_info_fields=0.2\n")
header_lines += [
'##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n',
'##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n',
'##INFO=<ID=FAF,Number=1,Type=Float,Description="Forward Spanning Reads Allele Frequency">\n',
'##INFO=<ID=RAF,Number=1,Type=Float,Description="Reverse Spanning Reads Allele Frequency">\n',
'##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n',
'##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n',
'##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n',
line,
]
with open(out_vcf_filepath, "w") as out_vcf:
out_vcf.writelines(header_lines)
for line in in_vcf:
fields = line.split("\t")
info_dict = parseInfoField(fields[7])
sr_list = [int(x) for x in info_dict["SR"].split(",")]
sc_list = [int(x) for x in info_dict["SC"].split(",")]
if len(sr_list) != len(sc_list):
print("WARNING - SR and SC are different lengths, " "skipping variant")
print(line.strip()) # Print the line for debugging purposes
continue
variant_list = fields[4].split(",")
dpsp = int(info_dict["DPSP"])
ref_fwd, ref_rev = 0, 1
dpspf, dpspr = (int(x) for x in info_dict["AR"].split(","))
for i in range(0, len(sr_list), 2):
dpspf += sr_list[i]
dpspr += sr_list[i + 1]
for j, i in enumerate(range(2, len(sr_list), 2)):
dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1])
dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]]
_, p_val = scipy.stats.fisher_exact(dp2x2)
sb = pval_to_phredqual(p_val)
as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1])
info = []
for code in info_dict:
if code in to_skip:
continue
val = info_dict[code]
info.append("%s=%s" % (code, val))
info.append("DP=%d" % dpsp)
info.append("DPSPS=%d,%d" % (dpspf, dpspr))
if dpsp == 0:
info.append("AF=.")
else:
af = (dp4[2] + dp4[3]) / dpsp
info.append("AF=%.6f" % af)
if dpspf == 0:
info.append("FAF=.")
else:
faf = dp4[2] / dpspf
info.append("FAF=%.6f" % faf)
if dpspr == 0:
info.append("RAF=.")
else:
raf = dp4[3] / dpspr
info.append("RAF=%.6f" % raf)
info.append("SB=%d" % sb)
info.append("DP4=%d,%d,%d,%d" % dp4)
info.append("AS=%d,%d,%d,%d" % as_)
new_info = ";".join(info)
fields[4] = variant_list[j]
fields[7] = new_info
out_vcf.write("\t".join(fields))
in_vcf.close()
if __name__ == "__main__":
annotateVCF(sys.argv[1], sys.argv[2])