diff --git a/target_postgres/stream_tracker.py b/target_postgres/stream_tracker.py index 7e68a65..e2b12b8 100644 --- a/target_postgres/stream_tracker.py +++ b/target_postgres/stream_tracker.py @@ -48,9 +48,9 @@ def flush_streams(self, force=False): self._emit_safe_queued_states(force=force) - def handle_state_message(self, line_data): + def handle_state_message(self, line): if self.emit_states: - self.state_queue.append({'state': line_data['value'], 'watermark': self.message_counter}) + self.state_queue.append({'state': line, 'watermark': self.message_counter}) self._emit_safe_queued_states() def handle_record_message(self, stream, line_data): @@ -80,9 +80,14 @@ def _emit_safe_queued_states(self, force=False): valid_flush_watermarks.append(watermark) safe_flush_threshold = min(valid_flush_watermarks, default=0) + # the STATE message that the target forwards emittable_state = None + emittable_state_str = None while len(self.state_queue) > 0 and (force or self.state_queue[0]['watermark'] <= safe_flush_threshold): - emittable_state = self.state_queue.popleft()['state'] + emittable_state_str = self.state_queue.popleft()['state'] + + if emittable_state_str is not None: + emittable_state = json.loads(emittable_state_str)['value'] if emittable_state: if len(statediff.diff(emittable_state, self.last_emitted_state or {})) > 0: diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py index 3cc6070..197babc 100644 --- a/target_postgres/target_tools.py +++ b/target_postgres/target_tools.py @@ -152,7 +152,8 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records state_tracker.flush_stream(line_data['stream']) target.activate_version(stream_buffer, line_data['version']) elif line_data['type'] == 'STATE': - state_tracker.handle_state_message(line_data) + # pass the string instead of the deserialized object to save memory in the deque + state_tracker.handle_state_message(line) else: raise TargetError('Unknown message type {} in message {}'.format( line_data['type'],