
Commit

[8.17] also cast doc_id to str (#3031) (#3034)
Co-authored-by: Sean Story <[email protected]>
github-actions[bot] and seanstory authored Dec 16, 2024
1 parent 44233ba commit 527f3fb
Showing 3 changed files with 30 additions and 21 deletions.
28 changes: 14 additions & 14 deletions connectors/es/sink.py
@@ -53,6 +53,7 @@
     get_size,
     iso_utc,
     retryable,
+    sanitize,
 )

 __all__ = ["SyncOrchestrator"]
@@ -484,17 +485,22 @@ async def put_doc(self, doc):
         await self.queue.put(doc)

     async def run(self, generator, job_type):
+        sanitized_generator = (
+            (sanitize(doc), *other) async for doc, *other in generator
+        )
         try:
             match job_type:
                 case JobType.FULL:
-                    await self.get_docs(generator)
+                    await self.get_docs(sanitized_generator)
                 case JobType.INCREMENTAL:
                     if self.skip_unchanged_documents:
-                        await self.get_docs(generator, skip_unchanged_documents=True)
+                        await self.get_docs(
+                            sanitized_generator, skip_unchanged_documents=True
+                        )
                     else:
-                        await self.get_docs_incrementally(generator)
+                        await self.get_docs_incrementally(sanitized_generator)
                 case JobType.ACCESS_CONTROL:
-                    await self.get_access_control_docs(generator)
+                    await self.get_access_control_docs(sanitized_generator)
                 case _:
                     raise UnsupportedJobType
         except asyncio.CancelledError:
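For orientation: the new `sanitized_generator` wraps whatever the connector's generator yields, running `sanitize` on the first element of each item (the document) and passing any extras, such as lazy-download callables, through untouched. Below is a minimal, self-contained sketch of that pattern; the `sanitize` here mirrors the helper added in connectors/utils.py further down, and the toy generator stands in for a real connector's output.

```python
import asyncio


def sanitize(doc):
    # Mirror of the helper added in connectors/utils.py: force string IDs.
    if doc["_id"]:
        doc["_id"] = str(doc["_id"])
        doc["id"] = doc["_id"]
    return doc


async def docs_with_extras():
    # Toy stand-in for a connector's get_docs generator: (doc, extra) tuples.
    yield {"_id": 123, "title": "numeric id"}, "lazy_download_placeholder"
    yield {"_id": "abc", "title": "string id"}, None


async def main():
    # Same wrapping pattern as the diff: sanitize the doc, pass extras through.
    sanitized = ((sanitize(doc), *other) async for doc, *other in docs_with_extras())
    async for doc, extra in sanitized:
        print(doc["_id"], type(doc["_id"]), extra)  # IDs are now always str


asyncio.run(main())
```

Because every document is sanitized once at the entry point, the downstream `get_docs*` methods no longer need their own `str()` casts, which is exactly what the next three hunks remove.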
@@ -546,7 +552,7 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
                 self._log_progress()

             doc_id = doc.pop("_id")
-            doc["id"] = str(doc_id)
+            doc["id"] = doc_id

             if self.basic_rule_engine and not self.basic_rule_engine.should_ingest(
                 doc
@@ -662,7 +668,7 @@ async def get_docs_incrementally(self, generator):
                 self._log_progress()

             doc_id = doc.pop("_id")
-            doc["id"] = str(doc_id)
+            doc["id"] = doc_id

             if self.basic_rule_engine and not self.basic_rule_engine.should_ingest(
                 doc
@@ -719,13 +725,7 @@ async def get_access_control_docs(self, generator):
         self._logger.info("Starting access control doc lookups")
         generator = self._decorate_with_metrics_span(generator)

-        existing_ids = {
-            doc_id: last_update_timestamp
-            async for (
-                doc_id,
-                last_update_timestamp,
-            ) in self.client.yield_existing_documents_metadata(self.index)
-        }
+        existing_ids = await self._load_existing_docs()

         if self._logger.isEnabledFor(logging.DEBUG):
             self._logger.debug(
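The body of `_load_existing_docs` is not part of this diff; presumably it wraps the same `yield_existing_documents_metadata` lookup that the removed inline comprehension performed. A hypothetical reconstruction, for orientation only and not taken from the actual source:

```python
    async def _load_existing_docs(self):
        # Hypothetical sketch: build a doc_id -> last_update_timestamp map from
        # the target index, equivalent to the inline dict comprehension that
        # this commit removes from get_access_control_docs.
        return {
            doc_id: last_update_timestamp
            async for (
                doc_id,
                last_update_timestamp,
            ) in self.client.yield_existing_documents_metadata(self.index)
        }
```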
@@ -740,7 +740,7 @@ async def get_access_control_docs(self, generator):
                 self._log_progress()

             doc_id = doc.pop("_id")
-            doc["id"] = str(doc_id)
+            doc["id"] = doc_id
             doc_exists = doc_id in existing_ids

             if doc_exists:
9 changes: 9 additions & 0 deletions connectors/utils.py
@@ -976,6 +976,15 @@ def nested_get(dictionary_, keys_, default_=None):
     return nested_get(dictionary, keys, default)


+def sanitize(doc):
+    if doc["_id"]:
+        # guarantee that IDs are strings, and not numeric
+        doc["_id"] = str(doc["_id"])
+        doc["id"] = doc["_id"]
+
+    return doc
+
+
 class Counters:
     """
     A utility to provide code readability to managing a collection of counts
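A quick illustration of the new helper's behavior (assuming it is importable as `connectors.utils.sanitize`, which follows from this file path; the sample documents are made up):

```python
from connectors.utils import sanitize

doc = {"_id": 42, "title": "numeric id"}
sanitize(doc)
assert doc["_id"] == "42" and doc["id"] == "42"  # both fields are now strings

# Note the truthiness guard: falsy IDs (None, "", 0) are left untouched,
# so a doc like {"_id": 0} passes through with its numeric 0 and no "id" field.
doc2 = {"_id": 0}
sanitize(doc2)
assert doc2["_id"] == 0 and "id" not in doc2
```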
14 changes: 7 additions & 7 deletions tests/test_sink.py
@@ -383,23 +383,23 @@ async def _dl(doit=True, timestamp=None):
 def index_operation(doc):
     # deepcopy as get_docs mutates docs
     doc_copy = deepcopy(doc)
-    doc_id = doc_copy.pop("_id")
-    doc_copy["id"] = str(doc_id)
+    doc_id = str(doc_copy.pop("_id"))
+    doc_copy["id"] = doc_id

     return {"_op_type": "index", "_index": INDEX, "_id": doc_id, "doc": doc_copy}


 def update_operation(doc):
     # deepcopy as get_docs mutates docs
     doc_copy = deepcopy(doc)
-    doc_id = doc_copy.pop("_id")
-    doc_copy["id"] = str(doc_id)
+    doc_id = str(doc_copy.pop("_id"))
+    doc_copy["id"] = doc_id

     return {"_op_type": "update", "_index": INDEX, "_id": doc_id, "doc": doc_copy}


 def delete_operation(doc):
-    return {"_op_type": "delete", "_index": INDEX, "_id": doc["_id"]}
+    return {"_op_type": "delete", "_index": INDEX, "_id": str(doc["_id"])}


 def end_docs_operation():
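To see what the updated helpers now expect when a source document carries a numeric ID, here is a standalone sketch; `INDEX` is stubbed in rather than taken from the test module, and the helper body is copied from the updated test above.

```python
from copy import deepcopy

INDEX = "search-test"  # stand-in for the test module's INDEX constant


def index_operation(doc):
    doc_copy = deepcopy(doc)
    doc_id = str(doc_copy.pop("_id"))
    doc_copy["id"] = doc_id
    return {"_op_type": "index", "_index": INDEX, "_id": doc_id, "doc": doc_copy}


op = index_operation({"_id": 7, "title": "hello"})
# Both the bulk _id and the body's "id" are now expected to be strings:
assert op["_id"] == "7"
assert op["doc"] == {"id": "7", "title": "hello"}
```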
@@ -752,7 +752,7 @@ async def test_get_docs(
     lazy_downloads = await lazy_downloads_mock()

     yield_existing_documents_metadata.return_value = AsyncIterator(
-        [(doc["_id"], doc["_timestamp"]) for doc in existing_docs]
+        [(str(doc["_id"]), doc["_timestamp"]) for doc in existing_docs]
     )

     with mock.patch("connectors.utils.ConcurrentTasks", return_value=lazy_downloads):
@@ -1042,7 +1042,7 @@ async def test_get_access_control_docs(
     expected_total_docs_deleted,
 ):
     yield_existing_documents_metadata.return_value = AsyncIterator(
-        [(doc["_id"], doc["_timestamp"]) for doc in existing_docs]
+        [(str(doc["_id"]), doc["_timestamp"]) for doc in existing_docs]
     )

     queue = await queue_mock()
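These mock tweaks matter because the sink compares the sanitized (string) `doc_id` against the keys built from `yield_existing_documents_metadata` (see `doc_exists = doc_id in existing_ids` in sink.py above); if the mock kept numeric IDs, every existing document would look new. A tiny illustration of the mismatch, with made-up values:

```python
existing_ids = {42: "2024-12-16T00:00:00Z"}   # numeric key, as the old mock produced
doc_id = "42"                                  # sanitized ID is a string
assert doc_id not in existing_ids              # lookup misses, doc treated as new

existing_ids = {"42": "2024-12-16T00:00:00Z"}  # string key, as the updated mock produces
assert doc_id in existing_ids                  # lookup hits, doc treated as existing
```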
