Skip to content

Commit

Permalink
fix: start from earliest checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
farbodahm committed Sep 14, 2024
1 parent d78fba0 commit 2f51109
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions tests/unit/reader/test_kafka_reader.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from time import sleep
from typing import Any
from collections.abc import Generator
import pytest
Expand Down Expand Up @@ -110,7 +111,7 @@ def kafka_reader(
schema_version="latest",
kafka_spark_options={
"kafka.bootstrap.servers": KAFKA_BROKER_URL,
"auto.offset.reset": "latest",
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
},
)
Expand Down Expand Up @@ -166,10 +167,11 @@ def test_kafka_reader_avro(
.start()
)

sleep(3)
produce_avro_message(kafka_producer, kafka_reader.topic, avro_serializer, value)

# Allow the stream to process the data
query.awaitTermination(20)
query.awaitTermination(30)

# Query the in-memory table
result_df: DataFrame = spark_session.sql("SELECT * FROM kafka_test")
Expand Down

0 comments on commit 2f51109

Please sign in to comment.