Data Ingestion: Load Dynamic Files from S3 to Snowflake

Situation: A CSV file lands in AWS S3 every month. The vendor adds, removes, or modifies columns in the file as they please, so the schema is not known ahead of time. The requirement is to create a table on the fly in Snowflake and load the data into it. Matillion is our ELT tool.

This is what I have done so far.

  1. Set up a Lambda to detect the arrival of the file, convert it to JSON, upload it to another S3 directory, and add the filename to SQS (a minimal sketch of this handler follows the list).
  2. Matillion detects the SQS message and loads the JSON data into a VARIANT column in a Snowflake table.
  3. A Snowflake stored procedure takes the VARIANT column and generates a table based on the fields in the JSON data. The VARIANT column in Snowflake only works this way with JSON data; CSV is sadly not supported.
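Here is a minimal sketch of what that Lambda looks like, assuming an S3 ObjectCreated trigger (the bucket prefix, queue URL, and file names below are placeholders, not my real ones). Note that it buffers through /tmp, which is exactly what blows up on the full-size file:

```python
import csv
import json
import os
import urllib.parse

import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

QUEUE_URL = os.environ["QUEUE_URL"]   # placeholder env var
JSON_PREFIX = "converted/"            # placeholder target S3 prefix


def handler(event, context):
    # Triggered by the S3 ObjectCreated event for the vendor CSV.
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

    local_csv = "/tmp/input.csv"
    local_json = "/tmp/output.json"
    s3.download_file(bucket, key, local_csv)

    # Convert each CSV row to a JSON object (NDJSON), using the header row as keys.
    with open(local_csv, newline="") as src, open(local_json, "w") as dst:
        for row in csv.DictReader(src):
            dst.write(json.dumps(row) + "\n")

    json_key = JSON_PREFIX + os.path.basename(key).replace(".csv", ".json")
    s3.upload_file(local_json, bucket, json_key)

    # Tell Matillion (via SQS) which converted file to load into the VARIANT column.
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"bucket": bucket, "key": json_key}),
    )
```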

This works with 10,000 rows. The problem arises when I run it against a full file, which is over 1 GB and more than 10 million rows: the Lambda crashes at runtime with an out-of-disk-space error.

These are the alternatives I have thought of so far:

  1. Attach an EFS volume to the Lambda and use it to store the JSON file prior to uploading it to S3. JSON files are much larger than their CSV counterparts; since the file has over 10 million rows, I expect the JSON output to be around 10-20 GB.
  2. Matillion has an Excel Query component that can take the headers from an Excel file, create a table based on those fields, and load the file. I was thinking I could convert the header row of the CSV into an XLSX file within the Lambda, pass it over to Matillion, have it create the table structure, and then load the CSV file once the structure exists (see the sketch below). It's sad that Matillion supports this for XLSX but not CSV!
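Something like this is what I have in mind for option 2. It assumes the openpyxl library is packaged with the Lambda, reads just the first line of the CSV from S3, and writes a one-row workbook back (bucket and key names are placeholders):

```python
import csv

import boto3
from openpyxl import Workbook

s3 = boto3.client("s3")


def write_header_xlsx(bucket, csv_key, xlsx_key):
    # Stream only enough of the CSV to read its first (header) line.
    obj = s3.get_object(Bucket=bucket, Key=csv_key)
    header_line = next(obj["Body"].iter_lines()).decode("utf-8")
    header = next(csv.reader([header_line]))

    # Write a single-row workbook containing just the column names.
    wb = Workbook()
    wb.active.append(header)
    local_path = "/tmp/header.xlsx"
    wb.save(local_path)
    s3.upload_file(local_path, bucket, xlsx_key)
```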

 

What are my other options here? Considerations include a nice repeatable design pattern to use for future large CSVs or similar requirements, the cost of EFS, and whether I am making the best use of the tools available to me. Thanks!

Hi, I love your integration pattern - S3, Lambda, SQS, Matillion, Snowflake.

This is what I am doing with fixed-length files (if you ignore the delimiter for a moment, your whole record is one field, and Snowflake will be happy to put that into a VARCHAR() column):

1) Load the unparsed record into a single column in a Snowflake landing table.

2) The SPLIT(), SPLIT_PART(), and ARRAY_SIZE() functions can bring out essential values like the column header names, the number of columns, and the position of each column. You can pull that information using Matillion's 'Query to Scalar' component, then form the CREATE TABLE statement plus an INSERT INTO <new table> (SELECT <col1>, <col2>... FROM LANDING_TABLE) and load it into the final table (see the sketch after this list).

3) I am doing the same, but since it is a fixed-length file, I keep the mapping of field position + length in the file to table column name in a Snowflake table. I use Matillion to retrieve that information from Snowflake and orchestrate the execution; a single shared job handles multiple fixed-length file layouts.
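Here is a rough sketch of what step 2 boils down to, written with the Snowflake Python connector instead of Matillion's 'Query to Scalar' orchestration just to keep it self-contained. LANDING_TABLE, RAW_RECORD, and FINAL_TABLE are placeholder names, and it assumes the CSV header line is one of the records loaded into the landing table:

```python
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account", user="my_user", password="...",   # placeholder credentials
    warehouse="my_wh", database="my_db", schema="my_schema",
)
cur = conn.cursor()

# Pull the raw header record and derive the column list from it.
# (In practice you would tag the header row explicitly rather than rely on LIMIT 1.)
cur.execute("SELECT RAW_RECORD FROM LANDING_TABLE LIMIT 1")
header_line = cur.fetchone()[0]
columns = [c.strip().upper().replace(" ", "_") for c in header_line.split(",")]

# Create the target table on the fly, one VARCHAR column per header field.
col_defs = ", ".join(f'"{c}" VARCHAR' for c in columns)
cur.execute(f"CREATE OR REPLACE TABLE FINAL_TABLE ({col_defs})")

# SPLIT_PART() pulls each delimited field out of the raw record; skip the header row.
select_list = ", ".join(
    f"SPLIT_PART(RAW_RECORD, ',', {i + 1})" for i in range(len(columns))
)
cur.execute(
    f"INSERT INTO FINAL_TABLE SELECT {select_list} "
    "FROM LANDING_TABLE WHERE RAW_RECORD <> %s",
    (header_line,),
)
```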

 

P.S.: In my humble opinion, CSV-to-JSON conversion at this scale is a huge task and Lambda should not be used for it. I do something similar (EBCDIC-to-CSV conversion) for a few files, but I invoke an EC2 instance and Python code to do the conversion, and Matillion does the whole orchestration.
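If it helps, here is a hedged sketch of how that EC2-side conversion could look for your CSV-to-JSON case. It streams from S3 to S3 via the smart_open library (my assumption, not something you mentioned), so neither Lambda's runtime limits nor local disk size come into play; the bucket paths are placeholders:

```python
import csv
import json

from smart_open import open as s3_open  # pip install smart_open[s3]


def convert_csv_to_ndjson(src_uri, dst_uri):
    # Stream the CSV from S3 and write NDJSON straight back to S3,
    # one JSON object per line, using the header row as the keys.
    with s3_open(src_uri, "r") as src, s3_open(dst_uri, "w") as dst:
        for row in csv.DictReader(src):
            dst.write(json.dumps(row) + "\n")


if __name__ == "__main__":
    convert_csv_to_ndjson(
        "s3://my-bucket/incoming/vendor.csv",      # placeholder source path
        "s3://my-bucket/converted/vendor.json",    # placeholder destination path
    )
```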