
AWS Glue – Querying Nested JSON with Relationalize Transform

AWS Glue has a transform called Relationalize that can flatten nested JSON into columns, which you can then write to S3 or import into relational databases. As an example:
Initial Schema:

>>> df.printSchema()
root
|-- Id: string (nullable = true)
|-- LastUpdated: long (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- Properties: struct (nullable = true)
|    |-- choices: string (nullable = true)
|    |-- database: string (nullable = true)
|    |-- object: string (nullable = true)
|    |-- store_time: string (nullable = true)

After Relationalize transformation:

>>> df.printSchema()
root
|-- Id: string (nullable = true)
|-- LastUpdated: long (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- Properties.choices: string (nullable = true)
|-- Properties.database: string (nullable = true)
|-- Properties.object: string (nullable = true)
|-- Properties.store_time: string (nullable = true)
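
For reference, the flattened schema above comes from a call like the following (a minimal sketch; nested_dyf and the S3 staging path are illustrative names, not from the original job):

# Minimal sketch: Relationalize flattens the nested struct into columns.
dfc = nested_dyf.relationalize("root", "s3://my-bucket/temp/")
flat_dyf = dfc.select("root")   # "root" holds the flattened top-level table
flat_dyf.toDF().printSchema()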

What if “Properties” were of “string” type? Would Relationalize work in that case? The answer is no.
In this blog I will walk you through how you can use Relationalize when the column is of “string” type.
I have an AWS Glue job that pulls data from a DynamoDB table in another account. Let's look at one of the records from the table:

{'LastUpdatedBy': 'System',
'FixedProperties': '{"choices":"one",
                    "timetstamp":"1577194786280",
                    "Id":"bce07b2a8ef5",
                    "score":null}',
'LastUpdated': Decimal('1578964270325'),
'StatementId': 'c24ad711696e'
}

The “FixedProperties” key is a string containing JSON records. Now let's look at the steps to convert it to a struct type.
1. Create an AWS Glue DynamicFrame.

dynamic_dframe = glueContext.create_dynamic_frame.from_rdd(
    spark.sparkContext.parallelize(table_items), 'table_items')
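
For context, the snippets in this post assume the usual Glue job boilerplate below (a sketch; the cross-account DynamoDB scan that produces table_items is omitted):

# Boilerplate assumed by the snippets in this post (a sketch).
# table_items is the list of records scanned from the cross-account
# DynamoDB table; the scan/assume-role code is not shown here.
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import Unbox, DropFields, ApplyMapping
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session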

2. Describe the Glue DynamicFrame schema.

dynamic_dframe.printSchema()
root
|-- FixedProperties: string
|-- LastUpdated: decimal
|-- LastUpdatedBy: string
|-- StatementId: string

As you would have noticed, the Glue DynamicFrame reads “FixedProperties” as a string because its value is a JSON document stored as a string rather than a nested object. So how do we convert it to a struct type?
One way is to use the PySpark function from_json. To use this you will first need to convert the Glue DynamicFrame to an Apache Spark DataFrame using .toDF().
If the schema is the same for all records, you can convert the column to a struct type by defining the schema like this:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

df = dynamic_dframe.toDF()
schema = StructType([StructField("choices", StringType(), True),
                     StructField("Id", StringType(), True),
                     StructField("score", StringType(), True),
                     StructField("timestamp", StringType(), True)])
df.withColumn("FixedProperties", from_json(
    col("FixedProperties"), schema)).show(truncate=False)
#+-----------------+-------------+-------------+------------------------------------+
#|StatementId      |LastUpdated  |LastUpdatedBy|FixedProperties                     |
#+-----------------+-------------+-------------+------------------------------------+
#|c24ad711696e     |1578964270325|System       |[one, bce07b2a8ef5, , 1577194786280]|
#+-----------------+-------------+-------------+------------------------------------+
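
If you would rather not hand-write the schema, Spark 2.4+ can infer it from a sample record with schema_of_json (a sketch; it assumes every record shares the sample's schema):

# Infer the struct schema from one sample record instead of defining it
# by hand (assumes Spark 2.4+ and a uniform schema across records).
from pyspark.sql.functions import schema_of_json, lit, from_json, col

sample = df.select("FixedProperties").first()[0]          # one raw JSON string
ddl = df.select(schema_of_json(lit(sample))).first()[0]   # DDL schema string
df.withColumn("FixedProperties", from_json(col("FixedProperties"), ddl)).printSchema()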

The other way, which I would say is the simpler way, is to use the AWS Glue Unbox transform. It unboxes a string field of a DynamicFrame into its parsed structure.

unbox = Unbox.apply(frame = dynamic_dframe, path = "FixedProperties", format="json")
unbox.printSchema()
root
|-- FixedProperties: struct
|    |-- choices: string
|    |-- Id: string
|    |-- timestamp: string
|    |-- score: null
|-- LastUpdated: decimal
|-- LastUpdatedBy: string
|-- StatementId: string

Isn’t that simple 🙂
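
To spot-check the unboxed struct on the Spark side, you can expand its fields (a quick sketch):

# Expand the unboxed struct's fields for a quick visual check.
unbox.toDF().select("FixedProperties.*").show(truncate=False)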
3. “LastUpdated” contains an epoch time in milliseconds, so let's convert it to a timestamp.

from pyspark.sql import functions as f, types as t

spark_df = unbox.toDF()
spark_df_transformed = (
    spark_df
    .withColumn("LastUpdated", f.from_unixtime(f.col("LastUpdated")/1000).cast(t.TimestampType()))
    .withColumn("Stored_timestamp", f.from_unixtime(f.col("FixedProperties.timestamp")/1000).cast(t.TimestampType()))
)

4. Review the Spark dataframe schema.

spark_df_transformed.printSchema()
root
 |-- FixedProperties: struct (nullable = true)
 |    |-- choices: string (nullable = true)
 |    |-- Id: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- score: null (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)
 |-- LastUpdatedBy: string (nullable = true)
 |-- StatementId: string (nullable = true)
 |-- Stored_timestamp: timestamp (nullable = true)

5. Convert the Apache Spark DataFrame back to a Glue DynamicFrame and drop FixedProperties.timestamp, which we no longer need (there may be a better way of doing this).

dy_df = DynamicFrame.fromDF(spark_df_transformed, glueContext, "dy_df")
dropped_fpt = DropFields.apply(frame=dy_df, paths=["FixedProperties.timestamp"], transformation_ctx="dropped_fpt")
dropped_fpt.printSchema()
root
 |-- FixedProperties: struct (nullable = true)
 |    |-- choices: string (nullable = true)
 |    |-- Id: string (nullable = true)
 |    |-- score: null (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)
 |-- LastUpdatedBy: string (nullable = true)
 |-- StatementId: string (nullable = true)
 |-- Stored_timestamp: timestamp (nullable = true)

Drum roll, please……
6. Finally, time to use relationalize to un-nest the struct type.

tempDir = "s3://content-demo/temp/"
dfc = dropped_fpt.relationalize("root", tempDir)
dyf_out = dfc.select("root")
dyf_out.printSchema()
root
|-- FixedProperties.choices: string
|-- FixedProperties.Id: string
|-- FixedProperties.score: null
|-- LastUpdated: timestamp
|-- LastUpdatedBy: string
|-- StatementId: string
|-- Stored_timestamp: timestamp
dyf_out.toDF().show()

7. Apply mapping using the ApplyMapping class.

# Rename and cast columns with ApplyMapping
final_data = ApplyMapping.apply(
    frame=dyf_out,
    mappings=[
        ("`FixedProperties.choices`", "string", "choices", "string"),
        ("`FixedProperties.Id`", "string", "id", "string"),
        ("`FixedProperties.score`", "string", "score", "string"),
        ("Stored_timestamp", "timestamp", "stored_time", "timestamp"),
        ("LastUpdated", "timestamp", "last_updated", "timestamp"),
        ("LastUpdatedBy", "string", "last_updated_by", "string"),
        ("StatementId", "string", "statement_id", "string")],
    transformation_ctx="final_data")

8. Write the DynamicFrame to S3.

from datetime import datetime

S3_PARQUET = "s3://abc-content-dev/a1/db/dt={}".format(datetime.utcnow().strftime("%Y-%m-%d-%H-%M"))
# Write it out in Parquet
glueContext.write_dynamic_frame.from_options(
    frame=final_data,
    connection_type="s3",
    connection_options={"path": S3_PARQUET},
    format="parquet")

Once the Parquet files are written to S3, you can use an AWS Glue crawler to populate the Glue Data Catalog with the table and query the data from Athena.
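
If you prefer to script that last step too, a crawler can be created and started with boto3 (a minimal sketch; the crawler name, IAM role, and database are illustrative):

# Create and run a Glue crawler over the Parquet prefix so the table
# appears in the Data Catalog (names below are illustrative).
import boto3

glue = boto3.client("glue")
glue.create_crawler(
    Name="content-demo-crawler",
    Role="AWSGlueServiceRole-demo",          # assumed IAM role
    DatabaseName="demo_db",
    Targets={"S3Targets": [{"Path": "s3://abc-content-dev/a1/db/"}]})
glue.start_crawler(Name="content-demo-crawler")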
