forked from andyhuynh3/target-jsonl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
target_jsonl.py
executable file
·109 lines (87 loc) · 3.19 KB
/
target_jsonl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
import argparse
import io
import jsonschema
import simplejson as json
import os
import sys
from datetime import datetime
from pathlib import Path
import singer
from jsonschema import Draft4Validator, FormatChecker
from adjust_precision_for_schema import adjust_decimal_precision_for_schema
# Module-level logger obtained from singer; used by emit_state,
# persist_messages and main below.
logger = singer.get_logger()
def emit_state(state):
    """Write ``state`` to stdout as one JSON line and flush.

    Nothing is written when ``state`` is ``None``.

    Args:
        state: JSON-serializable state value (as produced by the last
            STATE message), or ``None``.
    """
    if state is None:
        return

    serialized = json.dumps(state)
    logger.debug('Emitting state {}'.format(serialized))
    out = sys.stdout
    out.write("{}\n".format(serialized))
    out.flush()
def persist_messages(
    messages,
    destination_path,
    custom_name=None,
    do_timestamp_file=True
):
    """Consume Singer messages and append each RECORD to a JSONL file.

    Args:
        messages: Iterable of raw Singer message strings, e.g. lines read
            from stdin. Each one is parsed with ``singer.parse_message``.
        destination_path: Directory for the output files. A leading ``~`` is
            expanded. The directory is created (with parents) when the first
            record is written. A falsy value writes into the current working
            directory.
        custom_name: If truthy, every stream is written to
            ``<custom_name><suffix>.jsonl``. Otherwise each stream goes to
            ``<stream><suffix>.jsonl``.
        do_timestamp_file: When true, ``<suffix>`` is ``-YYYYMMDDTHHMMSS``,
            taken once at call time. When false, it is empty.

    Returns:
        The value of the most recent STATE message, or ``None`` if no STATE
        was seen or a RECORD arrived after the last STATE.

    Raises:
        json.decoder.JSONDecodeError: A message could not be parsed. The
            offending message is logged first.
        Exception: A RECORD arrived for a stream with no preceding SCHEMA.
        jsonschema.ValidationError: A record does not satisfy its schema.
    """
    state = None
    schemas = {}
    validators = {}
    # Computed once so every file written during this run shares one suffix.
    timestamp_file_part = '-' + datetime.now().strftime('%Y%m%dT%H%M%S') if do_timestamp_file else ''
    # Expand "~" before creating the directory. mkdir() and open() must
    # agree on the same path; otherwise "~/out" creates a literal "./~/out"
    # directory while the file is opened under the real home directory.
    output_dir = os.path.expanduser(destination_path) if destination_path else ''
    output_dir_ready = False

    for message in messages:
        try:
            o = singer.parse_message(message).asdict()
        except json.decoder.JSONDecodeError:
            logger.error("Unable to parse:\n{}".format(message))
            raise

        message_type = o['type']
        if message_type == 'RECORD':
            stream = o['stream']
            if stream not in schemas:
                raise Exception(
                    "A record for stream {} "
                    "was encountered before a corresponding schema".format(stream)
                )
            try:
                validators[stream].validate(o['record'])
            except jsonschema.ValidationError:
                logger.error(f"Failed parsing the json schema for stream: {stream}.")
                raise

            filename = (custom_name or stream) + timestamp_file_part + '.jsonl'
            if output_dir:
                # Loop-invariant: create the output directory only once.
                if not output_dir_ready:
                    Path(output_dir).mkdir(parents=True, exist_ok=True)
                    output_dir_ready = True
                filename = os.path.join(output_dir, filename)
            with open(filename, 'a', encoding='utf-8') as json_file:
                json_file.write(json.dumps(o['record']) + '\n')
            # Reset after each record so that only a STATE arriving after the
            # last RECORD is returned to the caller.
            state = None
        elif message_type == 'STATE':
            logger.debug('Setting state to {}'.format(o['value']))
            state = o['value']
        elif message_type == 'SCHEMA':
            stream = o['stream']
            schemas[stream] = o['schema']
            # The return value is ignored, so this helper presumably adjusts
            # the schema dict in place before the validator is built from it.
            adjust_decimal_precision_for_schema(schemas[stream])
            # NOTE(review): no format_checker is passed, so JSON Schema
            # "format" keywords are not enforced during validation.
            validators[stream] = Draft4Validator(schemas[stream])
        else:
            logger.warning("Unknown message type {} in message {}".format(o['type'], o))

    return state
def main():
    """Read Singer messages from stdin, write them as JSONL, then emit state.

    Command line:
        -c / --config: Optional path to a JSON config file. The keys read
            are ``destination_path`` (default ``''``), ``custom_name``
            (default ``''``) and ``do_timestamp_file`` (default ``True``).
            Without a config file, all defaults apply.

    The state returned by ``persist_messages`` is written to stdout via
    ``emit_state`` after all input has been consumed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()

    if args.config:
        # JSON text is UTF-8 (RFC 8259). Don't depend on the platform/locale
        # default encoding, which differs e.g. on Windows.
        with open(args.config, encoding='utf-8') as input_json:
            config = json.load(input_json)
    else:
        config = {}

    # Decode stdin as UTF-8 explicitly, independent of the locale.
    input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_messages(
        input_messages,
        config.get('destination_path', ''),
        config.get('custom_name', ''),
        config.get('do_timestamp_file', True)
    )

    emit_state(state)
    logger.debug("Exiting normally")
# Run the target only when executed as a script, not when imported.
if __name__ == '__main__':
    main()