Ingesting realtime tweets using Apache Kafka, Tweepy and Python

This post is a part of a series on Lambda Architecture consisting of:

You can also follow a walk-through of the code in this YouTube video:


Purpose in Lambda Architecture:

  • the main data source for the Lambda Architecture pipeline
  • polls the Twitter search API once a minute to simulate a stream of new events
  • a Kafka Producer sends the tweets as records to the Kafka Broker


Required libraries

import tweepy
import time
from kafka import KafkaProducer

Twitter setup

# twitter setup: replace the "x" placeholders with your own app's credentials
consumer_key = "x"
consumer_secret = "x"
access_token = "x"
access_token_secret = "x"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth)
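Hard-coding the credentials makes them easy to leak once the code is shared or pushed to a repository. A minimal sketch, assuming the four values are exported as environment variables under the hypothetical names below, reads them at start-up and fails fast if any is missing:

```python
import os

# Hypothetical environment variable names; choose whatever fits your deployment.
CREDENTIAL_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_twitter_credentials(env=None):
    """Return the four Twitter credentials in CREDENTIAL_VARS order.

    Raises RuntimeError naming every variable that is missing or empty.
    """
    env = os.environ if env is None else env
    missing = [name for name in CREDENTIAL_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing Twitter credentials: " + ", ".join(missing))
    return tuple(env[name] for name in CREDENTIAL_VARS)
```

With this in place, the four assignments above could become `consumer_key, consumer_secret, access_token, access_token_secret = load_twitter_credentials()`.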

A helper function to convert the time a tweet was created (which Twitter reports in UTC/GMT) into our system's local time

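from datetime import datetime, timedelta

def normalize_timestamp(timestamp):
    # parse the "YYYY-MM-DD HH:MM:SS" string produced by str() of a naive datetime
    # (the parameter is not called `time`, to avoid shadowing the time module)
    mytime = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # the tweets are timestamped in UTC/GMT, while I am in UTC+1
    return mytime.strftime("%Y-%m-%d %H:%M:%S")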
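The fixed `timedelta(hours=1)` and the string round-trip are fragile: the offset is wrong for part of the year if the local zone observes daylight saving time, and recent Tweepy releases (4.x) return `created_at` as a timezone-aware datetime whose `str()` ends in `+00:00`, which the `strptime` format above rejects. A minimal sketch of an alternative (not the original code; `to_local_timestamp` is a hypothetical name) works on the `datetime` object directly and treats naive values as UTC:

```python
from datetime import datetime, timedelta, timezone


def to_local_timestamp(created_at, utc_offset_hours=1):
    """Format a tweet's created_at datetime in a fixed local UTC offset.

    Naive datetimes are assumed to be UTC; aware datetimes are converted
    from whatever zone they carry.
    """
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    local_zone = timezone(timedelta(hours=utc_offset_hours))
    return created_at.astimezone(local_zone).strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    # The naive and the aware form of the same instant format identically.
    naive = datetime(2018, 3, 1, 23, 30)
    aware = datetime(2018, 3, 1, 23, 30, tzinfo=timezone.utc)
    print(to_local_timestamp(naive), to_local_timestamp(aware))
```

Inside `get_twitter_data`, `to_local_timestamp(i.created_at)` would replace `normalize_timestamp(str(i.created_at))`. For a daylight-saving-aware conversion, a `zoneinfo.ZoneInfo` object (Python 3.9+) can be passed to `astimezone` instead of the fixed offset.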

Defining the Kafka producer

  • specify the Kafka Broker
  • specify the topic name
  • optional: specify a partitioning strategy
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-lambda1'
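On the optional partitioning strategy: kafka-python's default partitioner hashes a record's key (murmur2, as in the Java client) and picks a random available partition for records without a key, so sending each tweet with a key such as the author's user ID already keeps one user's records in one partition. For a custom strategy, `KafkaProducer` accepts a `partitioner` callable which, as far as I understand kafka-python, is called as `partitioner(key_bytes, all_partitions, available_partitions)` and returns a partition ID. A minimal sketch of such a callable, using a hypothetical CRC32-based strategy:

```python
import random
import zlib


def crc32_partitioner(key_bytes, all_partitions, available_partitions):
    """Hypothetical partitioning strategy for KafkaProducer(partitioner=...).

    Records that share a key (e.g. the same user ID) always map to the same
    partition; records without a key go to a random available partition.
    """
    if key_bytes is None:
        candidates = list(available_partitions) or list(all_partitions)
        return random.choice(candidates)
    # Sort so the mapping does not depend on the order partitions are listed in.
    partitions = sorted(all_partitions)
    # zlib.crc32 is stable across processes, unlike Python's built-in hash().
    return partitions[zlib.crc32(key_bytes) % len(partitions)]


# Wiring it into the producer (requires kafka-python and a running broker):
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                          partitioner=crc32_partitioner)
# producer.send(topic_name, key=i.user.id_str.encode('utf-8'),
#               value=record.encode('utf-8'))
```

Note that any hash-modulo scheme remaps keys if the topic's partition count changes later.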

Producing and sending records to the Kafka Broker

  • querying the Twitter API object
  • extracting relevant information from the response
  • formatting the data and sending it to the proper topic on the Kafka Broker
  • each resulting record contains the following ';'-separated attributes:
    • id (the user ID of the tweet's author)
    • created_at
    • followers_count
    • location
    • favorite_count
    • retweet_count
def get_twitter_data():
    # api.search is the Tweepy 3.x name; Tweepy 4.0 renamed it to api.search_tweets
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        # one ';'-separated record per tweet, keeping the trailing ';'
        record = ';'.join([
            i.user.id_str,
            normalize_timestamp(str(i.created_at)),
            str(i.user.followers_count),
            str(i.user.location),
            str(i.favorite_count),
            str(i.retweet_count),
        ]) + ';'
        # encode explicitly as UTF-8: locations often contain non-ASCII characters
        producer.send(topic_name, record.encode('utf-8'))
    # send() is asynchronous; flush() blocks until the buffered records are delivered
    producer.flush()
get_twitter_data()
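Two weak spots in the record layout are worth isolating: a `;` inside the free-text `location` would shift every later field, and non-ASCII locations must be encoded explicitly (the `UnicodeEncodeError` reported in the comments comes from an ASCII encode). A minimal sketch, assuming Python 3, puts formatting and parsing into two hypothetical helpers so that producer and consumers agree on the layout:

```python
from types import SimpleNamespace

# Field order of the ';'-separated record written to the Kafka topic.
FIELDS = ("id", "created_at", "followers_count", "location",
          "favorite_count", "retweet_count")


def format_record(tweet, created_at):
    """Build one UTF-8 encoded record from a Tweepy status object.

    `created_at` is the already-normalized timestamp string.
    """
    # Replace ';' in the free-text location so it cannot split the field
    # (',' as the substitute is an arbitrary choice).
    location = str(tweet.user.location).replace(";", ",")
    values = (tweet.user.id_str, created_at, str(tweet.user.followers_count),
              location, str(tweet.favorite_count), str(tweet.retweet_count))
    # Keep the trailing ';' of the original record layout.
    return (";".join(values) + ";").encode("utf-8")


def parse_record(value):
    """Turn a consumed record (bytes) back into a dict keyed by FIELDS."""
    parts = value.decode("utf-8").split(";")
    # zip() stops after len(FIELDS) items, dropping the empty field after the trailing ';'.
    return dict(zip(FIELDS, parts))


if __name__ == "__main__":
    # A stand-in for a Tweepy status with only the attributes used above.
    fake = SimpleNamespace(
        user=SimpleNamespace(id_str="42", followers_count=10, location="Zürich; CH"),
        favorite_count=3, retweet_count=1)
    print(parse_record(format_record(fake, "2018-03-02 00:30:00")))
```

In `get_twitter_data`, the loop body would then reduce to `producer.send(topic_name, format_record(i, normalize_timestamp(str(i.created_at))))`, and a kafka-python consumer could call `parse_record(message.value)`.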

Deployment

  • perform the task once a minute and wait in between runs
def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is an integer: the number of seconds to wait between runs
        time.sleep(interval)
periodic_work(60*1)  # get data every minute
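`periodic_work` sleeps a full `interval` after each fetch, so the real period is the fetch time plus 60 seconds, and a single uncaught exception (a network error or a rate-limit response, for example) ends the loop. A minimal sketch of a more forgiving scheduler, with hypothetical names and injectable `sleep`/`clock` functions so it can be tested without waiting:

```python
import time


def run_every(task, interval, max_runs=None, sleep=time.sleep, clock=time.monotonic):
    """Call task() every `interval` seconds, measured start to start.

    Exceptions from task() are reported and the loop continues. Returns the
    number of runs once `max_runs` is reached (never, if max_runs is None).
    """
    runs = 0
    next_run = clock()
    while True:
        try:
            task()
        except Exception as exc:  # keep the pipeline alive on transient errors
            print(f"task failed: {exc!r}")
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        next_run += interval
        # Sleep only for what is left of the interval, never a negative amount.
        sleep(max(0.0, next_run - clock()))
```

Used as `run_every(get_twitter_data, 60)`, it replaces `periodic_work(60*1)`. Catching every `Exception` is a deliberate trade-off for an unattended ingester; narrowing it to Tweepy and Kafka errors would surface programming bugs sooner.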

 

You can find the code from this blog post in this GitHub repository.


About dorianbg

A Data Engineer based in London, United Kingdom
This entry was posted in Big Data, Data Engineering.

6 Responses to Ingesting realtime tweets using Apache Kafka, Tweepy and Python

  1. Pingback: Introduction to Lambda Architecture | Dorian Beganovic

  2. Pingback: Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka | Dorian Beganovic

  3. Pingback: Implementing the Speed Layer of Lambda Architecture using Structured Spark Streaming | Dorian Beganovic

  4. Pingback: Implementing the Serving Layer of Lambda Architecture using Redshift | Dorian Beganovic

  5. Jerome says:

    Hi Dorian, Thanks for the video.
    The statement : producer.send(topic_name,str.encode(record))
    throws the following error :
    UnicodeEncodeError: ‘ascii’ codec can’t encode characters in position 0-5: ordinal not in range(128)

    I understand the location is not coming in ascii and hence I get this error.But I do not know how to fix this.
    Could you help me with this please ?
    Thanks
    Jerome

