Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle TransportError Exceptions thrown from gapic_publish #1318

Merged
merged 1 commit into from
Jan 6, 2025

Conversation

mukund-ananthu
Copy link
Contributor

@mukund-ananthu mukund-ananthu commented Jan 6, 2025

Current behavior:

If TransportError is thrown by google.auth library:

https://github.com/googleapis/google-auth-library-python/blob/c3ea09fd8b9ee8f094263fdc809c59d9dfaa3124/google/auth/compute_engine/_metadata.py#L187-L189

when doing a _gapic_publish:

response = self._client._gapic_publish(
topic=self._topic,
messages=[wrapper.message for wrapper in self._message_wrappers],
retry=self._commit_retry,
timeout=self._commit_timeout,
)

Then,

  1. The exception is not caught:

except google.api_core.exceptions.GoogleAPIError as exc:

  1. Status of batch not set to ERROR:

self._status = base.BatchStatus.ERROR

  1. Exceptions not set in the publish futures

if self._batch_done_callback is not None:
# Failed to publish batch.
self._batch_done_callback(batch_transport_succeeded)
for future in self._futures:
future.set_exception(exc)

  1. The thread in which the batch gapic_publish was occuring dies without setting the correct batch state / future exceptions:

commit_thread = threading.Thread(
name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
)

  1. State of Batch remains IN_PROGRESS indefinitely

self._status = base.BatchStatus.IN_PROGRESS

  1. Subsequent publishes of messages result in creation of a new batch:

if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
return None

while future is None:
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._ordered_batches.append(batch)
future = batch.publish(wrapper)

  1. Messages that were in the batch whose commit thread died:
  • Will not have the TransportError set as an exception in their publish futures
  • The publish future will either hang or timeout

Behavior expected with change

  1. TransportError during gapic_publish will be caught in addition to the existing GoogleAPIError

except google.api_core.exceptions.GoogleAPIError as exc:

  1. Existing behavior when GoogleAPIError is caught during gapic_publish would also now apply to TransportError, i.e:
  • Batch status will be set to ERROR, future exceptions will be set:

except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
# all futures and exit.
self._status = base.BatchStatus.ERROR
if self._client.open_telemetry_enabled:
if self._rpc_span:
self._rpc_span.record_exception(
exception=exc,
)
self._rpc_span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR)
)
self._rpc_span.end()
for wrapper in self._message_wrappers:
wrapper.end_create_span(exc=exc)
batch_transport_succeeded = False
if self._batch_done_callback is not None:
# Failed to publish batch.
self._batch_done_callback(batch_transport_succeeded)
for future in self._futures:
future.set_exception(exc)
return

  • Subsequent publish of messages on the batch object will cause an AssertionError:

with self._state_lock:
assert (
self._status != base.BatchStatus.ERROR
), "Publish after stop() or publish error."

which will be bubbled up through the Ordered / Unordered Sequenceer's publish method:

batch = self._ordered_batches[-1]
future = batch.publish(wrapper)
while future is None:
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._ordered_batches.append(batch)
future = batch.publish(wrapper)
return future

to the Publisher client's publish method:

future = sequencer.publish(
wrapper=wrapper, retry=retry, timeout=timeout
)
future.add_done_callback(on_publish_done)
except BaseException as be:
# Exceptions can be thrown when attempting to add messages to
# the batch. If they're thrown, record them in publisher
# batching and create span, end the spans and bubble the
# exception up.
if self._open_telemetry_enabled:
if wrapper:
wrapper.end_publisher_batching_span(be)
wrapper.end_create_span(be)
else: # pragma: NO COVER
warnings.warn(
message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
category=RuntimeWarning,
)
raise be

which would bubble the exception back to the user

Additional details can be found here: #1173 (comment)

Fixes #1173 🦕

@mukund-ananthu mukund-ananthu requested review from a team as code owners January 6, 2025 01:18
@product-auto-label product-auto-label bot added size: s Pull request size is small. api: pubsub Issues related to the googleapis/python-pubsub API. labels Jan 6, 2025
@mukund-ananthu mukund-ananthu merged commit 0e058c7 into main Jan 6, 2025
27 checks passed
@mukund-ananthu mukund-ananthu deleted the gceMeta branch January 6, 2025 19:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. size: s Pull request size is small.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Publisher thread terminates, forever breaking publication when GCE metadata service blips
2 participants