Dynamically generate column list from incoming source file

Hello All,

 

Here is my scenario: we have a CSV input file that comes in four times a year, but each time the vendor may add or remove fields without informing us. We are expected to read that file and merge it with an existing dimension table.

 

Has anyone ever dealt with a similar situation?

 

My initial thinking is to use a Matillion component to read in just the first line, transpose the column names to rows, and load them into a table. Then match against the existing dimension table using a metadata query on the Snowflake table.

 

If col A is present in the file but not in Snowflake

Then issue an ALTER TABLE command to add the column. (How? I don't know yet.)

 

If col B is not in the file but is in Snowflake

Then do not do anything.

 

So, in theory we only need to worry about the ADDITION of new columns and how to best handle this.
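To make that concrete, here is roughly the comparison I have in mind, sketched as a Python step rather than a staging table (the connection details, table names, and column values are placeholders - I haven't built any of this yet):

import snowflake.connector

# Placeholder connection details - these would really come from Matillion or a secrets store
conn = snowflake.connector.connect(
    user='MY_USER', password='MY_PASSWORD', account='MY_ACCOUNT',
    warehouse='MY_WH', database='MY_DB', schema='MY_SCHEMA'
)
cur = conn.cursor()

# Columns currently on the dimension table, straight from Snowflake's metadata
cur.execute(
    "SELECT UPPER(column_name) FROM information_schema.columns "
    "WHERE table_schema = 'MY_SCHEMA' AND table_name = 'MY_DIM_TABLE'"
)
existing_cols = {row[0] for row in cur.fetchall()}

# Column names read from the first line of the incoming CSV (hypothetical values)
file_cols = ['CUSTOMER_ID', 'CUSTOMER_NAME', 'SOME_NEW_ATTRIBUTE']

# Only additions matter; columns in Snowflake but missing from the file are left alone
cols_to_add = [c for c in file_cols if c.upper() not in existing_cols]
print("Columns that would need an ALTER TABLE ... ADD COLUMN:", cols_to_add)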

 

Any thoughts appreciated!! TY!

We do this all the time, but with JSON data. The good part is that Snowflake is basically additive in nature. Handling files like CSV, JSON, XML, etc. is best done by playing to Snowflake's strengths. Meaning, your best option is to ingest the data in raw form into a variant column in Snowflake and then materialize and transform the data via views or transformation processes that move it into other tables. The benefit of bringing the data into Snowflake in its native form and then transforming it is that you always know what the content looked like before you started manipulating it. Your data lineage is much more complete, and you can leverage Snowflake's extensive capabilities around unstructured data.

 

In some cases we don't even fully materialize the data but rather just wrap a view containing formatting and light transformations around the unstructured data in the variant column(s). I hope this helps in some way. Please post back and let us know if you need more help!
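As a rough illustration of the pattern (the stage, table, view, and JSON field names below are made up, not a drop-in solution), the raw table, the load, and the view look something like this - sketched here as SQL strings you could run from SQL components or a Python step:

# Example only - stage, table, and JSON field names are made up
raw_table_sql = """
CREATE TABLE IF NOT EXISTS raw_vendor_feed (src VARIANT)
"""

# Land the file contents in raw form
copy_sql = """
COPY INTO raw_vendor_feed
FROM @my_stage/vendor_feed/
FILE_FORMAT = (TYPE = 'JSON')
"""

# A view that surfaces the attributes you care about today; anything new the
# vendor adds just rides along inside the variant column until you expose it
view_sql = """
CREATE OR REPLACE VIEW vendor_feed_vw AS
SELECT src:customer_id::string   AS customer_id,
       src:customer_name::string AS customer_name,
       src                       AS raw_record
FROM   raw_vendor_feed
"""

# In Matillion these would normally live in SQL components; printing them here just shows the shape
for stmt in (raw_table_sql, copy_sql, view_sql):
    print(stmt)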

This is a must for us as well. We require 'base' columns in our customers' files but optionally allow them to provide extra columns, so we have to validate both the base columns and any additional columns they might have provided. With AWS S3, our biggest challenge is often that the files are simply too large to download just to get at the header row.

 

However, with a bash CLI command you can interrogate the S3 file in such a way that you fetch just the first row of data.

This is accomplished using an oft-overlooked switch on the aws s3api get-object CLI command: --range, which takes a bytes=start-end value.

 

The range switch lets you make an HTTP request for just the first n bytes of the get-object target, which allows you to pull the header from even the largest S3 files without fetching the entire thing.
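(Purely to illustrate what that byte-range request is doing, here is the same fetch expressed with boto3 - bucket, key, and range values are placeholders; the actual steps below use the bash component.)

import boto3

s3 = boto3.client('s3')  # uses the instance's IAM credentials

# Ask S3 for only the first ~2 KB of the object; the response body contains just that slice
resp = s3.get_object(
    Bucket='my-bucket-name',          # placeholder
    Key='incoming/vendor_file.csv',   # placeholder
    Range='bytes=0-2111'              # same idea as the buffer_bytes value described below
)
sample = resp['Body'].read().decode('utf-8', errors='ignore')
print(sample.split('\n')[0])          # the header row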

 

In your orchestration, you'll need a couple of job variables set up: buffer_bytes and byte_sample.

 

buffer_bytes is important because, depending on the 'width' of the target file, you may need a bigger range of bytes to ensure you capture everything up to the line break character in the file. It is okay to overshoot the line break, but you must at least include it to separate the 'header' row from the next row in the file. It may take some experimentation, but we use 0000-2111 as the value of the buffer_bytes variable.

 

byte_sample is used to house the actual data from the file, and we accomplish getting this data in two fairly easy bash steps.

 

first bash step

rm -f ./header_content   # start from a clean slate

aws s3api get-object --bucket $<variable of bucket name> --key $<variable of s3 key as /dir/target_file> --range bytes=$buffer_bytes ./header_content   # write the byte range to ./header_content

tail -n +1 ./header_content   # just echoes the captured bytes into the task output

 

The bash component can be a bit tricky with its output so we use two steps but there might be a way to shorten this into one bash step.

In this first step we're removing any file named 'header_content' to ensure a clean slate.

We then run the aws s3api get-object command, which writes its results to the new header_content file. The last command is really just for task output.

 

second bash step

The second bash step just does a cat ./header_content, which does nothing other than populate the built-in 'Message' variable of the component step with the output of the cat command.

 

Now, simply export the component's Message variable to the byte_sample job variable.

 

Now that you have the data available to your job, you can use it in a number of ways. We use a Python step to populate a job variable (header_string) with the actual header data, which is then consumed downstream by another orchestration task.

 

# byte_sample is the job variable exported from the bash step above
stringa = byte_sample.split('\n')[0]                         # keep only the first (header) row
stringa = stringa.encode('ascii', 'ignore').decode('ascii')  # drop illegal characters / BOM
header = stringa.replace('"', '')                            # strip any double quotes
context.updateVariable('header_string', header)              # push the result into the header_string job variable
print("Incoming header row: " + header)

 

This is not the most efficient Python code, but it is pretty easy to understand what it is doing:

  1. Split the data captured into byte_sample on the line break \n (replace with other linefeed symbols as needed) and take the first element of the resulting array.
  2. Because of how this data arrives from the previous step, we apply an encoding pass that ignores illegal characters and any BOM.
  3. We eliminate any double quotes that might be contained in the string.
  4. We update our job variable with the result.

 

That's it. From there, you have the header data and can do whatever you wish with it to compare it to the headers you are expecting. Python does not reorder lists (arrays) so your output should be a correct ordinal representation of the file columns.

 

Hope this helps!

 

Edward Hunter

 

 

Thank you for this tip, Bryan!!! I will do some more research and post my findings here!

This is absolutely fantastic! I tried it out straight away. Did you have to add any additional permissions on the bucket or on the Matillion instance? I'm getting an error when the command tries to write to ./header_content: [Errno 13] Permission denied: u'./header_content'

Great explanation @EdwardHunter! I tend to lean towards Python because it's completely portable to virtually any cloud, function, API, UI, etc. I specifically try to stay away from Bash scripts because Bash is only native to Unix and Linux OSes and is very limited in its capabilities compared to Python. If you ever wanted to port that Bash script to a Lambda, Azure Function, etc., you would have to rewrite it in a different language anyway.

If you really want to parse out the first line of a file from S3 - for really large files that wouldn't otherwise fit in a 16MB compressed variant field in Snowflake - you can use the approach @EdwardHunter provided, but I would suggest doing it in Python for portability's sake. This would be the Python approach:

import codecs
import boto3
import csv

# Placeholder credentials - don't hard-code real keys (see the notes further down the thread)
aws_key = 'YOUR_AWS_KEY'
aws_secret = 'YOUR_AWS_SECRET'

s3 = boto3.resource('s3', aws_access_key_id=aws_key, aws_secret_access_key=aws_secret)
line_stream = codecs.getreader("utf-8")

bucket = 'my-bucket-name'
file_name = 'mycsvfile.csv'

# Stream the object and stop as soon as the first line has been read
s3_obj = s3.Object(bucket, file_name)
first_line = line_stream(s3_obj.get()['Body']).readline()

# Parse that single line as CSV to get the column names
csv_content = csv.reader(first_line.splitlines())
col_names = []
for row in csv_content:
    for col in row:
        if col is not None and col != '':
            col_names.append([col])

# Grid variables expect a list of rows, hence the single-element lists
context.updateGridVariable('gridVariable', col_names)

The above method allows you to leverage the Grid Variable capabilities of Matillion, if that's your preferred way of processing each column. I hope this helps.

BTW: the boto3 AWS module reads 1024 bytes at a time, but that really doesn't matter because the "readline" function creates an abstraction layer over the stream of data and stops at the point where the entire first line has been read into memory. That makes it a very efficient way to read just the first part of the file. It works essentially the same as @EdwardHunter's bash script as far as pulling the first line is concerned.

Great reply @Bryan​. Do you have some example code using Boto3 where you can use readline in the manner you describe? My preference would absolutely be to use Python, but I could swear I ran into issues using boto3 - this might have been several major version iterations back. This would allow me to consolidate the entire thing into a single orchestration step.

Super helpful, and I 100% agree with the pitfalls of using bash. @Adwait_K, to write locally using a bash step, the user you are logged into Matillion with needs write access to the file system - that's what it sounds like to me, but Bryan hit this on the head. If boto3 in a Python task can be used, it's definitely the way to go.

Geez @Bryan​ ignore my last comment - I had to actually click on your post to see the sample code. Thank you!

One other quick clarification - the solution I describe here really only applies to Matillion and its bash component specifically. I can't really comment on its portability to other applications, environments, or solutions. The bash step in Matillion has some interesting nuances, though, which make my multi-step approach a bit messy. Any Python step that can grab that first file line without having to download the entire file locally first would definitely be a superior approach, as long as it doesn't compromise security.

Hi @Adwait_K​,

There are a couple of different ways of doing this. I would back up one step, to where you are storing the columns as a pipe-delimited value in Snowflake. If you need to store the columns in Snowflake for a specific reason, then that makes sense. If you are only storing them there so that you can do the comparison, then you don't necessarily need to.

You could just take the columns and dump them into a Grid Variable that has a single column. Then use an iterator on the Grid Variable to execute a check for each column. To add the column names to a grid variable you can use this method:

# row holds the parsed column names (e.g. the output of the csv.reader example above)
col_names = []
for col_name in row:
    if col_name is not None and col_name != '':
        col_names.append([col_name])

context.updateGridVariable('yourGridVariable', col_names)
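Inside the iterated job, the per-column check can then be a metadata query followed by an ALTER TABLE when the column is missing. A rough sketch of the two statements (the table name and iterator value are placeholders, and I'm defaulting new columns to VARCHAR since a CSV header gives you no type information):

column_name = 'SOME_NEW_ATTRIBUTE'   # supplied by the Grid Iterator in a real job
target_table = 'MY_DIM_TABLE'        # placeholder

# Does the column already exist on the dimension table?
check_sql = (
    "SELECT COUNT(*) FROM information_schema.columns "
    "WHERE table_name = '{t}' AND UPPER(column_name) = '{c}'"
).format(t=target_table, c=column_name.upper())

# Only run this when the count above comes back as 0
alter_sql = 'ALTER TABLE {t} ADD COLUMN "{c}" VARCHAR'.format(t=target_table, c=column_name.upper())

# In Matillion these would typically sit in SQL components inside the iterated job
print(check_sql)
print(alter_sql)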

I hope this helps. Please let us know if you need more help. Thanks!

Hi,

@Adwait_K, were you able to add the column using an 'Alter table' statement? And if so, how did you pass the data type of the column dynamically?

It would be great to hear how you achieved it.

Thank you,

Neelam Macwan

@Bryan, could you please elaborate a bit more on how one could leverage SF's capabilities here? Let's say I have a table with a variant column where I'm able to load in JSON data. Please see the attached sample JSON file.

Could you please share some insights into the materialization and data views that you speak of? How can these views be made flexible enough to adjust to the addition or removal of columns? The dynamic nature of the file needs to be handled by the code, as we wouldn't want someone to have to manually check each file and make code changes.

I'm just looking at all my options here, since we also need to think about what would be easiest to maintain and support once it's in Production.

@Bryan Now I remember. Unless we can completely secure that AWS secret somehow, at least in our environment, we can't use it. We can't have (and don't recommend) those keys being hard-coded, for a bunch of reasons. Another question I had: are you saying the file size is limited to 16MB and the file must be downloaded first? That file size aspect is a key factor for us, as many files we deal with are upwards of 500MB to 1GB+.

As far as the keys are concerned, I see this in the documentation: "The AWS credentials defined in Matillion ETL are automatically made available, therefore it is not recommended (or necessary) to put security keys in the script."

 

https://documentation.matillion.com/docs/2234735

 

 

Hi @EdwardHunter, there are various ways to secure the secret, but I would check out AWS Secrets Manager, which is what we use. You bring up a good point that you should never hard-code secrets in your Python code; I should have prefaced my example with that. You can handle secrets a couple of different ways in Python: one is to use Secrets Manager, and the other is to store the secret in the Matillion Password Manager and pass it into the Python script. Both ways are secure and acceptable in most security scenarios.
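For reference, pulling a secret with boto3 looks roughly like this (the region, secret name, and key names are placeholders, and the IAM role Matillion runs under needs permission to read the secret):

import json
import boto3

secrets = boto3.client('secretsmanager', region_name='us-east-1')   # placeholder region
resp = secrets.get_secret_value(SecretId='my/matillion/aws_keys')   # placeholder secret name
secret = json.loads(resp['SecretString'])

aws_key = secret['aws_key']          # key names depend on how the secret was stored
aws_secret = secret['aws_secret']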

Great question on the 16MB limit. It is simply a Snowflake threshold: if you want to ingest the contents of a file into a variant field in Snowflake, it has to fit within 16MB compressed. The power is that once the data is in Snowflake, even if it is still in CSV format, you can leverage the great capabilities Snowflake has around querying and surfacing unstructured data.

If you are interested in this approach, there are several methods of taking large files and breaking them into more manageable pieces for raw ingestion. One approach is to have a bucket with two folders (e.g. "in" and "out"). Files show up in the "in" folder, get read and broken into smaller files that land in the "out" folder, and once they are there it's as simple as running a Snowflake COPY INTO statement to get them into Snowflake and ready to be used. This whole process can be automated via Matillion or by utilizing the out-of-the-box integrations between S3 and Lambda. Once the data is in Snowflake, the sky is the limit as to what you can do with it. It also means your data lineage is more complete.
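A rough sketch of the splitting step (the bucket, folder names, and chunk size are placeholders; this could just as easily run as a Lambda triggered by the S3 put, and for very large files you would stream rather than read everything at once):

import boto3

s3 = boto3.client('s3')

bucket = 'my-bucket-name'                 # placeholder
source_key = 'in/big_vendor_file.csv'     # placeholder
lines_per_chunk = 500000                  # tune so each piece stays comfortably small

obj = s3.get_object(Bucket=bucket, Key=source_key)
lines = obj['Body'].read().decode('utf-8', errors='ignore').splitlines()

# Repeat the header on every chunk so each piece is a self-contained CSV
header, body = lines[0], lines[1:]
for i in range(0, len(body), lines_per_chunk):
    chunk = [header] + body[i:i + lines_per_chunk]
    out_key = 'out/part_{}.csv'.format(i // lines_per_chunk)
    s3.put_object(Bucket=bucket, Key=out_key, Body='\n'.join(chunk).encode('utf-8'))

# From here, a COPY INTO pointed at the "out" folder picks the pieces up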

I hope this helps!

@Adwait_K, you are correct. You don't have to supply keys and secrets if you are using IAM roles and policies for security on the S3 bucket you are going after. You can simply leave that part of the code I supplied out and it will work - as long as you have given your Matillion role the proper access to the S3 bucket or bucket objects.

There may be situations where others in your company have an S3 bucket of data that you need data from. They may not be willing to add access directly to your Matillion IAM role. This can happen when dealing with App/Dev teams that have a current security model in place and don't want to necessarily deviate from it. I suspect this doesn't happen a lot but it's definitely a possibility.

Happy new year @Bryan​ and @EdwardHunter

This is a pretty common task with SF objects: checking for additional columns and accommodating them in your ETL table. I'm assuming here that the base columns at least match; if they don't, the whole thing is moot because somehow a column in the SF table is no longer present - and that is a whole different issue.

 

Let's say the columns you're expecting to exist are in a list called listExpected, and the columns you are detecting are in a list called listActual. If the columns are in a variable as a pipe-delimited string (col|col|...), you can create those lists using variable.split('|').

 

listExpected = ["column1","column2","column3"]                    # columns you expect
listActual = ["column1","column2","column3","column4","column5"]  # columns detected in the file

temp = set(listExpected)
listMatched = [val for val in listActual if val in temp]       # present in both
listExtras = [val for val in listActual if val not in temp]    # new columns in the file

print("Extra columns:", listExtras)

 

You can then save the output of these lists as needed to add any additional columns to your table.
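For example, to hand the extras to a downstream Grid Iterator, you can drop them straight into a grid variable (the grid variable name here is just an example):

# one row per extra column, ready for an iterator to drive your ALTER TABLE logic
context.updateGridVariable('extraColumns', [[col] for col in listExtras])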

 

Hope this helps!

 

Edward Hunter

 

 

@Bryan​ , I may have found a useful explanation about how to interpret JSON data through views.

https://www.snowflake.com/blog/automating-snowflakes-semi-structured-json-data-handling/