Ingesting realtime tweets using Apache Kafka, Tweepy and Python

This post is a part of a series on Lambda Architecture consisting of:

You can also follow a walk-through of the code in this YouTube video:

Purpose in Lambda Architecture:

  • main data source for the lambda architecture pipeline
  • uses the Twitter search API (queried through Tweepy) to simulate new events coming in every minute
  • Kafka Producer sends the tweets as records to the Kafka Broker


Required libraries

import tweepy
import time
from kafka import KafkaConsumer, KafkaProducer

Twitter setup

# twitter setup
consumer_key = "x"
consumer_secret = "x"
access_token = "x"
access_token_secret = "x"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth) 
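
Hard-coded keys are fine for a demo, but they are easy to leak when the code is shared. A minimal sketch of one alternative is to read the four values from environment variables. The variable names and the `load_twitter_credentials` helper below are assumptions made for this illustration, not part of Tweepy.

```python
import os

# Hypothetical variable names; pick whatever fits your deployment.
REQUIRED_KEYS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)

def load_twitter_credentials(env=os.environ):
    """Return the four credentials as a dict, failing early if any is missing."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError("missing credentials: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}
```

The returned values can then be passed to `tweepy.OAuthHandler` and `auth.set_access_token` in place of the `"x"` placeholders.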

A helper function to normalize the time a tweet was created with the time of our system

from datetime import datetime, timedelta

def normalize_timestamp(created_at):
    # the parameter is not called "time" so it does not shadow the imported time module
    mytime = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # the tweets are timestamped in GMT timezone, while I am in +1 timezone
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
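
The hour shift is easiest to see on a concrete value. The sketch below is a variant of the helper that makes the offset a parameter and uses timezone-aware objects from the standard library. The name `to_local` and the `utc_offset_hours` parameter are assumptions for this example. If daylight saving time matters, the standard library's `zoneinfo` module (Python 3.9+) could replace the fixed offset.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"

def to_local(created_at, utc_offset_hours=1):
    """Convert a UTC timestamp string to a fixed-offset local time string."""
    # Mark the parsed time as UTC, then shift it into the target offset.
    utc_time = datetime.strptime(created_at, FMT).replace(tzinfo=timezone.utc)
    local_zone = timezone(timedelta(hours=utc_offset_hours))
    return utc_time.astimezone(local_zone).strftime(FMT)

print(to_local("2019-03-01 23:30:00"))  # 2019-03-02 00:30:00
```

Note that a tweet created late in the evening GMT lands on the next calendar day in a +1 timezone, which matters if records are later grouped by date.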

Defining the Kafka producer

  • specify the Kafka Broker
  • specify the topic name
  • optional: specify partitioning strategy
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-lambda1'
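
For the optional partitioning strategy, one common approach is to send each record with a key, since kafka-python's default partitioner routes records with the same key to the same partition. The following is only a sketch under that assumption. The helper names `producer_settings` and `make_producer` are made up for this example, while `key_serializer` and `value_serializer` are regular `KafkaProducer` options.

```python
def producer_settings(broker="localhost:9092"):
    """Keyword arguments for kafka-python's KafkaProducer."""
    return {
        "bootstrap_servers": broker,
        # Serialize keys and values from str to UTF-8 bytes.
        "key_serializer": lambda k: k.encode("utf-8"),
        "value_serializer": lambda v: v.encode("utf-8"),
    }

def make_producer(broker="localhost:9092"):
    # Imported lazily so the settings can be inspected without kafka-python
    # installed or a broker running.
    from kafka import KafkaProducer
    return KafkaProducer(**producer_settings(broker))

# Usage (requires a running broker):
# producer = make_producer()
# producer.send("tweets-lambda1", key="some-user-id", value="a;b;c;")
```

With serializers configured this way, `send` takes plain strings, so the manual `str.encode` call is no longer needed.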

Producing and sending records to the Kafka Broker

  • querying the Twitter API Object
  • extracting relevant information from the response
  • formatting and sending the data to the proper topic on the Kafka Broker
  • resulting records have the following attributes, separated by semicolons:
    • id (the user id of the tweet's author)
    • created_at
    • followers_count
    • location
    • favorite_count
    • retweet_count
def get_twitter_data():
    # search for recent tweets (Tweepy 4.x renamed api.search to api.search_tweets)
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        record = ''
        record += str(i.user.id_str)
        record += ';'
        record += str(normalize_timestamp(str(i.created_at)))
        record += ';'
        record += str(i.user.followers_count)
        record += ';'
        record += str(i.user.location)
        record += ';'
        record += str(i.favorite_count)
        record += ';'
        record += str(i.retweet_count)
        record += ';'
        producer.send(topic_name, record.encode('utf-8'))  # encode explicitly as UTF-8 bytes
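
The record format can be tried out without Twitter or Kafka. Below is a small sketch of the same semicolon-terminated layout, built with a hypothetical `format_record` helper. It encodes explicitly as UTF-8, so a non-ASCII location does not depend on the interpreter's default codec. Replacing semicolons inside field values is an extra assumption of this sketch and is not done in the code above.

```python
def format_record(fields, sep=";"):
    """Join field values into one separator-terminated line, as UTF-8 bytes."""
    # Replace the separator inside values (e.g. a free-text location)
    # so a consumer can split the record unambiguously.
    cleaned = [str(f).replace(sep, " ") for f in fields]
    return (sep.join(cleaned) + sep).encode("utf-8")

payload = format_record(["42", "2019-03-01 12:00:00", 10, "Zürich", 0, 3])
print(payload.decode("utf-8"))  # 42;2019-03-01 12:00:00;10;Zürich;0;3;
```

The resulting bytes can be handed to `producer.send(topic_name, payload)` directly.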


  • perform the task at a fixed interval and wait in between
def periodic_work(interval):
    while True:
        get_twitter_data()
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval)

periodic_work(60*1)  # get data every 60 seconds
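
An endless loop like this is hard to test, and a single failed API call would end it. As a hedged sketch of one way around both problems, the variant below takes the task as an argument, optionally stops after a number of runs, and keeps going when the task raises. The name `run_periodically` and its `max_runs` and `sleep` parameters are assumptions introduced for this example.

```python
import time

def run_periodically(task, interval, max_runs=None, sleep=time.sleep):
    """Call task() every `interval` seconds; stop after max_runs if given."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            task()
        except Exception as exc:
            # Keep the loop alive when a single poll fails.
            print(f"task failed: {exc}")
        runs += 1
        sleep(interval)
    return runs
```

Calling `run_periodically(get_twitter_data, 60)` would behave like the loop above, while passing `max_runs` and a fake `sleep` function allows a quick dry run.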


You can find the code from this blog post in this GitHub repository.


  5. Jerome says:

    Hi Dorian, Thanks for the video.
    The statement : producer.send(topic_name,str.encode(record))
    throws the following error :
    UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)

    I understand the location is not coming in as ASCII and hence I get this error. But I do not know how to fix this.
    Could you help me with this, please?


