-
Notifications
You must be signed in to change notification settings - Fork 16
/
metrics.py
130 lines (119 loc) · 4.26 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# /usr/bin/env python
# coding=utf-8
def get_chunk_type(tok, idx_to_tag):
"""
Args:
tok: id of token, ex 4
idx_to_tag: dictionary {4: "B-PER", ...}
Returns:
tuple: "B", "PER"
"""
tag_name = idx_to_tag[tok]
content = tag_name.split('-')
tag_class = content[0]
if len(content) == 1:
return tag_class
ht = content[-1]
return tag_class, ht
def get_chunks(seq, tags):
"""Given a sequence of tags, group entities and their position
Args:
seq: np.array[4, 4, 0, 0, ...] sequence of labels
tags: dict["O"] = 4
Returns:
list of (chunk_type, chunk_start, chunk_end)
Example:
seq = [4, 5, 0, 3]
tags = {"B-PER": 4, "I-PER": 5, "B-LOC": 3}
result = [("PER", 0, 2), ("LOC", 3, 4)]
"""
default1 = tags['O']
idx_to_tag = {idx: tag for tag, idx in tags.items()}
chunks = []
chunk_type, chunk_start = None, None
for i, tok in enumerate(seq):
# End of a chunk 1
if tok == default1 and chunk_type is not None:
# Add a chunk.
chunk = (chunk_type, chunk_start, i)
chunks.append(chunk)
chunk_type, chunk_start = None, None
# End of a chunk + start of a chunk!
elif tok != default1:
res = get_chunk_type(tok, idx_to_tag)
if len(res) == 1:
continue
tok_chunk_class, ht = get_chunk_type(tok, idx_to_tag)
tok_chunk_type = ht
if chunk_type is None:
chunk_type, chunk_start = tok_chunk_type, i
elif tok_chunk_type != chunk_type or tok_chunk_class == "B":
chunk = (chunk_type, chunk_start, i)
chunks.append(chunk)
chunk_type, chunk_start = tok_chunk_type, i
else:
pass
# end condition
if chunk_type is not None:
chunk = (chunk_type, chunk_start, len(seq))
chunks.append(chunk)
return chunks
def tag_mapping_nearest(predict_tags, pre_rels=None, label2idx_sub=None, label2idx_obj=None):
"""
implement of the heuristic nearest principle
Args:
predict_tags: np.array, (xi, 2, max_sen_len)
pre_rels: (xi,)
"""
rel_num = predict_tags.shape[0]
pre_triples = []
for idx in range(rel_num):
heads, tails = [], []
pred_chunks_sub = get_chunks(predict_tags[idx][0], label2idx_sub)
pred_chunks_obj = get_chunks(predict_tags[idx][1], label2idx_obj)
pred_chunks = pred_chunks_sub + pred_chunks_obj
for ch in pred_chunks:
if ch[0] == 'H':
heads.append(ch)
elif ch[0] == 'T':
tails.append(ch)
# the heuristic nearest principle
if len(heads) != 0 and len(tails) != 0:
if len(heads) < len(tails):
heads += [heads[-1]] * (len(tails) - len(heads))
if len(heads) > len(tails):
tails += [tails[-1]] * (len(heads) - len(tails))
for h_t in zip(heads, tails):
if pre_rels is not None:
triple = list(h_t) + [pre_rels[idx]]
else:
triple = list(h_t) + [idx]
pre_triples.append(tuple(triple))
return pre_triples
def tag_mapping_corres(predict_tags, pre_corres, pre_rels=None, label2idx_sub=None, label2idx_obj=None):
"""
Args:
predict_tags: np.array, (xi, 2, max_sen_len)
pre_corres: (seq_len, seq_len)
pre_rels: (xi,)
"""
rel_num = predict_tags.shape[0]
pre_triples = []
for idx in range(rel_num):
heads, tails = [], []
pred_chunks_sub = get_chunks(predict_tags[idx][0], label2idx_sub)
pred_chunks_obj = get_chunks(predict_tags[idx][1], label2idx_obj)
pred_chunks = pred_chunks_sub + pred_chunks_obj
for ch in pred_chunks:
if ch[0] == 'H':
heads.append(ch)
elif ch[0] == 'T':
tails.append(ch)
retain_hts = [(h, t) for h in heads for t in tails if pre_corres[h[1]][t[1]] == 1]
for h_t in retain_hts:
if pre_rels is not None:
triple = list(h_t) + [pre_rels[idx]]
else:
triple = list(h_t) + [idx]
pre_triples.append(tuple(triple))
return pre_triples