-
Notifications
You must be signed in to change notification settings - Fork 1
/
send-data-to-pubsub.py
37 lines (31 loc) · 1.03 KB
/
send-data-to-pubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from google.cloud import pubsub_v1
from google.cloud import storage
import time
#Create a publisher client
publisher = pubsub_v1.PublisherClient()
#Specify the topic path
topic_path = 'projects/streaming-project-415718/topics/topic-conversations'
#Get the topic
topic = publisher.get_topic(request={"topic": topic_path})
#Check if the topic exists
if topic is None:
print('Topic does not exist:', topic_path)
exit()
#Create a storage client
storage_client = storage.Client()
#Specify the bucket and file names
bucket_name = 'streaming-project'
file_name = 'conversations.json'
#Get the bucket and blob
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
#Read the file line by line
with blob.open("r") as f_in:
for line in f_in:
#Data must be a bytestring
data = line.encode('utf-8')
#Publish the data to the topic
future = publisher.publish(topic=topic.name, data=data)
print(future.result())
#Sleep for 1 second before publishing the next message
time.sleep(1)