Implementing the Serving Layer of Lambda Architecture using Redshift

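This post is part of a series on Lambda Architecture consisting of:

  • Introduction to Lambda Architecture
  • Ingesting realtime tweets using Apache Kafka, Tweepy and Python
  • Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
  • Implementing the Speed Layer of Lambda Architecture using Structured Spark Streaming
  • Implementing the Serving Layer of Lambda Architecture using Redshift
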
Purpose in Lambda Architecture:

  • merge the output of the speed layer and batch layer aggregations into a single queryable table
  • achieve this by:
    • re-running the computation every couple of hours
    • using the output of the batch layer as the base table
    • upserting the up-to-date values from the speed layer into the base table
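The merge rule above can be illustrated in plain Python. This sketch assumes each layer fits in a dictionary keyed by location. That in-memory representation is hypothetical, since the real layers live in Redshift tables. `merge_layers` is likewise a hypothetical helper. The column names are borrowed from the batch layer table used later in this post.

```python
from typing import Dict

# Aggregates for one location: column name -> running total
Aggregates = Dict[str, int]


def merge_layers(batch: Dict[str, Aggregates],
                 speed: Dict[str, Aggregates]) -> Dict[str, Aggregates]:
    """Hypothetical helper: use the batch view as the base and upsert the speed view."""
    # Copy the batch layer so the caller's data is not mutated (the "base table")
    serving = {loc: dict(aggs) for loc, aggs in batch.items()}
    for loc, aggs in speed.items():
        if loc in serving:
            # Update: the location is already known, so add the recent counts
            for col, value in aggs.items():
                serving[loc][col] = serving[loc].get(col, 0) + value
        else:
            # Insert: the location has only been seen by the speed layer so far
            serving[loc] = dict(aggs)
    return serving


batch = {"Berlin": {"count_id": 10, "sum_retweet_count": 4}}
speed = {"Berlin": {"count_id": 2, "sum_retweet_count": 1},
         "Paris": {"count_id": 3, "sum_retweet_count": 0}}
print(merge_layers(batch, speed))
# {'Berlin': {'count_id': 12, 'sum_retweet_count': 5}, 'Paris': {'count_id': 3, 'sum_retweet_count': 0}}
```

A location present in both layers has its counts summed. A location that only the speed layer knows about is inserted as-is. This mirrors the UPDATE and INSERT steps of the SQL used for the serving layer.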

Requirements

import psycopg2

Creating the serving layer

  • authenticate and create a connection using the psycopg2 module
  • create and populate a temporary table, using the batch layer as its base and upserting the speed layer into it
  • drop the current serving layer and rename the temporary table to take its place; both steps are committed in the same transaction as the rebuild, so the swap causes no noticeable downtime
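To experiment with these three steps without a Redshift cluster, the same build, upsert and swap sequence can be tried locally. This sketch uses Python's built-in sqlite3 module as a stand-in for Redshift, which is an assumption rather than part of the original setup. Because SQLite is not Redshift, the UPDATE uses correlated subqueries instead of Redshift's UPDATE ... FROM. The toy tables are also simplified: they carry only two aggregate columns, and the speed layer's columns use the same names as the batch layer's. The Redshift implementation that follows performs the same sequence through psycopg2.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Toy layers with a simplified, assumed schema (two aggregate columns only)
cur.executescript("""
    CREATE TABLE batch_layer (location TEXT, count_id INTEGER, sum_retweet_count INTEGER);
    CREATE TABLE speed_layer (location TEXT, count_id INTEGER, sum_retweet_count INTEGER);
    INSERT INTO batch_layer VALUES ('Berlin', 10, 4), ('London', 7, 2);
    INSERT INTO speed_layer VALUES ('Berlin', 2, 1), ('Paris', 3, 0);
    -- An older serving layer that the rebuild will replace
    CREATE TABLE serving_layer AS SELECT * FROM batch_layer;
""")

# Rebuild and swap inside one explicit transaction
cur.executescript("""
    BEGIN;

    -- 1. Fresh copy of the batch layer as the base table
    DROP TABLE IF EXISTS serving_layer_temp;
    CREATE TABLE serving_layer_temp AS SELECT * FROM batch_layer;

    -- 2. Add speed-layer counts to locations known to both layers
    UPDATE serving_layer_temp
    SET count_id = count_id + (SELECT s.count_id FROM speed_layer s
                               WHERE s.location = serving_layer_temp.location),
        sum_retweet_count = sum_retweet_count + (SELECT s.sum_retweet_count FROM speed_layer s
                                                 WHERE s.location = serving_layer_temp.location)
    WHERE location IN (SELECT location FROM speed_layer);

    -- 3. Insert locations that only the speed layer has seen
    INSERT INTO serving_layer_temp
    SELECT * FROM speed_layer s
    WHERE NOT EXISTS (SELECT 1 FROM serving_layer_temp t WHERE t.location = s.location);

    -- 4. Swap the rebuilt table in for the old serving layer
    DROP TABLE IF EXISTS serving_layer;
    ALTER TABLE serving_layer_temp RENAME TO serving_layer;

    COMMIT;
""")

rows = cur.execute("SELECT * FROM serving_layer ORDER BY location").fetchall()
print(rows)  # [('Berlin', 12, 5), ('London', 7, 2), ('Paris', 3, 0)]
```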
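config = {'dbname': 'lambda',
          'user': 'x',
          'pwd': 'x',
          'host': 'data-warehouse.x.us-east-1.redshift.amazonaws.com',
          'port': '5439'
          }
conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                        port=config['port'], user=config['user'],
                        password=config['pwd'])

curs = conn.cursor()
try:
    # psycopg2 opens a transaction implicitly, so none of the statements below
    # becomes visible to other sessions until conn.commit()
    curs.execute("""
        -- 1. Start from a fresh copy of the batch layer
        DROP TABLE IF EXISTS serving_layer_temp;

        SELECT
            *
        INTO
            serving_layer_temp
        FROM
            batch_layer;

        -- 2. Add the speed layer's aggregates to locations the batch layer already has
        UPDATE
            serving_layer_temp
        SET
            count_id = count_id + speed_layer."count(id)",
            sum_followers_count = sum_followers_count + speed_layer."sum(followers_count)",
            sum_favorite_count = sum_favorite_count + speed_layer."sum(favorite_count)",
            sum_retweet_count = sum_retweet_count + speed_layer."sum(retweet_count)"
        FROM
            speed_layer
        WHERE
            serving_layer_temp.location = speed_layer.location;

        -- 3. Insert locations seen only by the speed layer.
        --    SELECT * relies on speed_layer having the same column order as batch_layer.
        --    NOT EXISTS (unlike NOT IN) still inserts rows when serving_layer_temp has a NULL location.
        INSERT INTO
            serving_layer_temp
        SELECT
            *
        FROM
            speed_layer
        WHERE NOT EXISTS (
            SELECT
                1
            FROM
                serving_layer_temp
            WHERE
                serving_layer_temp.location = speed_layer.location
        );

        -- 4. Swap the rebuilt table in (IF EXISTS lets the very first run succeed)
        DROP TABLE IF EXISTS serving_layer;

        ALTER TABLE serving_layer_temp
        RENAME TO serving_layer;
    """)
    conn.commit()
except Exception:
    # Undo the partial rebuild so the previous serving layer stays in place
    conn.rollback()
    raise
finally:
    curs.close()
    conn.close()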
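The step that inserts locations seen only by the speed layer is an anti-join. It can be written with either NOT IN or NOT EXISTS, and the two differ when the location column contains NULLs, which is plausible for tweet locations. This toy example uses sqlite3 as a stand-in for Redshift with made-up rows. It shows that a single NULL location in the base table makes NOT IN return no rows at all, because `'Paris' NOT IN ('Berlin', NULL)` evaluates to NULL rather than true.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
    CREATE TABLE base (location TEXT);
    CREATE TABLE speed (location TEXT);
    INSERT INTO base VALUES ('Berlin'), (NULL);   -- e.g. a tweet without a location
    INSERT INTO speed VALUES ('Berlin'), ('Paris');
""")

# NOT IN: the NULL in the subquery makes the predicate NULL for 'Paris', so nothing qualifies
not_in = cur.execute(
    "SELECT location FROM speed WHERE location NOT IN (SELECT location FROM base)"
).fetchall()

# NOT EXISTS: only checks whether a matching row exists, so 'Paris' is returned
not_exists = cur.execute(
    "SELECT location FROM speed s WHERE NOT EXISTS "
    "(SELECT 1 FROM base b WHERE b.location = s.location)"
).fetchall()

print(not_in)      # []
print(not_exists)  # [('Paris',)]
```

Of the two, NOT EXISTS is the safer default for this anti-join, because NULLs in the base table cannot silently suppress the insert.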

 

You can find the code from this blog post in this GitHub repository.

This entry was posted in Big Data, Data Engineering.
