-
Notifications
You must be signed in to change notification settings - Fork 1
/
chem_ner.py
157 lines (126 loc) · 6.16 KB
/
chem_ner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
'''chem_ner.py

Run tmChem on a directory of paragraph files.
Requires Perl to be available in $PATH.

usage:
cd bioshovel/src # this is the directory that contains the preprocess package
python3 -m preprocess.chem_ner [path_to_paragraph_documents] [output_directory] --tmchem [path/to/directory/containing/tmChem.pl] --logdir [path/to/directory/for/logfile]

For each chunk of input files this runs, from inside the tmChem directory:
perl tmChem.pl -i inputdir -o outputdir -m Model/All.Model
'''
import argparse
import logging
import logging.handlers
import os
import shutil
import subprocess
import sys
import tempfile
import threading
from glob import glob, iglob
from itertools import repeat
import multiprocessing as mp
from pathlib import Path
from tqdm import tqdm
from preprocess.util import (save_file,
create_n_sublists,
logging_thread,
file_exists_or_exit,
reorganize_directory)
from preprocess.reformat import (parse_parform_file,
parform_to_pubtator)
def _route_logging_to_queue(q):
    ''' Send this process's root-logger records to q and return the logger

        q: queue shared with the parent process. A
           logging.handlers.QueueHandler wrapping it is installed on the
           root logger the first time this runs in a process.

        Handlers already present on the root logger are removed at that
        point so each record is emitted once, through the queue, instead of
        also being written locally by this process.
    '''
    root = logging.getLogger()
    if not any(isinstance(h, logging.handlers.QueueHandler)
               for h in root.handlers):
        # a forked worker inherits the parent's handlers; keep only the queue
        for inherited in list(root.handlers):
            root.removeHandler(inherited)
        root.addHandler(logging.handlers.QueueHandler(q))
    return root


def process_and_run_chunk(filepaths_args_tuple):
    ''' Reformat one chunk of paragraph files, run tmChem on them and copy
        the results into args.output_directory

        filepaths_args_tuple: (list_of_file_paths, args, q)
            list_of_file_paths: paths of the paragraph files in this chunk
            args: parsed command-line namespace; args.tmchem (directory
                  holding tmChem.pl) and args.output_directory are read
            q: queue that receives this process's log records

        Returns None. Nothing is run for an empty chunk, or for a chunk in
        which parse_parform_file returned a falsy value for every file.
        tmChem failures and copy failures are logged at CRITICAL level
        rather than raised.
    '''
    list_of_file_paths, args, q = filepaths_args_tuple

    if not list_of_file_paths:
        return

    log = _route_logging_to_queue(q)

    parsed_files = [parse_parform_file(file_path)
                    for file_path in list_of_file_paths]

    # filter out files with no title line
    # (for which parse_parform_file returned None)
    parsed_files = [f for f in parsed_files if f]
    if not parsed_files:
        # nothing to annotate: do not launch perl on an empty directory
        return

    reformatted_files = [parform_to_pubtator(escaped_doi, title_line, body)
                         for escaped_doi, title_line, body in parsed_files]

    with tempfile.TemporaryDirectory() as input_tempdir, \
            tempfile.TemporaryDirectory() as output_tempdir:
        for doi_filename, file_info in reformatted_files:
            save_file(doi_filename, file_info, input_tempdir)

        tmchem_command = ['perl',
                          os.path.join(args.tmchem, 'tmChem.pl'),
                          '-i', input_tempdir,
                          '-o', output_tempdir,
                          '-m', os.path.join(args.tmchem,
                                             'Model',
                                             'All.Model')]
        try:
            subprocess.check_output(tmchem_command,
                                    cwd=args.tmchem,
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            # errors='replace': the perl output is not guaranteed to be UTF-8
            string_error = err.output.decode(encoding='UTF-8',
                                             errors='replace').rstrip('\n')
            log.critical('tmChem error: {}'.format(string_error))
            log.critical('tmChem error while processing chunk: {}'.format(list_of_file_paths))

        # Copy whatever tmChem produced, even after a failure. Copying file
        # by file avoids GNU-only `cp -t`, the argv length limit, and the
        # "missing file operand" error when there is no output at all.
        copy_failed = False
        for produced in glob(os.path.join(output_tempdir, '*')):
            destination = os.path.join(args.output_directory,
                                       os.path.basename(produced))
            try:
                shutil.copy(produced, destination)
            except OSError:
                copy_failed = True
        if copy_failed:
            log.critical('Copy error, chunk: {}'.format(list_of_file_paths))
def main(args):
    ''' Run tmChem over every file below args.paragraph_path

        args: parsed command-line namespace. Reads args.tmchem,
              args.paragraph_path (rewritten to an absolute path),
              args.output_directory, args.logdir, args.poolsize and
              args.notqdm.

        The input files are split into chunks with create_n_sublists and
        handed to process_and_run_chunk in a multiprocessing pool. Records
        that workers put on the shared queue are consumed by
        logging_thread, running in a thread of this process. Afterwards
        the output directory is passed to reorganize_directory.

        The output and log directories are created if missing. The logging
        thread is always sent its None sentinel and joined, even when
        processing raises, so a failure cannot leave the process hanging on
        a non-daemon thread.
    '''
    file_exists_or_exit(os.path.join(args.tmchem, 'tmChem.pl'))
    args.paragraph_path = os.path.abspath(args.paragraph_path)

    # glob.glob doesn't support double star expressions in Python 3.4, so using this:
    print('Reading input files...')
    all_files = [str(f)
                 for f in tqdm(Path(args.paragraph_path).glob('**/*'),
                               disable=args.notqdm)
                 if f.is_file()]
    filelist_with_sublists = create_n_sublists(all_files, mp.cpu_count()*1000)

    # create the output and log directories if necessary
    os.makedirs(args.output_directory, exist_ok=True)
    os.makedirs(args.logdir, exist_ok=True)

    log_filename = os.path.join(args.logdir, 'chem_ner.log')
    logging_format = '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
    logging.basicConfig(filename=log_filename,
                        format=logging_format,
                        level=logging.INFO,
                        filemode='w')

    print('Using {} cores to process {} files...'.format(args.poolsize,
                                                         len(all_files)))

    # The pool is created before the logging thread is started, so worker
    # processes are never forked while that thread is running.
    with mp.Pool(args.poolsize) as pool:
        with mp.Manager() as mgr:
            q = mgr.Queue()
            log_thread = threading.Thread(target=logging_thread, args=(q,))
            log_thread.start()
            try:
                imap_gen = pool.imap_unordered(process_and_run_chunk,
                                               zip(filelist_with_sublists,
                                                   repeat(args),
                                                   repeat(q)))
                # drain the iterator; results are not used, only progress
                for _ in tqdm(imap_gen,
                              total=len(filelist_with_sublists),
                              disable=args.notqdm):
                    pass
                logging.info('Done processing {} files'.format(len(all_files)))
            finally:
                # end logging_thread before the manager (and its queue) goes away
                q.put(None)
                log_thread.join()

    # reorganize a directory with a huge number of files into a bunch of
    # subdirectories containing those same files, with a max of 10k files per
    # subdirectory
    reorganize_directory(args.output_directory,
                         max_files_per_subdir=10000,
                         quiet=args.notqdm)
if __name__ == '__main__':
    # One entry per command-line argument, in the order they are registered:
    # (name, keyword options passed to add_argument).
    cli_arguments = [
        ('paragraph_path', {'help': 'Directory of parsed paragraph files'}),
        ('output_directory', {'help': 'Final output directory'}),
        ('--tmchem', {'help': 'Directory where tmChem.pl is located',
                      'default': os.getcwd()}),
        ('--logdir', {'help': 'Directory where logfile should be stored',
                      'default': '../logs'}),
        ('--poolsize', {'help': 'Size of multiprocessing process pool',
                        'type': int,
                        'default': mp.cpu_count()}),
        ('--notqdm', {'help': 'Disable tqdm progress bar output',
                      'action': 'store_true'}),
    ]

    parser = argparse.ArgumentParser(
        description='Run tmChem on a directory of paragraph files')
    for argument_name, argument_options in cli_arguments:
        parser.add_argument(argument_name, **argument_options)

    # parse sys.argv and hand the resulting namespace to the pipeline
    args = parser.parse_args()
    main(args)