
Commit

tdl-16123
JYOTHINARAYANSETTY committed Jan 16, 2024
1 parent e57af17 commit 1a7562a
Showing 2 changed files with 48 additions and 69 deletions.
25 changes: 24 additions & 1 deletion tests/base_hubspot.py
@@ -18,7 +18,7 @@ class HubspotBaseCase(BaseCase):
EXTRA_FIELDS = {
"contacts": { "versionTimestamp" }
}

def setUp(self):
missing_envs = [x for x in [
'TAP_HUBSPOT_REDIRECT_URI',
@@ -140,5 +140,28 @@ def expected_metadata(cls): # DOCS_BUG https://stitchdata.atlassian.net/browse/
BaseCase.REPLICATION_KEYS: {"updatedAt"},
BaseCase.API_LIMIT: 100,
BaseCase.OBEYS_START_DATE: True
},
# below are the custom_objects streams
"cars": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
BaseCase.API_LIMIT: 100,
BaseCase.EXPECTED_PAGE_SIZE: 100,
BaseCase.OBEYS_START_DATE: True
},
"co_firsts": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
BaseCase.API_LIMIT: 100,
BaseCase.EXPECTED_PAGE_SIZE: 100,
BaseCase.OBEYS_START_DATE: True
}

}

def expected_page_limits(self):
return {table: properties.get(BaseCase.EXPECTED_PAGE_SIZE, set())
for table, properties
in self.expected_metadata().items()}
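
For context on how the new helper is consumed: expected_page_limits defaults a missing EXPECTED_PAGE_SIZE to an empty set(), which is falsy, so the truthiness filter in the pagination test's streams_to_test (next file) naturally drops streams that do not paginate. A minimal, self-contained sketch of that contract — the stream names and values here are illustrative, not the real metadata:

# Illustrative sketch only: stream names and limits are hypothetical,
# not the real values from HubspotBaseCase.expected_metadata().
EXPECTED_PAGE_SIZE = "expected-page-size"

def expected_metadata():
    return {
        "cars": {EXPECTED_PAGE_SIZE: 100},  # paginating stream
        "some_stream": {},                  # no page-size entry
    }

def expected_page_limits():
    # Same shape as the new base-class helper: a missing limit
    # defaults to set(), which is falsy.
    return {table: properties.get(EXPECTED_PAGE_SIZE, set())
            for table, properties in expected_metadata().items()}

# The truthiness filter used by streams_to_test keeps only real limits.
assert {s for s, limit in expected_page_limits().items() if limit} == {"cars"}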
92 changes: 24 additions & 68 deletions tests/test_hubspot_pagination.py
@@ -1,22 +1,39 @@
from datetime import datetime
from datetime import timedelta
import time

import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
from tap_tester.logger import LOGGER

from client import TestClient
from base import HubspotBaseTest
from tap_tester.base_suite_tests.pagination_test import PaginationTest
from base_hubspot import HubspotBaseCase


class TestHubspotPagination(HubspotBaseTest):
class HubspotPaginationTest(PaginationTest, HubspotBaseCase):

@staticmethod
def name():
return "tt_hubspot_pagination"

def streams_to_test(self):
"""
# All streams with limits are under test
# """
streams_with_page_limits = {
stream
for stream, limit in self.expected_page_limits().items()
if limit
}
streams_to_test = streams_with_page_limits.difference({
# updates for contacts_by_company are not processed quickly or consistently
# by the HubSpot API, so we cannot guarantee the page limit is exceeded
'contacts_by_company',
'email_events',
'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938
})
return streams_to_test

return self.streams_to_test()

def get_properties(self):
return {
'start_date' : datetime.strftime(datetime.today()-timedelta(days=7), self.START_DATE_FORMAT)
@@ -72,65 +89,4 @@ def setUp(self):

setup_end = time.perf_counter()
LOGGER.info(f"Test Client took about {str(setup_end-setup_start).split('.')[0]} seconds")

def streams_to_test(self):
"""
All streams with limits are under test
"""
streams_with_page_limits = {
stream
for stream, limit in self.expected_page_limits().items()
if limit
}
streams_to_test = streams_with_page_limits.difference({
# updates for contacts_by_company are not processed quickly or consistently
# by the HubSpot API, so we cannot guarantee the page limit is exceeded
'contacts_by_company',
'email_events',
'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938
})
return streams_to_test

def test_run(self):
# Select only the expected streams tables
expected_streams = self.streams_to_test()
conn_id = connections.ensure_connection(self)
found_catalogs = self.run_and_verify_check_mode(conn_id)

catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
for catalog_entry in catalog_entries:
stream_schema = menagerie.get_annotated_schema(conn_id, catalog_entry['stream_id'])
connections.select_catalog_and_fields_via_metadata(
conn_id,
catalog_entry,
stream_schema
)

sync_record_count = self.run_and_verify_sync(conn_id)
sync_records = runner.get_records_from_target_output()


# Test by stream
for stream in expected_streams:
with self.subTest(stream=stream):

record_count = sync_record_count.get(stream, 0)

sync_messages = sync_records.get(stream, {'messages': []}).get('messages')

primary_keys = self.expected_primary_keys().get(stream)

# Verify the sync record count exceeds the stream's page size
stream_page_size = self.expected_page_limits()[stream]
self.assertLess(stream_page_size, record_count)

# Verify we did not duplicate any records across pages
records_pks_set = {tuple([message.get('data').get(primary_key)
for primary_key in primary_keys])
for message in sync_messages}
records_pks_list = [tuple([message.get('data').get(primary_key)
for primary_key in primary_keys])
for message in sync_messages]
# records_pks_list = [message.get('data').get(primary_key) for message in sync_messages]
self.assertCountEqual(records_pks_set, records_pks_list,
msg=f"We have duplicate records for {stream}")
super().setUp()
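
The per-stream assertions deleted from test_run above are presumably now provided by the shared PaginationTest base class. A minimal sketch of those two checks, with illustrative names rather than the base suite's actual API:

def verify_pagination(stream_page_size, sync_messages, primary_keys):
    # More records than one page proves a second page was actually fetched.
    record_count = len(sync_messages)
    assert stream_page_size < record_count

    # Primary-key tuples must be unique across pages: comparing the sizes
    # of the list and its set catches any duplicated records.
    pks = [tuple(message['data'][pk] for pk in primary_keys)
           for message in sync_messages]
    assert len(pks) == len(set(pks)), "duplicate records across pages"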
