Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Apr 4, 2022
1 parent 5335be7 commit 535f236
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions tests/units/fastsync/commons/test_fastsync_tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,26 @@ def execute_mock(query):
# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
cursor_mock.__enter__.return_value.fetchone.return_value = (
'pipelinewise_test_database_test_tap', '242FC/BA84A740', '00000009-0192A151-1', 'wal2json'
)
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0)

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con.__enter__ = lambda *_: pg_con
pg_con.__exit__ = lambda *_: None
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con

self.postgres.create_replication_slot()
assert self.postgres.executed_queries_primary_host == [
"SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_test_database';",
"SELECT * FROM pg_create_logical_replication_slot('pipelinewise_test_database_test_tap', 'wal2json')",
]
cursor_mock.__enter__.return_value.create_replication_slot.assert_called_once_with(
'pipelinewise_test_database_test_tap', output_plugin='wal2json'
)

def test_create_replication_slot_2(self):
"""
Expand All @@ -103,19 +110,26 @@ def execute_mock(query):
# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
cursor_mock.__enter__.return_value.fetchone.return_value = (
'pipelinewise_test_database_test_tap', '242FC/BA84A740', '00000009-0192A151-1', 'wal2json'
)
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1)

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con.__enter__ = lambda *_: pg_con
pg_con.__exit__ = lambda *_: None
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con

self.postgres.create_replication_slot()
assert self.postgres.executed_queries_primary_host == [
"SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_test_database';",
"SELECT * FROM pg_create_logical_replication_slot('pipelinewise_test_database', 'wal2json')",
]
cursor_mock.__enter__.return_value.create_replication_slot.assert_called_once_with(
'pipelinewise_test_database', output_plugin='wal2json'
)

@patch('pipelinewise.fastsync.commons.tap_postgres.psycopg2.connect')
def test_get_connection_to_primary(self, connect_mock):
Expand Down Expand Up @@ -253,6 +267,8 @@ def execute_mock(query):

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con.__enter__ = lambda *_: pg_con
pg_con.__exit__ = lambda *_: None
pg_con.cursor.return_value = cursor_mock

connect_mock.return_value = pg_con
Expand All @@ -261,8 +277,7 @@ def execute_mock(query):

assert self.postgres.executed_queries_primary_host == [
"SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_my_db';",
'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE '
"slot_name = 'pipelinewise_my_db';",
'DROP_REPLICATION_SLOT "pipelinewise_my_db"',
]

@patch('pipelinewise.fastsync.commons.tap_postgres.psycopg2.connect')
Expand Down Expand Up @@ -294,6 +309,8 @@ def execute_mock(query):

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con.__enter__ = lambda *_: pg_con
pg_con.__exit__ = lambda *_: None
pg_con.cursor.return_value = cursor_mock

connect_mock.return_value = pg_con
Expand All @@ -302,8 +319,7 @@ def execute_mock(query):

assert self.postgres.executed_queries_primary_host == [
"SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_my_db';",
'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE '
"slot_name = 'pipelinewise_my_db_tap_test';",
'DROP_REPLICATION_SLOT "pipelinewise_my_db_tap_test"',
]

def test_fetch_current_incremental_key_pos_empty_result_expect_exception(self):
Expand Down

0 comments on commit 535f236

Please sign in to comment.