Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming

This post is a part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

You can find a YouTube playlist demonstrating each of the topics here.


Purpose in Lambda Architecture:

  • provide analytics on real-time (“intra-day”) data, which the batch layer cannot achieve efficiently
  • achieve this by:
    • ingesting the latest tweets from the Kafka Producer and analyzing only those for the current day
    • performing aggregations over the data to get the desired output of the speed layer

Contents:

  • Configuring Spark
  • Spark Structured Streaming
    • Input stage – defining the data source
    • Result stage – performing transformations on the stream
    • Output stage
  • Connecting to the Redshift cluster
  • Exporting data to Redshift

Requirements

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._   // col, current_date, count, sum used below
import org.apache.spark.sql.streaming.ProcessingTime
import java.util.concurrent._

Configuring Spark

  • properly configuring Spark for our workload
  • defining a case class for tweets, which will be used later on
Thread.sleep(5000)

val spark = SparkSession
  .builder()
  .config("spark.sql.shuffle.partitions","2")  // we are running this on my laptop
  .appName("Spark Structured Streaming example")
  .getOrCreate()

import spark.implicits._  // needed for the .as[String] and .map calls below

case class tweet (id: String, created_at : String, followers_count: String, location : String, favorite_count : String, retweet_count : String)

Thread.sleep(5000)

Input stage – defining the data source

  • using Kafka as the data source, we specify:
    • the location of the Kafka broker
    • the relevant Kafka topic
    • how to treat starting offsets
var data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tweets-lambda1")
  .option("startingOffsets","earliest")  //or latest
  .load()
 
// note how similar the API is to the batch version

Result stage – performing transformations on the stream

  • extract the value column of the Kafka message
  • parse each row into an instance of the tweet case class
  • filter to only look at today's tweets
  • perform aggregations
var data_stream_cleaned = data_stream
    .selectExpr("CAST(value AS STRING) as string_value")
    .as[String]
    .map(x => (x.split(";")))
    .map(x => tweet(x(0), x(1), x(2),  x(3), x(4), x(5)))
    .selectExpr( "cast(id as long) id", "CAST(created_at as timestamp) created_at",  "cast(followers_count as int) followers_count", "location", "cast(favorite_count as int) favorite_count", "cast(retweet_count as int) retweet_count")
    .toDF()
    .filter(col("created_at").gt(current_date()))   // Kafka retains data for roughly the last 24 hours; this filter is needed because we use complete output mode
    .groupBy("location")
    .agg(count("id"), sum("followers_count"), sum("favorite_count"),   sum("retweet_count"))  
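// the resulting columns are location, count(id), sum(followers_count), sum(favorite_count) and sum(retweet_count);
// the serving layer later refers to these default aggregate column names, e.g. speed_layer."count(id)"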

Output stage

  • specify the following:
    • data sink – exporting to memory (the table can then be queried as if registered with registerTempTable() / createOrReplaceTempView())
    • trigger – the time between runs of the pipeline (i.e. when to poll for new data and transform it)
    • output mode – complete, append or update – since the Result stage uses aggregates, we can only use the complete or update output mode
val query = data_stream_cleaned.writeStream
    .format("memory")
    .queryName("demo")
    .trigger(ProcessingTime("60 seconds"))   // means that Spark will look for new data only every minute
    .outputMode("complete") // could also be update, since the result contains aggregations
    .start()
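// while the query is running, the in-memory sink can be inspected with ad hoc SQL,
// e.g. spark.sql("select * from demo").show(), and the stream can be stopped with query.stop()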

Connecting to the Redshift cluster

  • defining a JDBC connection to Redshift
//load the Redshift JDBC driver
Class.forName("com.amazon.redshift.jdbc42.Driver")

val prop = new java.util.Properties
prop.setProperty("driver", "com.amazon.redshift.jdbc42.Driver")
prop.setProperty("user", "x")
prop.setProperty("password", "x") 

//jdbc Redshift url - destination database is named "lambda"
val url = "jdbc:redshift://data-warehouse.x.us-east-1.redshift.amazonaws.com:5439/lambda"

//destination database table 
val table = "speed_layer"

Exporting data to Redshift

  • “overwriting” the table with the results of the query stored in memory by the speed layer
  • scheduling the function to run every hour
def exportToRedshift(){
    val df = spark.sql("select * from demo")

    //write data from spark dataframe to database
    df.write.mode("overwrite").jdbc(url, table, prop)
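    // note: overwrite mode drops and recreates the target table on each run,
    // so speed_layer always contains only the current day's aggregates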
}


val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable { 
  def run() = exportToRedshift()
}
val f = ex.scheduleAtFixedRate(task, 1, 1, TimeUnit.HOURS)
f.cancel(false)   // call this only when you want to stop the hourly export

 

You can find the code from this blog post in this github repository.


Implementing the Serving Layer of Lambda Architecture using Redshift

This post is a part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

Purpose in Lambda Architecture:

  • merge the output of the speed and batch layer aggregations
  • achieve this by:
    • running the re-computation every couple of hours (see the scheduling sketch at the end of this post)
    • using the output of the batch layer as the base table
    • upserting the up-to-date values of the speed layer into the base table

Contents:

  • Requirements
  • Creating the serving layer

Requirements

import psycopg2

Creating the serving layer

  • authenticate and create a connection using the psycopg2 module
  • create and populate a temporary table, using the batch layer as its base and upserting the speed layer into it
  • drop the current serving layer and rename the above-mentioned temporary table to become the new serving layer (no downtime)
config = { 'dbname': 'lambda', 
           'user':'x',
           'pwd':'x',
           'host':'data-warehouse.x.us-east-1.redshift.amazonaws.com',
           'port':'5439'
         }
conn =  psycopg2.connect(dbname=config['dbname'], host=config['host'], 
                              port=config['port'], user=config['user'], 
                              password=config['pwd'])

curs = conn.cursor()
curs.execute(""" 
    DROP TABLE IF EXISTS serving_layer_temp; 

    SELECT 
         *
    INTO 
        serving_layer_temp
    FROM 
        batch_layer ;


    UPDATE 
        serving_layer_temp
    SET
        count_id = count_id + speed_layer."count(id)",
        sum_followers_count = sum_followers_count + speed_layer."sum(followers_count)",
        sum_favorite_count = sum_favorite_count + speed_layer."sum(favorite_count)",
        sum_retweet_count = sum_retweet_count + speed_layer."sum(retweet_count)"
    FROM
        speed_layer
    WHERE 
        serving_layer_temp.location = speed_layer.location ;


    INSERT INTO 
        serving_layer_temp
    SELECT 
        * 
    FROM 
        speed_layer
    WHERE 
        speed_layer.location 
    NOT IN (
        SELECT 
            DISTINCT location 
        FROM 
            serving_layer_temp 
    ) ;
    DROP TABLE IF EXISTS serving_layer ;

    ALTER TABLE serving_layer_temp
    RENAME TO serving_layer ;
    
""")
curs.close()
conn.commit()
conn.close()
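
The purpose section above mentions re-running this merge every couple of hours. A minimal scheduling sketch, assuming the SQL block above is kept in a string named merge_sql and the config dictionary is the one defined earlier (both names here are illustrative):

import time
import psycopg2

def rebuild_serving_layer(merge_sql, config):
    conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                            port=config['port'], user=config['user'],
                            password=config['pwd'])
    curs = conn.cursor()
    curs.execute(merge_sql)  # the DROP / SELECT INTO / UPDATE / INSERT / ALTER block shown above
    curs.close()
    conn.commit()
    conn.close()

while True:
    rebuild_serving_layer(merge_sql, config)
    time.sleep(4 * 60 * 60)  # for example, every 4 hours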

 

You can find the code from this blog post in this github repository.


Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka

This post is a part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

Purpose in Lambda Architecture:

  • store all the tweets that were produced by the Kafka Producer into S3
  • export them into Redshift
  • perform aggregations on the tweets to get the desired output of the batch layer
  • achieve this by:
    • every couple of hours, fetching the latest unseen tweets produced by the Kafka Producer and storing them into an S3 archive
    • every night, running a SQL query to compute the result of the batch layer (see the scheduling sketch at the end of this post)

Contents:

  • Required libraries
  • Defining the Kafka consumer
  • Defining an Amazon Web Services S3 storage client
  • Writing the data into an S3 bucket
  • Exporting data from the S3 bucket to Amazon Redshift using the COPY command
  • Computing the batch layer output
  • Deployment

Required libraries

from kafka import KafkaConsumer
from io import StringIO
import boto3
import time
import random

Defining the Kafka consumer

  • setting the location of the Kafka Broker
  • specifying the group id and consumer timeout
  • subscribing to a topic
consumer = KafkaConsumer(
                        bootstrap_servers='localhost:9092',
                        auto_offset_reset='latest',  # where to start reading when there is no committed offset (or it is out of range)
                        group_id='test',   # must have a unique consumer group id 
                        consumer_timeout_ms=10000)  
                                # How long to listen for messages - we do it for 10 seconds 
                                # because we poll the kafka broker only each couple of hours

consumer.subscribe('tweets-lambda1')
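# note (relying on kafka-python defaults): enable_auto_commit is True, so offsets read by the
# 'test' group are committed and the next poll resumes where the previous one stopped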

Defining an Amazon Web Services S3 storage client

  • setting the authorization and the bucket
s3_resource = boto3.resource(
    's3',
    aws_access_key_id='x',
    aws_secret_access_key='x',
)

s3_client = s3_resource.meta.client
bucket_name = 'lambda-architecture123'

Writing the data into an S3 bucket

  • polling the Kafka Broker
  • aggregating the latest messages into a single object in the bucket
def store_twitter_data(path):
    csv_buffer = StringIO() # S3 storage is object storage -> our document is just a large string

    for message in consumer: # this acts as "get me an iterator over the latest messages I haven't seen"
        csv_buffer.write(message.value.decode() + '\n') 

    s3_resource.Object(bucket_name,path).put(Body=csv_buffer.getvalue())

Exporting data from the S3 bucket to Amazon Redshift using the COPY command

  • authenticate and create a connection using the psycopg2 module
  • export data from S3 into the Redshift “raw” table using the COPY command
import psycopg2
config = { 'dbname': 'lambda', 
           'user':'x',
           'pwd':'x',
           'host':'data-warehouse.x.us-east-1.redshift.amazonaws.com',
           'port':'5439'
         }
conn =  psycopg2.connect(dbname=config['dbname'], host=config['host'], 
                              port=config['port'], user=config['user'], 
                              password=config['pwd'])
def copy_files(conn, path):
    curs = conn.cursor()
    curs.execute(""" 
        copy 
        batch_raw
        from 
        's3://lambda-architecture123/""" + path + """'  
        access_key_id 'x'
        secret_access_key 'x'
        delimiter ';'
        region 'eu-central-1'
    """)
    curs.close()
    conn.commit()

Computing the batch layer output

  • querying the raw tweets stored in Redshift to get the desired batch layer output
def compute_batch_layer(conn):
    curs = conn.cursor()
    curs.execute(""" 
        drop table if exists batch_layer;

        with raw_dedup as (
        SELECT
            distinct id,created_at,followers_count,location,favorite_count,retweet_count
        FROM
            batch_raw
        ),
        batch_result as (SELECT
            location,
            count(id) as count_id,
            sum(followers_count) as sum_followers_count,
            sum(favorite_count) as sum_favorite_count,
            sum(retweet_count) as sum_retweet_count
        FROM
            raw_dedup
        group by 
            location
        )
        select 
            *
        INTO
            batch_layer
        FROM
            batch_result""")
    curs.close()
    conn.commit()
# compute_batch_layer(conn)

Deployment

  • perform the task every couple of hours and wait in between
def periodic_work(interval):
    while True:
        path = 'apple-tweets/'+ time.strftime("%Y/%m/%d/%H") + '_tweets_' + str(random.randint(1,1000)) + '.log'
        store_twitter_data(path)
        time.sleep(interval/2)
        copy_files(conn, path)
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval/2)

periodic_work(60 * 4)  # every 4 minutes, for demo purposes (in production this would be every couple of hours)
# run at the end of the day (stop periodic_work first, e.g. with a keyboard interrupt, since it loops forever)
compute_batch_layer(conn)
conn.close()
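
The call above runs the batch computation once. One way to schedule it nightly instead — a minimal sketch assuming the compute_batch_layer function and config dictionary defined earlier:

import time
import psycopg2
from datetime import datetime, timedelta

def run_batch_layer_nightly(config):
    while True:
        # sleep until shortly after midnight, then recompute the batch layer
        now = datetime.now()
        next_run = (now + timedelta(days=1)).replace(hour=0, minute=15, second=0, microsecond=0)
        time.sleep((next_run - now).total_seconds())
        conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                                port=config['port'], user=config['user'],
                                password=config['pwd'])
        compute_batch_layer(conn)
        conn.close()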

 

You can find the code from this blog post in this github repository.


Ingesting realtime tweets using Apache Kafka, Tweepy and Python

This post is a part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

Purpose in Lambda Architecture:

  • main data source for the lambda architecture pipeline
  • uses the Twitter API to simulate new events coming in every minute
  • the Kafka Producer sends the tweets as records to the Kafka Broker

Contents:

  • Required libraries
  • Twitter setup
  • A helper function to normalize the time a tweet was created with the time of our system
  • Defining the Kafka producer
  • Producing and sending records to the Kafka Broker
  • Deployment

Required libraries

import tweepy
import time
from kafka import KafkaProducer

Twitter setup

# twitter setup
consumer_key = "x"
consumer_secret = "x"
access_token = "x"
access_token_secret = "x"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth) 

A helper function to normalize the time a tweet was created with the time of our system

from datetime import datetime, timedelta

def normalize_timestamp(time):
    mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # the tweets are timestamped in GMT timezone, while I am in +1 timezone
    return (mytime.strftime("%Y-%m-%d %H:%M:%S")) 
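
For example, a tweet timestamped 14:05 GMT (illustrative value) is shifted to local time:

normalize_timestamp("2017-06-20 14:05:09")   # returns '2017-06-20 15:05:09'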

Defining the Kafka producer

  • specify the Kafka Broker
  • specify the topic name
  • optional: specify partitioning strategy
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-lambda1'

Producing and sending records to the Kafka Broker

  • querying the Twitter API Object
  • extracting relevant information from the response
  • formatting and sending the data to proper topic on the Kafka Broker
  • resulting records have the following attributes (an example record is shown after the code below):
    • id
    • created_at
    • followers_count
    • location
    • favorite_count
    • retweet_count
def get_twitter_data():
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        record = ''
        record += str(i.user.id_str)
        record += ';'
        record += str(normalize_timestamp(str(i.created_at)))
        record += ';'
        record += str(i.user.followers_count)
        record += ';'
        record += str(i.user.location)
        record += ';'
        record += str(i.favorite_count)
        record += ';'
        record += str(i.retweet_count)
        record += ';'
        producer.send(topic_name, str.encode(record))
get_twitter_data()
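
Each record sent to the broker is a single semicolon-separated line; an illustrative (made-up) example:

2244994945;2017-06-20 15:05:09;1337;San Francisco, CA;3;7;

This is the format the speed layer later splits on ';' and maps into its tweet case class.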

Deployment

  • perform the task every couple of minutes and wait in between
def periodic_work(interval):
    while True:
        get_twitter_data()
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval)
periodic_work(60 * 1)  # get data every minute

 

You can find the code from this blog post in this github repository.


Introduction to Lambda Architecture

This post is a part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Spark Structured Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift

Contents

  • What is Lambda architecture
  • Example of lambda architecture using Kafka, Spark Streaming, S3, Redshift

What is Lambda architecture

Definition from Wikipedia:

Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation.

Lambda architecture consists of three main components:

  1. batch layer

  2. speed layer

  3. serving layer

Batch layer

The batch layer provides an append-only store of raw data and precomputed batch views.

The batch layer looks at all the data and serves as the single version of the truth by processing all available data when generating views.

It serves as a correction of the speed layer because it can aggregate large volumes of historical data and, unlike streaming systems, provides full fault tolerance.

Speed layer

The speed layer processes data streams in real time to accommodate low-latency requests for data.

It is responsible for filling the gap between the latest batch view and real time.

By operating on smaller windows of data, it can provide fast, real-time views of the most recent data.

Serving layer

The serving layer merges the outputs of the batch and speed layers to provide the lowest-latency, highest-accuracy responses to users' queries.


 

Example of lambda architecture using Kafka, Spark Streaming, S3, Redshift

You can observe the three main layers in the following example of the architecture:

[Figure: Lambda_architecture-2.png – the three layers implemented with Kafka, Spark Structured Streaming, S3 and Redshift]

Data ingestion implementation

Streaming data is ingested using Apache Kafka.

Kafka provides immutable, persisted and replicated short-term storage of the data.

The data is then forwarded to various consumers who subscribe to relevant topics.

Speed layer implementation

Streaming data is consumed every couple of minutes using Spark Structured Streaming.

The data from the start of the day until the current time is aggregated every hour, and the results of the aggregation are exported (overwriting the previous results) into the Speed view table on Amazon Redshift (marked as the Speed table in the drawing).

Batch layer implementation

Streaming data is incrementally consumed every couple of hours using a Kafka Consumer.

The data is then uploaded to S3 object storage, which serves as cheap long-term storage.

From S3, the data is also imported directly into the Redshift “batch_raw” table.

This raw table is then aggregated to produce the Batch view, which has the same definition as the Speed view.

Compared to the Speed view, the Batch view is produced once a day based on all historical data.

Serving layer implementation

The Batch view contains the aggregations of all historical data except the data for today, and the Speed view contains only the aggregation of today's data.

The serving layer combines the aggregations found in the Batch and Speed views to enable the consumption of the most recent intra-day data as well as all historical data.

Since both views share the same definition, they can easily be merged using SQL commands on Amazon Redshift.

 

You can find the code from this blog post in this github repository.


Window functions in PostgreSQL

1. Setting up PostgreSQL on macOS

Install postgresql:

brew install postgresql 

Start postgres:

brew services start postgresql

Log in to the postgres shell to create a user:

/usr/local/bin/psql -d postgres

Create the user:

CREATE USER "user" PASSWORD 'password';

If you use a SQL client (I recommend DBeaver), you can now easily connect to the database.

2. Preparing our dataset (DDL, DML)

Defining our table:

create table topup_data (
    user_id int not null, 
    date date not null, 
    top_up_value int not null default '0'  
)

Populating the table:

insert into topup_data values 
(1,'2017-06-20',15),
(1,'2017-05-22',10),
(1,'2017-04-18',20),
(1,'2017-03-20',20),
(1,'2017-02-20',15),
(2,'2017-06-20',5),
(2,'2017-06-05',10),
(2,'2017-04-22',10),
(2,'2017-03-30',10),
(2,'2017-03-15',15),
(2,'2017-02-10',10)

3. Basic window function queries

Task 1

Using window function row_number() we can find the 5 most recent top-ups per user:

with temp as (
select 
    user_id, 
    date,
    top_up_value, 
    row_number() over (partition by user_id order by date DESC) as row_n
from 
    topup_data
)
select 
    *
from 
    temp
where 
    row_n <= 5

Task 2

Using the window functions rank() and dense_rank(), we can also find the 5 largest top-ups per user:

with temp as (
    select 
        user_id, 
        date,
        top_up_value, 
        rank() over (partition by user_id order by top_up_value DESC) as row_n
    from 
        topup_data
)
select 
    *
from 
    temp
where 
    row_n <= 5

Compared to row_number(), the rank() function assigns the same rank to identical values and may skip rank values after a tie, e.g. 1 -> 2 -> 2 -> 4 -> 5.

If we use dense_rank() instead of rank(), no rank values are skipped after ties, e.g. 1 -> 2 -> 2 -> 3 -> 4.

Task 3

We can also do inter-row calculations to enrich the original dataset with the extra columns “previous_top_up_date” and “days_since_previous_top_up”. We will use the lead() and lag() window functions for this task.

We could explicitly define a new table schema first (as we did in section 2), but we can also create a new table directly from the result of a query, as shown below:

with temp1 as (
    select 
        user_id, 
        date,
        top_up_value, 
        lead(date,1) over (partition by user_id order by date desc) as previous_top_up_date
    from 
        topup_data
), 
temp2 as (
    select 
        * , 
        date - previous_top_up_date as days_since_previous_top_up
    from 
        temp1
)
select 
    *
into 
    enriched_topup_data
from 
    temp2

Please note that we could have also used the lag() window function in the last query: lag(date, 1) over (partition by user_id order by date) returns the same previous_top_up_date, since lag() with an ascending sort is equivalent to lead() with a descending sort.

Conclusion

Window functions are the bread and butter of analytical queries, since they express complex calculations with very simple syntax.

 

 

 


T-SQL Window functions syntax

Window functions are an advanced and powerful feature of the T-SQL language. The examples below use the AdventureWorks2014 OLTP database.

Here I will give some notes on how to use them:

1. They can only be specified in the SELECT and ORDER BY clauses of a SQL statement

This has important implications since we know that the logical processing of a query is in the following order:

FROM -> WHERE -> GROUP BY -> HAVING -> SELECT -> ORDER BY

Thus the input of the window function is the result set after applying the FROM, WHERE, GROUP BY and HAVING transformations.

This means that if the query uses a WHERE clause or a GROUP BY, the window function gets a different input than what is in the source table.

 Example:


SELECT TOP 5 AddressLine1, ROW_NUMBER() OVER (ORDER BY AddressLine1) as [row_number]
FROM [AdventureWorks2014].[Person].[Address]

While a similar query with where clause gives completely different result.

SELECT TOP 5 AddressLine1, ROW_NUMBER() OVER (ORDER BY AddressLine1) as [row_number]
FROM [AdventureWorks2014].[Person].[Address]
WHERE City LIKE 'New York'

2. They consist of 2 main parts

<window function> OVER ( <window definition>) AS alias

  1. Window function
    • There are 3 types of window functions
      1. Ranking
        • ROW_NUMBER()
        • RANK()
        • DENSE_RANK()
        • NTILE()
      2. Offset
        • LEAD(<col>)
        • LAG(<col>)
        • FIRST_VALUE(<col>)
        • LAST_VALUE(<col>)
      3. Aggregate window functions
        • SUM(<col>)
        • AVG(<col>)
        • MAX(<col>)
  2. Window on which the function operates
    • specified in the OVER clause
    • defined in more detail below

3. The OVER clause has 3 parts

Before reading anything below, you must understand that the window function operates on the Result Set of the underlying query.

  1. Partitioning
    • PARTITION BY clause
    • restricts the window on which the window function operates
    • only rows in the same partition as the current row are visible to the function (similar in effect to a per-row filter)
    • defined on a column of the underlying result set
  2. Ordering
    • ORDER BY clause
    • defines ordering in the window
    • defined on a column of the underlying result set
  3. Framing
    • ROWS BETWEEN <start delimiter> AND <end delimiter> clause
    • defines how many rows around the CURRENT ROW will be in the window
    • example delimiters:
      • UNBOUNDED PRECEDING
      • CURRENT ROW
      • UNBOUNDED FOLLOWING

An example of a window function utilizing most of the options is:

 Example:

This code calculates, for each item (SalesOrderDetailID) in an order (SalesOrderID), the maximum line total among the items up to and including the current one.

Eg. I bought 3 items in one order

  • 1 Fridge for 1000$
  • 2 TV for 2000$
  • 3 Phone for 500$

The result would be

  • 1 1000$
  • 2 2000$
  • 3 2000$

Code:

SELECT
  SalesOrderID,
  SalesOrderDetailID,
  MAX(LineTotal) OVER (
      PARTITION BY SalesOrderID
      ORDER BY SalesOrderDetailID
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
   AS max_line_total_so_far
FROM
    [AdventureWorks2014].[Sales].[SalesOrderDetail]

 

 
