Streaming services are the new norm for data ingestion, so we use Apache Kafka to ingest data into S3 in the columnar Parquet format. The data is stored in S3, partitioned by year and month. On top of that, we have a schema crawler running that generates a schema automatically, which is accessible from AWS Athena.

Our use case for the DWH is to push our data into AWS Redshift, a columnar MPP database that we use for data warehousing. In order to generate a schema in AWS Redshift, it needs to know the metadata of the Parquet files. The good news is that we can retrieve the metadata of those files via AWS Athena and Redshift Spectrum. We can therefore create an external schema, as in the example below, and every table that Athena lists becomes accessible through that external schema in Redshift.

create external schema schema_name_of_your_choice
from data catalog
database 'your_external_schema'
iam_role 'arn:aws:iam::your_arn_number:role/your_redshift_role';
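
Once the external schema exists, a quick sanity check (a minimal sketch, reusing the placeholder schema name from above) is to list what Redshift now sees through Spectrum via the SVV_EXTERNAL_TABLES system view:

-- List the Athena tables that are now visible through the external schema.
-- 'schema_name_of_your_choice' is the placeholder from the statement above.
select schemaname, tablename, location
from svv_external_tables
where schemaname = 'schema_name_of_your_choice';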

Alright – so the first challenge is solved!

Now the next question is how to create tables so that we can use the Redshift COPY command to load data. The good news is that Redshift has its own internal metadata tables where table definitions are stored. We can therefore use the SVV_COLUMNS system view to get the position of each column and filter out columns that should not be part of the target table, for example a partition column. Some data types differ between Athena and Redshift – for example, struct and array are stored as the SUPER data type in Redshift. The query below returns data about a table plus a somewhat odd concatenation CASE statement, which results in many rows per table – but do not worry 🙂 it will make sense shortly.

SELECT
    TABLE_NAME,
    CASE
        WHEN ordinal_position = 1
        THEN 'create table ' || TABLE_NAME || ' ( '
        ELSE ','
    END AS comma,
    COLUMN_NAME,
    CASE
        WHEN data_type = 'string'
        THEN 'varchar' || '(' || character_maximum_length::varchar || ')'
        WHEN LEFT(data_type, 6) = 'struct'
        OR   LEFT(data_type, 5) = 'array'
        THEN 'super'
        ELSE data_type
    END AS data_type,
    ordinal_position
FROM
    SVV_COLUMNS
WHERE
    table_schema IN ('your_external_schema')
AND COLUMN_NAME NOT IN ('your_partition_name')
ORDER BY
    TABLE_NAME,
    ordinal_position

The query above produces an unusual structure with many rows per table, which on its own does not make much sense. Luckily, Redshift has a great function to help us, called LISTAGG. It takes multiple rows and concatenates them into a single value, and we can group by table name. Pffffff, finally we have one row per table, with its table definition generated dynamically. Now we have schemas generated automatically (YAY!! :-)).
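
As a minimal sketch of that idea (the source name per_column_rows is a placeholder; the full version follows below), LISTAGG collapses the per-column rows into one CREATE TABLE statement per table:

-- Sketch only: assume the per-column rows from the query above
-- have been materialised in a temp table called per_column_rows.
select
    table_name,
    listagg(comma || ' ' || column_name || ' ' || data_type, ' ')
        within group (order by ordinal_position) || ' ) ;' as create_statement
from per_column_rows
group by table_name;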

But how do we generate the COPY command dynamically as well, so that we can load data from S3 into Redshift? Well, it turns out that we can do this quite easily by adding one more column and making a few modifications.

DROP TABLE IF EXISTS table_definition_temp;

CREATE TEMP TABLE table_definition_temp AS
WITH
    temp_table AS
    (   SELECT
            TABLE_NAME,
            CASE
                WHEN ordinal_position = 1
                THEN 'create table ' || TABLE_NAME || ' ( '
                ELSE ','
            END AS comma,
            COLUMN_NAME,
            CASE
                WHEN data_type = 'string'
                THEN 'varchar' || '(' || character_maximum_length::varchar || ')'
                WHEN LEFT(data_type, 6) = 'struct'
                OR   LEFT(data_type, 5) = 'array'
                THEN 'super'
                ELSE data_type
            END AS data_type,
            ordinal_position
        FROM
            SVV_COLUMNS
        WHERE
            table_schema IN ('your_external_schema')
        AND COLUMN_NAME NOT IN ('_date')
    )
SELECT
    TABLE_NAME,
    (comma || ' ' || COLUMN_NAME || ' ' || data_type)::VARCHAR AS table_definition,
    ordinal_position,
    ltrim(TO_CHAR(GETDATE()::date - 1, 'YYYYMM'), '0') AS partition_year_month
FROM
    temp_table;

-- One row per table: its CREATE TABLE DDL plus a TRUNCATE/COPY script.
-- Doubled single quotes ('') become literal quotes in the generated COPY command.
SELECT
    TABLE_NAME,
    listagg(table_definition, ' ')
        WITHIN GROUP (ORDER BY ordinal_position) || ' ) ;' AS table_definition,
    'truncate your_schema_name.' || TABLE_NAME || ' ;
copy your_schema_name.' || TABLE_NAME || '
from ''s3://path/to/data/' || TABLE_NAME || '''
iam_role ''replace_with_real_iam_role''
FORMAT AS PARQUET SERIALIZETOJSON ;' AS copy_command
FROM
    table_definition_temp
GROUP BY
    TABLE_NAME;
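
To make the result concrete: for a hypothetical external table called orders with two columns, the generated row would look roughly like this (all names and paths are placeholders):

-- Hypothetical generated output for a table named "orders":
create table orders ( id bigint , payload super ) ;

truncate your_schema_name.orders ;
copy your_schema_name.orders
from 's3://path/to/data/orders'
iam_role 'replace_with_real_iam_role'
FORMAT AS PARQUET SERIALIZETOJSON ;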

Finally, we have every ingredient for our recipe! But how do we execute it? Well, there are many ways to do it. The approach we use is to run the query above from Apache Airflow and create one task per resulting row, so each table's load runs as an independent task.

Schema changes are always a topic for a DWH team. Those changes must be communicated in time by other teams and handled by the DWH team in a timely manner to avoid data load failures in the nightly ETL process. We are automating these issues away so that the DWH team can focus on more elaborate tasks and support the company in making data-driven decisions.

This process is the start of a bigger project, which will help Data Engineers do data-related work faster and better, and help Data Analysts provide quicker insights to the business.

Amol Jog

Aug 22, 2022 · 3 min read