-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue-cleanup.py
80 lines (61 loc) · 2.02 KB
/
queue-cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
This script will remove videos in the download queue that are already present in TA.
Usage:
python queue-cleanup.py
"""
import os
from dotenv import load_dotenv
from elasticsearch import Elasticsearch, helpers
# Load variables from a .env file (if one is found) into the environment
# before the connection settings are read below.
load_dotenv()

# Module-level client shared by the rest of the script; host and credentials
# come from ES_HOST, ES_USER and ES_PASSWORD.
es = Elasticsearch(
    hosts=[os.environ.get("ES_HOST")],
    basic_auth=(os.environ.get("ES_USER"), os.environ.get("ES_PASSWORD")),
)
def fetch_ids(es, index, scroll="2m", size=1000):
    """Yield the ``youtube_id`` of every document found in ``index``.

    Runs a ``match_all`` query through ``helpers.scan`` and requests only the
    ``youtube_id`` source field for each hit.

    Args:
        es: Elasticsearch client handed to ``helpers.scan``.
        index: Name of the index to read.
        scroll: Scroll value forwarded to ``helpers.scan``.
        size: Size value forwarded to ``helpers.scan``.

    Yields:
        The ``youtube_id`` value of each hit whose ``_source`` contains that
        key; hits without the key are skipped.
    """
    hits = helpers.scan(
        es,
        index=index,
        query={"query": {"match_all": {}}},
        scroll=scroll,
        size=size,
        _source_includes=["youtube_id"],
    )
    for hit in hits:
        source = hit["_source"]
        if "youtube_id" not in source:
            continue
        yield source["youtube_id"]
def main():
    """Interactively remove download-queue entries that already exist as videos.

    Collects the ``youtube_id`` values stored in the ``ta_download`` and
    ``ta_video`` indices, intersects them, and, after two explicit "yes"
    confirmations on stdin, bulk-deletes the overlapping ids from
    ``ta_download``. Any other answer prints "Aborting" and returns without
    deleting anything. When there is no overlap the function returns before
    prompting.
    """
    video_index = "ta_video"
    download_index = "ta_download"

    queued_videos = set(fetch_ids(es, download_index))
    print(f"Found {len(queued_videos)} video ids in download queue")

    print("Fetching video ids already present in TA")
    downloaded_videos = set(fetch_ids(es, video_index))

    duplicates = queued_videos.intersection(downloaded_videos)
    print(
        f"Found {len(duplicates)} duplicates. \nEnsure to make a snapshot in TA before proceeding!!!"
    )
    if not duplicates:
        # Nothing to delete: skip both prompts and the empty bulk request.
        print("Nothing to remove")
        return

    confirmation = input(
        "Do you want to remove these duplicates from download queue? (yes/no): "
    )
    if confirmation != "yes":
        print("Aborting")
        return

    # NOTE(review): assumes each queue document's _id equals its youtube_id;
    # confirm against how ta_download documents are indexed.
    queries = [
        {"_op_type": "delete", "_index": download_index, "_id": youtube_id}
        for youtube_id in duplicates
    ]

    confirmation = input(
        f"Are you sure you want to remove {len(queries)} duplicates in download queue? (yes/no): "
    )
    if confirmation != "yes":
        print("Aborting")
        return

    print(f"Removing {len(queries)} duplicates")
    # raise_on_error=False makes bulk() return the per-item failures instead of
    # raising BulkIndexError, so the failure report below is actually reachable.
    success, failed = helpers.bulk(
        es, queries, index=download_index, raise_on_error=False
    )
    if success > 0:
        print("Successfully removed duplicates.")
    if failed:
        print("Failed to remove some duplicates.")
        print(failed)
# Run the interactive cleanup only when executed as a script, not on import.
if __name__ == "__main__":
    main()