DataFlowParseXML.py
#!/usr/bin/env python
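"""Dataflow pipeline that parses RSS-style XML files, loads the parsed items
into BigQuery, and moves each successfully parsed file from the unprocessed
directory to the processed directory. Parse failures are routed to a
separate BigQuery error table."""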
import argparse

import apache_beam as beam

def parse_and_move(path_and_meta):
    # Imports are repeated inside the function so they resolve on Dataflow
    # workers without needing --save_main_session.
    import re
    import xml.etree.ElementTree as ET

    import apache_beam as beam
    from apache_beam import pvalue
    try:
        path, unprocessed_dir, processed_dir = path_and_meta
        with beam.io.filesystems.FileSystems.open(path) as open_file:
            content = open_file.read()
        root = ET.fromstring(content)
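        # Assumes an RSS-style layout, roughly:
        #   <rss><channel><item><title/><link/><pubDate/></item>...</channel></rss>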
        item_list = []
        for item in root.findall(".//channel/item"):
            link = item.find('link').text
            title = item.find('title').text
            pubdate = item.find('pubDate').text
            item_list.append({
                "pubdate": pubdate,
                "link": link,
                "title": title,
            })
        # Move the parsed file to the processed directory so it is not
        # picked up again on the next run.
        dest = re.sub(unprocessed_dir, processed_dir, path)
        beam.io.filesystems.FileSystems.rename([path], [dest])
        yield pvalue.TaggedOutput('ok', item_list)
    except Exception as e:
        error_pack = [{"filepath": path, "errormsg": str(e)}]
        yield pvalue.TaggedOutput('fail', error_pack)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-ds', '--dataset', dest='dataset', required=True, help='target dataset name')
    parser.add_argument('-t', '--table', dest='table', required=True, help='target table name')
    parser.add_argument('-et', '--error_table', dest='err_table', required=True, help='error table name')
    parser.add_argument('-p', '--project', dest='project', required=True, help='project name')
    parser.add_argument('-b', '--bucketpath', dest='bucketpath', required=True, help='temporary bucket path for processing')
    parser.add_argument('-pt', '--patterns', dest='patterns', required=True, help='pattern(s) of source XML files')
    parser.add_argument('-pd', '--processed_dir', dest='processed_dir', required=True, help='path to processed files')
    parser.add_argument('-ud', '--unprocessed_dir', dest='unprocessed_dir', required=True, help='path to unprocessed files')
    parser.add_argument('-sd', '--staging_dir', dest='staging_dir', required=True, help='path to staging directory')
    parser.add_argument('-r', '--runner', dest='runner', required=True, help='pipeline runner, e.g. DirectRunner or DataflowRunner')
    parser.add_argument('-rg', '--region', dest='region', required=True, help='region where the Dataflow job runs')
    args = parser.parse_args()
    pattern_path = args.bucketpath + '/' + args.unprocessed_dir + '/' + args.patterns
    staging_path = args.bucketpath + '/' + args.staging_dir
    temp_path = args.bucketpath + '/' + args.staging_dir

    OUTPUT_TABLE = args.project + ':' + args.dataset + '.' + args.table
    TABLE_SCHEMA = 'pubdate:STRING, link:STRING, title:STRING'
    ERR_OUTPUT_TABLE = args.project + ':' + args.dataset + '.' + args.err_table
    ERR_TABLE_SCHEMA = 'filepath:STRING, errormsg:STRING'
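    # WriteToBigQuery accepts these comma-separated 'name:TYPE' strings
    # directly as table schemas.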
    argv = [
        '--project={0}'.format(args.project),
        '--job_name=parse-and-write',
        # '--save_main_session',
        '--staging_location={0}/'.format(staging_path),
        '--temp_location={0}/'.format(temp_path),
        '--runner={0}'.format(args.runner),
        '--region={0}'.format(args.region),
    ]
    p = beam.Pipeline(argv=argv)

    # Expand the file pattern up front so the pipeline starts from a
    # concrete list of (path, unprocessed_dir, processed_dir) tuples.
    fmd_list = beam.io.filesystems.FileSystems.match([pattern_path])
    path_list = []
    for i in fmd_list[0].metadata_list:
        path_list.append((i.path, args.unprocessed_dir, args.processed_dir))
    print("Number of files to be processed:", len(path_list))
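    # Route parse results by tag: 'ok' carries per-file item lists,
    # 'fail' carries error records; the untagged main output is unused.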
    collection = (p
                  | 'CreatePathList' >> beam.Create(path_list)
                  | 'ParseXML' >> beam.FlatMap(parse_and_move).with_outputs('ok', 'fail', main='main')
                  )
    # Each 'ok' element is a list of row dicts for one file; flatten it so
    # WriteToBigQuery receives individual rows.
    (collection['ok']
     | 'Flatten' >> beam.FlatMap(lambda x: x)
     | 'WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
         table=OUTPUT_TABLE,
         schema=TABLE_SCHEMA,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    (collection['fail']
     | 'FlattenErrors' >> beam.FlatMap(lambda x: x)
     | 'WriteErrorsToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
         table=ERR_OUTPUT_TABLE,
         schema=ERR_TABLE_SCHEMA,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    p.run()
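
# Example invocation (illustrative only; the project, bucket, dataset, and
# table names below are placeholders, not values taken from this repository):
#
#   python DataFlowParseXML.py \
#       --project my-project --dataset rss --table items --error_table errors \
#       --bucketpath gs://my-bucket --patterns '*.xml' \
#       --unprocessed_dir unprocessed --processed_dir processed \
#       --staging_dir staging --runner DataflowRunner --region us-central1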