This repository has been archived by the owner on Feb 23, 2022. It is now read-only.

minor tweaks to loading and readme
ryankicks committed Jun 9, 2015
1 parent 5ec4399 commit 358eaf9
Showing 3 changed files with 22 additions and 14 deletions.
10 changes: 7 additions & 3 deletions README.md
@@ -3,7 +3,7 @@ Twitter for BigQuery

This sample code helps you stream Twitter data into BigQuery and run simple visualizations. It also generates queries that you can run directly in the BigQuery interface or extend for your own applications.

<img src="static/img/screenshot.png" style="width: 70%;"/>
<img src="screenshot.png" style="width: 70%;"/>

Additionally, you can use other public or private datasets in BigQuery to do additional joins and develop other insights/correlations.

@@ -255,6 +255,10 @@ The following developers and bloggers have aided greatly in the development of t

TODO

- Backfill
- UI Dialog
- One Pager
- FAQ
- Easier to deploy full stack
- environment settings
- container deploy script
- Figure out location, specifically don't use Utils.scrub()
- Admin save/config page + deploy of service?
19 changes: 11 additions & 8 deletions load.py
@@ -7,14 +7,17 @@
import ssl
import zlib
import threading

from threading import Lock
import logging.config
from httplib import *
from config import Config
from apiclient.errors import *
import logging.config
import tweepy

from utils import Utils


NEWLINE = '\r\n'
SLEEP_TIME = 10

@@ -60,9 +63,12 @@ def on_data(self, data):
if not table:
table = tag.split(".")
created = Utils.insert_table(table[0], table[1], self.schema)
if created:
self.table_mapping[tag] = table

# Brand new table
if created and created != True:
self.logger.info('Created BQ table: %s' % tag)

self.table_mapping[tag] = table

record_scrubbed = Utils.scrub(record)
Utils.insert_records(table[0], table[1], [record_scrubbed])
@@ -291,11 +297,8 @@ def main():
schema_str = Utils.read_file(schema_file)
schema = json.loads(schema_str)

try:
Utils.insert_table(config.DATASET_ID, config.TABLE_ID, schema)
print "Created default table: %s.%s" % (config.DATASET_ID, config.TABLE_ID)
except Exception, e:
print "Table already exists: %s" % e
Utils.insert_table(config.DATASET_ID, config.TABLE_ID, schema)
print "Default table: %s.%s" % (config.DATASET_ID, config.TABLE_ID)

if config.MODE == 'gnip':
GnipListener.start(schema, logger)
7 changes: 4 additions & 3 deletions utils.py
@@ -79,12 +79,13 @@ def insert_table(dataset_id, table_id, schema):
try:
response = Utils.get_bq().tables().insert(projectId=config.PROJECT_ID, datasetId=dataset_id, body=body).execute()
except HttpError, e:
# ignore table already exist errors
if e.code == 409 or "Already Exists" in e.reason:
# HttpError 409 when requesting URI returned
# "Already Exists: Table twitter-for-bigquery:gnip.tweets_nbafinals"
if e.resp.status == 409:
response = True
else:
raise e

return response

@staticmethod
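
The utils.py and load.py hunks together change how an existing table is reported: `insert_table` returns `True` on an HTTP 409 instead of raising, and `on_data` logs a creation only when the result is truthy and not `True`. A minimal sketch of that contract follows. It is not the repository's code: `StubResponse`, `StubHttpError`, the `do_insert` callable, and `describe` are hypothetical stand-ins so the example runs without the BigQuery client, and it uses `except ... as e` so it also runs on Python 3, whereas the commit itself uses Python 2 syntax.

```python
class StubResponse(object):
    """Hypothetical stand-in for the HTTP response attached to an API error."""

    def __init__(self, status):
        self.status = status


class StubHttpError(Exception):
    """Hypothetical stand-in for apiclient.errors.HttpError."""

    def __init__(self, status, message):
        Exception.__init__(self, message)
        self.resp = StubResponse(status)


def insert_table(do_insert):
    """Return the insert response, or True if the table already exists.

    do_insert is an assumed zero-argument callable that performs the insert.
    """
    try:
        return do_insert()
    except StubHttpError as e:
        # 409 Conflict is treated as "already exists", not as a failure.
        if e.resp.status == 409:
            return True
        # Any other status is a real error and propagates unchanged.
        raise


def describe(created):
    """Mirror the caller's check: a truthy, non-True value means a new table."""
    if created and created is not True:
        return 'created'
    return 'existing'


def insert_new():
    # Simulates a successful insert that returns a table resource.
    return {'id': 'gnip.tweets_nbafinals'}


def insert_duplicate():
    # Simulates the API rejecting a table that is already present.
    raise StubHttpError(409, 'Already Exists')


print(describe(insert_table(insert_new)))
print(describe(insert_table(insert_duplicate)))
```

With this contract the caller can populate its table mapping in both cases, which appears to be why the diff sets `self.table_mapping[tag]` outside the `created` check.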
