
Reading Parquet files with AWS Lambda

I had a use case: read a few columns from a Parquet file stored in S3 and write them to a DynamoDB table every time a file was uploaded. I wanted to use AWS Lambda and was looking at options for reading Parquet files inside a Lambda function when I stumbled upon AWS Data Wrangler.
From the documentation:

What is AWS Data Wrangler?
An open-source Python package that extends the power of Pandas library to AWS connecting DataFrames and AWS data related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, etc).
Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.

Quick demo of use-case
Source: Data stored in a Parquet file on S3

+------+----------+-------------------+-----+----------+-------------+
|change|  comments|       last_updated|price|    sector|ticker_symbol|
+------+----------+-------------------+-----+----------+-------------+
|     1|VVS Ticker|2020-04-01 18:15:05|   14|    ENERGY|          VVS|
|     0|VVY Ticker|2020-04-02 18:15:05|   12|HEALTHCARE|          VVY|
|     9|TGT Ticker|2020-04-03 18:15:05|   58|    RETAIL|          TGT|
|    11|VVY Ticker|2020-04-04 18:15:05|   48|HEALTHCARE|          VVY|
|     3|WSB Ticker|2020-04-05 18:15:05|  147| FINANCIAL|          WSB|
|     7|KFU Ticker|2020-04-06 18:15:05|   33|    ENERGY|          KFU|
|     6|VVS Ticker|2020-04-07 18:15:05|   93|    ENERGY|          VVS|
|    91|SLW Ticker|2020-04-08 18:15:05|   35|    ENERGY|          SLW|
|    78|ALY Ticker|2020-04-09 18:15:05|    7|    ENERGY|          ALY|
+------+----------+-------------------+-----+----------+-------------+

Destination: Records stored in a DynamoDB table. Each record consists of the “ticker_symbol”, “sector” and “last_updated” (converted to epoch time) values from the Parquet file, plus two extra attributes: “location”, the S3 path of the file containing the record, and “source”, with a default value of “system”.
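To make the mapping concrete, here is a minimal sketch that builds one such item from the first source row using only the standard library. The S3 path passed as `location` is a hypothetical placeholder, the helper name `to_item` is made up for illustration, and the timestamp is assumed to be UTC, which the source data does not state.

```python
from datetime import datetime, timezone


def to_item(row, s3_path, source="system"):
    """Map one source row to the DynamoDB item shape described above."""
    # Assumption: last_updated is a naive timestamp interpreted as UTC.
    updated = datetime.strptime(row["last_updated"], "%Y-%m-%d %H:%M:%S")
    epoch = int(updated.replace(tzinfo=timezone.utc).timestamp())
    return {
        "ticker_symbol": row["ticker_symbol"],
        "sector": row["sector"],
        "last_updated": epoch,
        "location": s3_path,
        "source": source,
    }


row = {
    "ticker_symbol": "VVS",
    "sector": "ENERGY",
    "last_updated": "2020-04-01 18:15:05",
}
# Hypothetical bucket and key, for illustration only
item = to_item(row, "s3://example-bucket/stocks/data.parquet")
```

The "change", "comments" and "price" columns are simply never read, which is why the function further down only requests three columns.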

1. Created a Lambda function and attached an IAM role with permissions to read from S3 and put items into DynamoDB, plus the AWSLambdaBasicExecutionRole policy.
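As a rough sketch of what "proper permissions" could look like, the snippet below builds an inline policy document as a Python dict. The bucket name, table ARN and the helper name `build_policy` are hypothetical, and the `s3:ListBucket` statement is an assumption on my part: it is included because the function reads the path as a dataset, which may involve listing keys.

```python
import json


def build_policy(bucket, table_arn):
    """Return a narrowly scoped policy document as a dict (illustrative only)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Read the uploaded Parquet objects
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                # Assumption: listing may be needed when reading a prefix
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Write items to the destination table
                "Effect": "Allow",
                "Action": ["dynamodb:PutItem"],
                "Resource": table_arn,
            },
        ],
    }


# Hypothetical names, replace with your own bucket and table ARN
policy = build_policy(
    "example-bucket",
    "arn:aws:dynamodb:us-east-1:123456789012:table/example-table",
)
policy_json = json.dumps(policy, indent=2)
```

The AWSLambdaBasicExecutionRole managed policy is attached separately and covers writing logs to CloudWatch.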

2. Created a Lambda layer for AWS Data Wrangler

    • Go to the project's GitHub releases section and download the layer zip for the desired version. In my case, I downloaded awswrangler-layer-1.0.1-py3.8.zip.
    • In the AWS Lambda console, open the Layers section (left side) and click Create layer.
    • Set the name and Python version, upload the freshly downloaded zip file and press Create to create the layer.
    • Go to your Lambda function and select your new layer!
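The console steps above can also be scripted. The following is only a sketch, not what I did for this post: it assumes the layer zip has first been copied to an S3 bucket of your own, and the function name `attach_layer` and all argument values are hypothetical. It takes a boto3 Lambda client as a parameter and uses its `publish_layer_version` and `update_function_configuration` calls.

```python
def attach_layer(lambda_client, function_name, layer_name, bucket, key,
                 runtime="python3.8"):
    """Publish a layer version from a zip in S3 and attach it to a function."""
    published = lambda_client.publish_layer_version(
        LayerName=layer_name,
        Content={"S3Bucket": bucket, "S3Key": key},
        CompatibleRuntimes=[runtime],
    )
    layer_arn = published["LayerVersionArn"]
    # Note: Layers replaces the function's entire layer list,
    # so include any existing layers you want to keep.
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Layers=[layer_arn],
    )
    return layer_arn


# Example call (hypothetical names, requires boto3 and AWS credentials):
# import boto3
# attach_layer(boto3.client("lambda"), "parquet-to-ddb", "awswrangler",
#              "example-bucket", "layers/awswrangler-layer-1.0.1-py3.8.zip")
```

Passing the client in, rather than creating it inside the function, keeps the helper easy to exercise with a stub before pointing it at a real account.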

3. Created the function code, with a few highlights

    • Read the specified columns of the Parquet file into a pandas DataFrame.
    • Convert the DataFrame column of Timestamp datatype to epoch time (a number) so the record can be stored in DynamoDB.
    • Convert the final DataFrame to a list of dictionaries, one per record.
    • Write the records to DynamoDB.

import os
import logging
import urllib.parse

import boto3
import awswrangler as wr
import pandas as pd

logger = logging.getLogger(__name__)


def s3_url(bucket, prefix, folder=False):
    """Produce an S3 URL from 'bucket' and 'prefix'."""
    assert bucket and prefix
    return "s3://{bucket}/{prefix}{folder}".format(
        bucket=bucket,
        prefix=prefix,
        folder="/" if folder else "")


def read_parquet(s3_path, columns, source, dataset=True):
    """Read Apache Parquet file(s) from a received S3
    prefix or list of S3 object paths.
    Convert the last_updated column from Timestamp to epoch time.
    Return records as a list of dictionaries.
    """
    assert source, "source can't be None"
    df = wr.s3.read_parquet(path=s3_path, columns=columns, dataset=dataset)
    # Whole seconds since 1970-01-01, stored as an integer
    df["last_updated"] = (df["last_updated"] - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s")
    df["source"] = source
    df["location"] = s3_path
    return df.to_dict(orient="records")


def write_to_ddb(client, table_name, records):
    """Write data to a DynamoDB table.
    Returns the inserted row count.
    """
    table = client.Table(table_name)
    row_count = 0
    for row in records:
        table.put_item(Item=row)
        row_count += 1
    return row_count


def lambda_handler(event, context):
    # Only the first record of the event is processed
    record = event["Records"][0]
    assert "eventSource" in record
    assert "eventName" in record
    assert record["eventSource"] == "aws:s3", "Unknown event source: %s." % record["eventSource"]
    assert record["eventName"].startswith(
        "ObjectCreated:"), "Unsupported event name: %s." % record["eventName"]
    assert "s3" in record
    assert "bucket" in record["s3"]
    assert "name" in record["s3"]["bucket"]
    assert "object" in record["s3"]
    assert "key" in record["s3"]["object"]
    bucket = record["s3"]["bucket"]["name"]
    # Object keys in S3 events are URL-encoded: a special character such as '='
    # arrives as %3D and a space arrives as '+'. Decode them back to the original
    # characters, or the read will complain that the path does not exist.
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
    # Create the S3 path
    s3_path = s3_url(bucket=bucket, prefix=key)
    # Retrieve the data directly from Amazon S3
    cols = ["ticker_symbol", "sector", "last_updated"]
    records = read_parquet(s3_path=s3_path,
                           columns=cols,
                           source="system",
                           dataset=True)
    # Instantiate the DynamoDB connection
    ddb = boto3.resource("dynamodb")
    # Get the DynamoDB table name
    ddb_table = os.environ["DDB_TABLE"]
    # Write data to DynamoDB
    record_count = write_to_ddb(
        client=ddb, table_name=ddb_table, records=records)
    # Return the total number of records inserted
    return "Total records inserted:", record_count
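
The write_to_ddb function above issues one put_item request per row. For larger files, one possible variant (a sketch, not the code I deployed) is to go through the `batch_writer()` context manager that boto3 Table resources provide, which buffers the puts and sends them in batches. The function name `write_batch_to_ddb` is hypothetical, and it expects a Table resource rather than a table name.

```python
def write_batch_to_ddb(table, records):
    """Write records through the table's batch writer and return the row count."""
    row_count = 0
    # The batch writer buffers put requests and flushes them in batches,
    # sending whatever is left when the with-block exits.
    with table.batch_writer() as batch:
        for row in records:
            batch.put_item(Item=row)
            row_count += 1
    return row_count


# Example call (requires boto3 and AWS credentials):
# import boto3
# table = boto3.resource("dynamodb").Table("example-table")  # hypothetical name
# write_batch_to_ddb(table, records)
```

Because the writer only needs `batch_writer()` and `put_item(Item=...)`, it can be tried locally against a small stand-in object before running it inside Lambda.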

4. Added an S3 event trigger of type ObjectCreated. Every time a new file is uploaded to S3, the trigger fires and invokes the Lambda function, which reads the Parquet file and writes the data to the DynamoDB table.
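For reference, here is a trimmed sketch of the kind of payload the trigger delivers and how the object path can be derived from it. The bucket name and key are made up, real events carry many more fields, and the helper name `object_path` is hypothetical. The key is deliberately shown in its URL-encoded form.

```python
from urllib.parse import unquote_plus

# Trimmed, illustrative S3 notification payload (hypothetical bucket and key)
sample_event = {
    "Records": [{
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-bucket"},
            "object": {"key": "stocks/dt%3D2020-04-01/part+0.parquet"},
        },
    }]
}


def object_path(event):
    """Return the s3:// path of the first record in an S3 event."""
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    # Keys arrive URL-encoded: '=' as %3D and spaces as '+'
    key = unquote_plus(record["s3"]["object"]["key"])
    return f"s3://{bucket}/{key}"


path = object_path(sample_event)
```

Passing a payload like `sample_event` as a test event in the Lambda console is a convenient way to exercise the handler without uploading a file each time, provided the key points at an object that actually exists.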
Hope this helps!