Skip to content

Commit

Permalink
twitter celery job
Browse files Browse the repository at this point in the history
  • Loading branch information
preddy5 committed Aug 16, 2014
1 parent 0bb1c32 commit 33d9e1e
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 6 deletions.
4 changes: 4 additions & 0 deletions bookie/bcelery/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def load_ini():
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_RESULT_BACKEND=INI.get('celery_broker'),
CELERYBEAT_SCHEDULE={
'daily_jobs': {
'task': 'bookie.bcelery.tasks.daily_jobs',
'schedule': timedelta(seconds=24*60*60),
},
'daily_stats': {
'task': 'bookie.bcelery.tasks.daily_stats',
'schedule': timedelta(seconds=24*60*60),
Expand Down
57 changes: 56 additions & 1 deletion bookie/bcelery/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import absolute_import

import tweepy
from celery.utils.log import get_task_logger

from bookie.bcelery.celery import celery
Expand All @@ -13,11 +15,13 @@

from bookie.lib.importer import Importer
from bookie.lib.readable import ReadUrl
from bookie.lib.social_utils import get_url_title
from bookie.models import initialize_sql
from bookie.models import Bmark
from bookie.models import BmarkMgr
from bookie.models import Readable
from bookie.models.auth import UserMgr
from bookie.models.social import SocialMgr
from bookie.models.stats import StatBookmarkMgr
from bookie.models.queue import ImportQueueMgr

Expand All @@ -44,13 +48,21 @@ def hourly_stats():
count_tags.delay()


@celery.task(ignore_result=True)
def daily_jobs():
    """Queue the jobs that are meant to run once a day.

    Currently this queues:
    - process_twitter_connections, called without a username so the
      Twitter fetch is refreshed for the stored connections of all users
    """
    process_twitter_connections.delay()


@celery.task(ignore_result=True)
def daily_stats():
"""Daily we want to run a series of numbers to track
Currently we're monitoring:
- Total number of bookmarks for each user in the system
- Delete's inactive accounts if any
"""
count_total_each_user.delay()
delete_non_activated_account.delay()
Expand Down Expand Up @@ -284,6 +296,15 @@ def reindex_fulltext_allbookmarks(sync=False):
fulltext_index_bookmark.delay(b.bid, None)


@celery.task(ignore_result=True)
def process_twitter_connections(username=None):
    """Run the Twitter fetch for the stored Twitter connections.

    :param username: when given, only that user's connections are
        processed; otherwise every connection returned by
        SocialMgr.get_twitter_connections is processed.
    """
    connections = SocialMgr.get_twitter_connections(username)
    for twitter_connection in connections:
        # Called directly (not via .delay), so each connection is handled
        # inside this task run.
        create_twitter_api(twitter_connection)


@celery.task(ignore_result=True)
def fetch_unfetched_bmark_content(ignore_result=True):
"""Check the db for any unfetched content. Fetch and index."""
Expand Down Expand Up @@ -359,3 +380,37 @@ def fetch_bmark_content(bid):
'No readable record '
'during existing processing')
trans.commit()


@celery.task(ignore_result=True)
def create_twitter_api(connection):
    """Build an authenticated tweepy client for a connection and fetch tweets.

    The application's consumer key/secret are read from the INI settings and
    combined with the access key/secret stored on the connection.

    :param connection: object exposing ``access_key`` and ``access_secret``
        (it is also handed on to fetch_tweets unchanged).

    tweepy.TweepError and IOError raised while authenticating or fetching are
    logged and swallowed, so one failing connection does not propagate to the
    caller.
    """
    consumer_key = INI.get('twitter_consumer_key')
    consumer_secret = INI.get('twitter_consumer_secret')
    try:
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(
            connection.access_key, connection.access_secret)
        twitter_user = tweepy.API(auth)
        fetch_tweets(twitter_user, connection)
    except (tweepy.TweepError, IOError):
        # logger.exception logs at ERROR level like before, but also records
        # the traceback so the actual failure reason is not lost.
        logger.exception('Twitter connection denied tweepy IOError')


@celery.task(ignore_result=True)
def fetch_tweets(twitter_user, connection):
    """Bookmark the urls found in a connection's tweets.

    The timeline is requested for ``connection.twitter_username`` starting
    after ``connection.last_tweet_seen``. Every url entity that the user has
    not bookmarked yet is stored with the 'twitter' tag argument, and the
    connection is then updated with the id of the first tweet returned.

    :param twitter_user: authenticated API object providing user_timeline.
    :param connection: the Twitter connection being refreshed.
    """
    timeline = twitter_user.user_timeline(
        id=connection.twitter_username,
        include_entities=True,
        since_id=connection.last_tweet_seen)
    if not timeline:
        # Nothing new came back; leave the connection untouched.
        return

    for status in timeline:
        for link in status.entities['urls']:
            expanded_url, title = get_url_title(link['expanded_url'])
            if BmarkMgr.get_by_url(expanded_url, connection.username):
                # Already bookmarked by this user.
                continue
            BmarkMgr.store(expanded_url, connection.username,
                           title, '', 'twitter')

    # presumably the first entry is the most recent tweet -- verify against
    # the Twitter timeline ordering.
    SocialMgr.update_last_tweet_data(connection, timeline[0].id)
12 changes: 12 additions & 0 deletions bookie/lib/social_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

from BeautifulSoup import BeautifulSoup
import requests
from tweepy import OAuthHandler
from tweepy import API

Expand All @@ -18,3 +20,13 @@ def create_twitter_OAuthHandler(consumer_key, consumer_secret):
auth = OAuthHandler(consumer_key, consumer_secret)
auth_url = auth.get_authorization_url()
return auth_url


def get_url_title(url):
    """Return the url reported by the response and the page title.

    :param url: address of the webpage to fetch.
    :returns: a ``(url, title)`` tuple. On success the url is the one
        reported by the response object and the title is the string of the
        page's ``<title>`` element. If fetching or parsing fails for any
        reason (including a page without a title element), the original url
        and an empty title are returned instead.
    """
    try:
        webpage = requests.get(url)
        parsed_html = BeautifulSoup(webpage.content)
        return webpage.url, parsed_html.title.string
    except Exception:
        # Deliberate best-effort fallback: a title is nice to have, never
        # required. Catching Exception (not a bare except) keeps
        # KeyboardInterrupt and SystemExit from being swallowed.
        return url, ''
17 changes: 17 additions & 0 deletions bookie/models/social.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ def get_all_connections(username):
BaseConnection.username == username)
return connections

@staticmethod
def get_twitter_connections(username=None):
    """Return a list of Twitter connections.

    :param username: when truthy, only connections whose username matches
        are returned; otherwise all Twitter connections are returned.
    """
    query = TwitterConnection.query
    if username:
        query = query.filter(TwitterConnection.username == username)
    return query.all()

@staticmethod
def store_twitter_connection(username, credentials):
tconnection = TwitterConnection(
Expand All @@ -36,6 +46,12 @@ def store_twitter_connection(username, credentials):
DBSession.add(tconnection)
return tconnection

@staticmethod
def update_last_tweet_data(connection, tweet_id):
connection.last_tweet_seen = tweet_id
connection.refresh_date = datetime.now()
return connection


class BaseConnection(Base):
"""Table to store User basic social information"""
Expand Down Expand Up @@ -66,6 +82,7 @@ class TwitterConnection(BaseConnection):
access_secret = Column(Unicode(255))
twitter_username = Column(Unicode(255))
refresh_date = Column(DateTime)
last_tweet_seen = Column(Unicode(255))

__mapper_args__ = {
'polymorphic_identity': twitter_connection
Expand Down
8 changes: 8 additions & 0 deletions bookie/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ def build_routes(config):
"api_admin_imports_reset",
"/api/v1/a/imports/reset/{id}",
request_method="POST")
config.add_route(
"api_admin_twitter_refresh_all",
"/api/v1/a/social/twitter_refresh/all",
request_method="GET")
config.add_route(
"api_admin_twitter_refresh",
"/api/v1/a/social/twitter_refresh/{username}",
request_method="GET")

config.add_route(
"api_admin_users_list",
Expand Down
4 changes: 2 additions & 2 deletions bookie/tests/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def make_tag(name=None):
return Tag(name)


def make_twitter_connection():
tconnection = TwitterConnection(username='admin',
def make_twitter_connection(username='admin'):
tconnection = TwitterConnection(username=username,
is_active=True,
last_connection=datetime.now(),
uid=u'1022699448',
Expand Down
18 changes: 18 additions & 0 deletions bookie/tests/test_api/test_social_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import json
import transaction
from mock import patch

from bookie.tests import factory
from bookie.tests import TestViewBase
Expand Down Expand Up @@ -32,3 +33,20 @@ def testSocialAuthentication(self):
self.assertEqual(
res.status, "403 Forbidden",
"status should be 403")

@patch('bookie.bcelery.tasks.process_twitter_connections')
def testProcessTwitterConnections(self, mock_refresh):
    """An admin refresh through the api calls process_twitter_connections.

    The task itself is patched out, so only the wiring between the api
    route and the task is checked."""
    factory.make_twitter_connection()
    transaction.commit()

    self.app.get("/api/v1/a/social/twitter_refresh/admin",
                 params={'api_key': self.api_key},
                 status=200)

    self.assertTrue(mock_refresh.called)
13 changes: 13 additions & 0 deletions bookie/tests/test_bcelery/test_bcelery.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from mock import patch
import transaction

from bookie.bcelery import tasks
Expand All @@ -9,6 +10,7 @@
from bookie.models.stats import StatBookmark

from bookie.tests import empty_db
from bookie.tests import factory
from bookie.tests import gen_random_word
from bookie.tests import TestDBBase

Expand Down Expand Up @@ -97,3 +99,14 @@ def test_task_count_user_total(self):
username = user_key[2]
self.assertTrue(username in expected)
self.assertEqual(expected[username], stat.data)

@patch('bookie.bcelery.tasks.create_twitter_api')
def test_process_twitter_connections(self, mock_api):
    """create_twitter_api is only called once a connection exists."""
    # First run happens before any connection has been created here.
    tasks.process_twitter_connections()
    self.assertFalse(mock_api.called)

    # After adding a connection the task must hand it to the api builder.
    factory.make_twitter_connection()
    tasks.process_twitter_connections()
    self.assertTrue(mock_api.called)
39 changes: 36 additions & 3 deletions bookie/tests/test_models/test_socialmgr.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@

from datetime import datetime
from pyramid import testing
import transaction
from unittest import TestCase

from bookie.models.social import SocialMgr
from bookie.tests import TestDBBase
from bookie.tests import (
factory,
empty_db)


class TestSocialMgr(TestDBBase):
class TestSocialMgr(TestCase):

def setUp(self):
testing.setUp()

def tearDown(self):
testing.tearDown()
empty_db()

def testConnectionsStore(self):
credentials = {
Expand All @@ -20,4 +32,25 @@ def testConnectionsStore(self):
SocialMgr.store_twitter_connection('admin', credentials)

connection = SocialMgr.get_all_connections('admin')
self.assertEqual(1, len(connection.all()),)
self.assertEqual(1, len(connection.all()))

def testConnectionsReturn(self):
    """get_twitter_connections returns all rows or only one user's rows."""
    factory.make_twitter_connection()
    factory.make_twitter_connection(username='bookie')
    transaction.commit()

    # Without a username both stored connections come back.
    everyone = SocialMgr.get_twitter_connections()
    self.assertEqual(2, len(everyone))

    # With a username only that user's connection comes back.
    only_bookie = SocialMgr.get_twitter_connections('bookie')
    self.assertEqual(1, len(only_bookie))

def testTweetIdUpdate(self):
    """update_last_tweet_data stores the tweet id on the connection."""
    factory.make_twitter_connection(username='admin')
    transaction.commit()

    stored = SocialMgr.get_twitter_connections('admin')
    SocialMgr.update_last_tweet_data(stored[0], '123456')

    # Query again and check the value is visible on the fetched connection.
    refreshed = SocialMgr.get_twitter_connections('admin')
    self.assertEqual(refreshed[0].last_tweet_seen, '123456')
15 changes: 15 additions & 0 deletions bookie/views/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,21 @@ def data(urls):
})


@view_config(route_name="api_admin_twitter_refresh", renderer="jsonp")
@view_config(route_name="api_admin_twitter_refresh_all", renderer="jsonp")
@api_auth('api_key', UserMgr.get, admin_only=True)
def twitter_refresh(request):
    """Refresh the tweets fetched from users' Twitter accounts.

    Serves both the per-user route and the "all" route: the "all" route has
    no ``username`` in its matchdict, so the task receives None.
    """
    username = request.matchdict.get('username')
    tasks.process_twitter_connections(username)
    return _api_response(request, {
        'success': True,
        'message': "running bot to fetch user's tweets",
    })


@view_config(route_name="api_admin_readable_reindex", renderer="jsonp")
@api_auth('api_key', UserMgr.get, admin_only=True)
def readable_reindex(request):
Expand Down
22 changes: 22 additions & 0 deletions dbversions/versions/dbc7a0f1182_adding_last_twitter_seen_column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""adding last_twitter_seen column
Revision ID: dbc7a0f1182
Revises: 352bb5f4fff9
Create Date: 2014-07-06 21:23:56.002719
"""

# revision identifiers, used by Alembic.
revision = 'dbc7a0f1182'
down_revision = '352bb5f4fff9'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the nullable last_tweet_seen column to TwitterConnection."""
    last_tweet_seen = sa.Column(
        'last_tweet_seen', sa.Unicode(length=255), nullable=True)
    op.add_column('TwitterConnection', last_tweet_seen)


def downgrade():
    """Drop the last_tweet_seen column from TwitterConnection again."""
    op.drop_column('TwitterConnection', 'last_tweet_seen')
36 changes: 36 additions & 0 deletions docs/api/admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,39 @@ GET `/api/v1/admin/stats/bmarks`

requests.get('http://127.0.0.1:6543/api/v1/admin/stats/bmarks?api_key=12345...')
>>> ...


/a/social/twitter_refresh/:username
-----------------------------------
GET `/a/social/twitter_refresh/:username`

Refresh twitter fetch for specific user

:query param: api_key *required* - the api key for your account to make the call with
:query param: callback - wrap JSON response in an optional callback

::

requests.get('http://127.0.0.1:6543/api/v1/a/social/twitter_refresh/admin?api_key=12345...')
>>> {
"message": "running bot to fetch user's tweets",
"success": true
}


/a/social/twitter_refresh/all
-----------------------------
GET `/a/social/twitter_refresh/all`

Refresh twitter fetch for all the users

:query param: api_key *required* - the api key for your account to make the call with
:query param: callback - wrap JSON response in an optional callback

::

requests.get('http://127.0.0.1:6543/api/v1/a/social/twitter_refresh/all?api_key=12345...')
>>> {
"message": "running bot to fetch user's tweets",
"success": true
}

0 comments on commit 33d9e1e

Please sign in to comment.