Ingesting realtime tweets using Apache Kafka, Tweepy and Python

This post is part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python (this post)
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Structured Spark Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

Purpose in Lambda Architecture:

  • main data source for the Lambda Architecture pipeline
  • polls the Twitter search API once a minute to simulate new events continuously coming in
  • a Kafka Producer sends the tweets as records to the Kafka Broker

Contents:

  • Required libraries
  • Twitter setup
  • Defining the Kafka producer
  • Producing and sending records to the Kafka Broker
  • Deployment

Required libraries

import tweepy                                    # Twitter API client
import time                                      # for pausing between fetches
from kafka import KafkaConsumer, KafkaProducer   # kafka-python client
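Both libraries are available on PyPI; assuming a standard Python environment, pip install tweepy kafka-python installs them. The KafkaConsumer import is not used by the producer itself, but it comes in handy for a sanity check at the end of the post.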

Twitter setup

# twitter setup
consumer_key = "x"
consumer_secret = "x"
access_token = "x"
access_token_secret = "x"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth) 
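Before wiring the producer up, it can be worth confirming that the credentials actually work. This is an optional sanity check, not part of the original pipeline (assuming Tweepy 3.x, which the auth flow above comes from):

# optional: confirm the credentials are valid before starting the pipeline
try:
    api.verify_credentials()
    print("Twitter authentication OK")
except tweepy.TweepError:
    print("Error during Twitter authentication")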

A helper function to normalize the timestamp of a tweet to the local time of our system:

from datetime import datetime, timedelta

def normalize_timestamp(time):
    mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # tweets are timestamped in GMT, while our system runs in the +1 timezone
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
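For example, a tweet created at 14:05:00 GMT comes out shifted one hour ahead (the date here is just an illustration):

print(normalize_timestamp("2017-10-01 14:05:00"))   # -> 2017-10-01 15:05:00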

Defining the Kafka producer

  • specify the Kafka Broker to connect to
  • specify the topic name to send records to
  • optional: specify a partitioning strategy (see the keyed-send sketch below)

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-lambda1'
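This post relies on Kafka's default partitioner. As a minimal sketch of what a partitioning strategy could look like (the key is a hypothetical choice, not part of the original code): passing a key to send() makes the default partitioner hash it, so all records sharing that key land on the same partition.

# hypothetical keyed send: records with the same key always go to the same partition
producer.send(topic_name, key=b'user-42', value=b'some;record;fields;')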

Producing and sending records to the Kafka Broker

  • querying the Twitter API Object
  • extracting relevant information from the response
  • formatting and sending the data to proper topic on the Kafka Broker
  • each resulting record has the following semicolon-separated attributes:
    • id (the id_str of the tweeting user)
    • created_at
    • followers_count
    • location
    • favorite_count
    • retweet_count

def get_twitter_data():
    res = api.search("Apple OR iphone OR iPhone")
    for tweet in res:
        # build a semicolon-delimited record (the trailing ';' matches the original format)
        record = ';'.join([
            str(tweet.user.id_str),
            normalize_timestamp(str(tweet.created_at)),
            str(tweet.user.followers_count),
            str(tweet.user.location),
            str(tweet.favorite_count),
            str(tweet.retweet_count),
        ]) + ';'
        producer.send(topic_name, str.encode(record))

get_twitter_data()
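Note that KafkaProducer.send() is asynchronous: it appends the record to an internal buffer and returns immediately. When it matters that everything has actually reached the broker (for example before shutting the script down), flush() blocks until the buffer is drained:

producer.flush()   # block until all buffered records have been delivered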

Deployment

  • run get_twitter_data() periodically, sleeping for a fixed interval in between

def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait between fetches
        time.sleep(interval)

periodic_work(60 * 1)  # fetch new tweets every 60 seconds
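The KafkaConsumer imported at the start is not part of the pipeline itself, but it offers a quick way to confirm that records are actually arriving on the topic. A minimal sketch (the consumer settings here are assumptions, not from the original post):

# sanity check: read the records back from the topic
consumer = KafkaConsumer(topic_name,
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.value.decode('utf-8'))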

You can find the code from this blog post in this GitHub repository.
