Break a Loop iterator?

I am currently trying to break out of a Loop Iterator.

For example, I am looping over 100 iterations and want to stop as soon as the inner job meets a condition.

 

Current Workarounds

I could use the Retry component and have the inner job end with "Failed" until the condition is met. I don't like this workaround because it produces a lot of failed jobs.

 

Currently preferred Workaround

Use an environment variable that indicates the loop should break.

The inner job checks this variable, and as soon as the break condition is met it skips everything, so Matillion loops to the end pretty quickly without doing any work.

Not perfect, but at least it works.
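In case it helps anyone copying the pattern, here is a minimal sketch of the two ends of that break flag inside the inner job. The variable name 'break_loop' and the condition are made up for illustration; the only Matillion-specific piece is context.updateVariable() in a Python Script component, plus an If component (or a first script) reading the flag at the start of each iteration.

```python
# Python Script component near the end of the inner job:
# flip the shared break flag once the business condition is met.
# 'break_loop' is a hypothetical job/environment variable defaulted to 0.

rows_remaining = 0  # stand-in for whatever the real condition inspects

if rows_remaining == 0:
    # Matillion exposes updateVariable on the script context
    context.updateVariable('break_loop', '1')

# At the start of the inner job, an If component (or another small script)
# checks ${break_loop}; once it is 1, every remaining iteration
# short-circuits straight to a no-op end, which gives the
# "loop fast to the end" behaviour described above.
```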

 

Any other Ideas?

I tried to use the environment variable as the loop variable and to change it inside the inner job to the end value, but that did not work (tested with both "copied" and "shared").

 

 

Hi @Nils​, you bring up a very interesting subject. We have run into a similar issue and had to rethink how to handle it. Your currently preferred workaround is what we have used in the past. I have thought about a way to resolve this before, and I think it's now time for me to submit an idea to the ideas portal. The ultimate way to solve this, which would allow almost any method to work, is for Matillion to create an API method that we can call to execute an orchestration. This would allow us to loop via Python and run orchestrations; after each orchestration run we could check a variable that holds the state of execution and decide whether to continue looping or break out. The other way Matillion could allow this is to extend the existing iterators and add a property that watches for a break condition to be met. I like this method, but it may not be as flexible or meet everyone's situation.
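Just to make the first option concrete, the loop I have in mind would look roughly like the sketch below. Everything here is hypothetical: the run-job endpoint, credentials, job names and the status check are placeholders, assuming Matillion exposed (or one day exposes) a REST call that runs an orchestration synchronously.

```python
import requests

# Placeholder values for illustration only
RUN_URL = ("http://matillion-host/rest/v1/group/name/MyGroup/project/name/MyProject"
           "/version/name/default/job/name/InnerJob/run?environmentName=Prod")
AUTH = ("api-user", "api-password")


def break_condition_met():
    """Placeholder: read back whatever state the inner orchestration recorded,
    e.g. a row in a status table or an exported variable."""
    return False


for i in range(100):
    # Run one iteration of the inner orchestration and wait for it to finish
    response = requests.post(RUN_URL, auth=AUTH,
                             json={"scalarVariables": {"iteration": str(i)}})
    response.raise_for_status()

    if break_condition_met():
        break  # this is the bit the iterator components can't do today
```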

With that said, I could imagine another way of doing this using SQS and Python, but I can't imagine it would be any faster than what you have. You could technically use a Python script for the outer iteration which posts a message to a FIFO SQS queue that triggers a Matillion orchestration. The result of the orchestration would update a variable or write a value to a table. Python would wait for the orchestration to complete, and on the next loop it would check the value in the variable or table and decide whether to send another SQS message to kick the process off again. It would take quite a bit of work to get this complicated setup running, and I can't imagine it performing better or faster than the preferred workaround you mentioned. I hope this helps in some way. Have a great weekend!
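A rough sketch of what that outer Python loop could look like, with the queue URL, the status lookup and all names invented purely for illustration:

```python
import time
import uuid
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.eu-central-1.amazonaws.com/123456789012/run-inner-job.fifo"  # placeholder


def inner_job_status(iteration):
    """Placeholder: read the value the orchestration wrote to a variable or table."""
    return "BREAK"


for i in range(100):
    # The SQS listener in Matillion picks this message up and runs the inner orchestration
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageGroupId="break-loop-demo",
        MessageDeduplicationId=str(uuid.uuid4()),
        MessageBody=str(i),
    )

    # Wait for the orchestration to report back before deciding what to do next
    while inner_job_status(i) == "RUNNING":
        time.sleep(10)

    if inner_job_status(i) == "BREAK":
        break
```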

Hey @Bryan​ thanks for your detailed reply.

It is always good to know that I am not the only one with this problem and that I was not missing an easy solution.

Thanks for posting the idea, you already got my upvote.

Regarding how Matillion could fix this, I think it is always a trade-off between flexibility and ease of use.

Adding the ability to build this using Python would be nice to have for us, but I think for other customers a more integrated solution is the better approach.

In my opinion the easiest and cleanest way to fix this is:

Why not design this the way every programming language does?

E.g. in Python I can call "break" to exit the loop.

So why not add another flow component, "End Break"?

You can build any condition inside the job, and when the "End Break" component is reached the job breaks out of the loop (the job should still end with Success).

The Loop Iterator already has the option "Break on Failure", so hopefully there is already a listener on the job result.

A nice-to-have would be for the new "End Break" flow component to pass a variable that can be extracted and used by the parent job.
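Just to illustrate the behaviour I mean with a plain-Python equivalent (the inner job and its condition here are stand-ins):

```python
def run_inner_job(i):
    """Stand-in for one iteration of the inner orchestration."""
    return "done" if i == 3 else "continue"


for i in range(100):
    if run_inner_job(i) == "done":
        # This is what the proposed "End Break" component would map to:
        # stop iterating, while the job as a whole still ends with Success.
        break
```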

Thanks for the other workaround ideas. Also good to know what might be possible.

But I think sticking with the current solution is the simplest way to deal with this.

Hi both, full disclosure in advance, this is one of my videos :-) but with a microbatching approach governed by SQS you can come close to a breakable loop...

Hi @Nils​,

I would definitely agree that a Python solution would not be as feasible for many people; it helps anyone who is trying to do something fairly custom via Python. A solution that is baked into a component like the iterators would be more widely used. I personally don't see how either option would be hard to implement, since the iterator components are fundamentally using looping logic in a programming language (Java, I think). So it could be a quick win.

Not to hijack your thread, but here's a question for you. Do you find yourself wanting to attach more than one component to an iterator? If I am trying to do something small or quick and dirty, there are times when I want to just attach a Python Script and maybe a SQL Script to run on each iteration, instead of creating another orchestration and using it with the iterator. Usually I am doing things like dynamically forming a SQL statement using Python and then executing it with the SQL Script. To me, it would be handy to be able to attach at least two or three components to be run in sequential order on an iterator. For anything more than that you are probably going to want a separate orchestration anyway. What are your thoughts? I think I am going to post the idea in the portal since it's something I have been dwelling on for a while now.

Hey Ian, always nice to get some feedback from you.

SQS in combination with Matillion is something we are not using yet.

I think humans usually tend to stick to solutions they know and are used to.

That is the reason I tried to solve this only using Matillion components.

Evaluating SQS in combination with Matillion is on our list for the future.

We have done quite a bit with SQS and microservices in the context of application development. We did some initial testing with Matillion about a year ago but decided to pass on using it. We purposely chose to stay away from SQS on the data side, as it adds another level of complexity to an already complex environment for data processing. Even though we have automated a ton of our infrastructure for deploying environments, it is just one more service and dependency that needs to be automated, managed, and implemented. It also adds another cost which you have to include in the cost of doing data work; meaning, you are no longer just paying for Matillion and the EC2 instance, you are also paying for all the other services it uses, like SQS. The little charges add up to real money at the end of the year. It's the main reason we chose Snowflake: they do a great job of rolling all the services into a package where, from a cost perspective, you just have to think about warehouse usage and storage.

Hey @Bryan​,

In most cases we try to isolate logic into jobs.

In my opinion this has some advantages regarding reusability, standardization and monitoring.

But I can also understand your argument that this might be overhead for some simple jobs.

More features and abilities are always nice to have, but this one might be tricky for Matillion to implement for relatively little benefit.

From my side no urgent need but nice to have.

Maybe another approach combines both aspects at once.

Your main issue with the current solution is the annoying workflow of having to create another orchestration job, right?

Maybe a way to solve this is to add a feature that creates an orchestration job from marked components.

So in your case you keep creating your components in one window, mark e.g. the Python and SQL scripts, and with a right click use a feature like "create orchestration job from selection". If this feature creates an orchestration job with these components inside, and maybe even with the same job variables as the outer job, it becomes pretty simple to restructure larger orchestration jobs.

But maybe it's a completely useless feature ;)

@Nils​, this feature is already available in Matillion:

@ian.funnell

I just want to give an update:

Let me share first what I actually wanted to do and why.

We are using SageMaker as our machine learning framework.

I was challenged with the task of preparing the data for 26 models that share more or less the same data but have different target variables.

So we are preparing 26 different data sets using Matillion and Snowflake.

This data is uploaded to S3, and then we need to trigger 26 SageMaker Batch Transform jobs that score the data using previously trained models.

We are currently orchestrating everything using Matillion. We looked at Airflow, but at the moment we can still handle everything with Matillion; maybe we will switch once the scheduling tasks get more complex.

That was the reason I needed Matillion to monitor my Batch Transform jobs. I skipped the approach of running one Python script per job that polls it, because that would lock a lot of resources just for polling.

So my other approach was to run a loop, poll all the still-running jobs at once every few seconds, and log them to our database. As soon as all the jobs are finished I want to pull all the results back from S3 into our database. I was not happy with this solution because I was using Matillion for tasks it was not built for.

So my current more sophisticated approach is the following:

We are using Matillion with Python/boto3 to trigger 26 AWS Step Functions.

Before starting these functions, we log to our database which job was started for which task.

These Step Functions contain a SageMaker Batch Transform sync task.

So AWS creates a CloudWatch event listener for these triggered jobs and does the polling for us.

At this point Matillion is already done: it has sent 26 requests to the AWS API and written 26 entries into our logging table.
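The trigger part inside the Matillion Python Script looks roughly like this; the state machine ARN, region, model list and logging helper are placeholders, not our real names:

```python
import json
import boto3

sfn = boto3.client("stepfunctions", region_name="eu-central-1")  # region is a placeholder

STATE_MACHINE_ARN = "arn:aws:states:eu-central-1:123456789012:stateMachine:score-model"  # placeholder
MODELS = ["model_{:02d}".format(n) for n in range(1, 27)]  # our 26 models


def log_start(model, execution_arn):
    """Placeholder for the insert into our logging table."""
    print(model, execution_arn)


for model in MODELS:
    # One Step Functions execution per model; the state machine runs the
    # SageMaker Batch Transform job and does the waiting/polling for us.
    response = sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        name="batch-transform-{}".format(model),
        input=json.dumps({"model": model}),
    )
    log_start(model, response["executionArn"])
```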

When a Step Function hits an error, or the batch transform job finishes, it triggers a Lambda function that sends a message to an SQS queue Matillion is listening to.

This triggers a monitoring job that writes into the same log table which batch transform job finished.
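The Lambda in between is tiny. It is something along these lines, where the queue URL and the payload fields are placeholders:

```python
import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.eu-central-1.amazonaws.com/123456789012/matillion-listener"  # placeholder


def handler(event, context):
    # Forward the Step Functions / Batch Transform outcome to the queue that the
    # Matillion SQS listener watches, which in turn starts the monitoring job.
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({
            "transform_job": event.get("TransformJobName"),
            "status": event.get("status", "Completed"),
        }),
    )
    return {"forwarded": True}
```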

After every insert into this logging table, the monitoring job checks whether the latest status of all 26 batch transform jobs is Completed.

When every job is completed, we start the final job that pulls all the S3 data and integrates it into one table.
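The completeness check itself is just one query against the logging table. Inside a Matillion Python Script it could look something like this; the table and column names are illustrative, and I am assuming the script can reach the environment's database via context.cursor():

```python
EXPECTED_JOBS = 26

# How many distinct batch transform jobs have logged a 'Completed' status?
cursor = context.cursor()  # Matillion's cursor against the environment's database
cursor.execute("""
    select count(distinct job_name)
    from batch_transform_log
    where status = 'Completed'
""")
completed = cursor.fetchone()[0]

# Expose the result so an If component can decide whether to start the final
# job that pulls everything back from S3.
context.updateVariable('all_jobs_completed', '1' if completed == EXPECTED_JOBS else '0')
```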

It is not in production yet, but I like this solution so far.

It feels kind of strange to have this asynchronous communication, but I think for this purpose it is the best approach.

By the way, adding a dead letter queue to the listener would also be nice.

I already posted the idea: https://metlcommunity.matillion.com/s/idea/0874G000000kBIPQA2/detail

I love the project @Nils​! Pretty awesome I must say. I don't see that you mentioned which database system you are using. If you are using Snowflake you could further automate this and cut down on the code by setting up Snowpipes that auto-ingest your S3 data as it's delivered.

Referring to this part:

"When a Step Function hits an error or the batch transform job is finished it will trigger a lambda function that will send a message to an SQS Queue Matillion is listening to.

This will trigger an monitoring job that will write into the same log table the information which batch transform job finished.

After every insert into this logging table this monitoring job will check if the last status for all this 26 batch transform jobs is completed."

If you have Snowflake, you could change this so that the pertinent logging/monitoring information is written out to an S3 bucket, where it would automatically be picked up and ingested into a table. This might get around the need for SQS to automate Matillion to do this work.

"When every job is completed we will start the final job to pull all the S3 data and integrate all the data into one table."

In the above statement the S3 to table ingestion could also be done using Snowpipe in Snowflake.

Just some thoughts I had as I read through your solution. Great work!

Hi @[Nils]​ / @[Bryan]​

I came across this recently and wanted to let you know that the related idea Bryan raised here will soon be evaluated by one of our technical teams to understand feasibility, with the hope that it will be added to our roadmap for delivery in the coming months.

Any questions, please let us know :)

Thanks!

The Matillion Product Team

Hey Bryan,

 

Thanks for your good feedback!

 

I will take a look at Snowpipe for this use case.

 

What might be an issue with your approach is tracking when all the jobs are finished.

So I have 26 Batch Transform jobs running in parallel, and I need to run a job once all 26 are finished. I think your solution is perfect when these 26 jobs are independent.

But without a trigger for a Matillion job that checks whether all the jobs are finished, I don't see how I can synchronize my workload again. Maybe I am missing something.

 

By the way, another advantage is that I currently have a catch block inside the Step Functions and reroute any error to the logging table. So Matillion checks the message sent by my Lambda function and sends an email/webhook to inform the team when something bad happened inside AWS.

The main reason for this is that we are not using AWS services that much, so it is nice to have all monitoring and error messaging in one place.

 

But still, I like the idea of being more event-driven and less "batchy".

So maybe you can further describe how you would handle this part:

"In the above statement the S3 to table ingestion could also be done using Snowpipe in Snowflake."

 

Hey @Nils​,

I get the logging idea and wanting all the logging in one place; we like doing the same. We try to stay away from CloudWatch as much as possible for data-warehouse-type work. It's just too complicated and, in my opinion, not a very well put together logging system.

To describe the S3-to-table ingestion comment I made: the method I had in my head was to have a pipe set up that sits in a paused state. You could then have a Snowflake Task that runs on a schedule, checking every 5 minutes during a 2 or 3 hour window whether all 26 transform jobs are finished. Once the Task finds them to be done, it can alter the pipe and set its status to running. Architecturally it seems feasible; I have not done this exact scenario myself, but I know the functionality of Pipes and Tasks and it seems doable.

If you wanted to test the feasibility, you could throw together a separate database with the Pipe, Stage(s), and Task(s) and do the ingestion via the Pipe, which is resumed by the Task. That way, if you decide it won't work or you don't want to use it, you just delete the database and everything is cleaned up. I hope this helps.
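A minimal sketch of the setup I am describing, run here through the Snowflake Python connector; every object name is a placeholder, and the stored procedure that does the actual "are all 26 jobs done?" check is hypothetical and left out:

```python
import snowflake.connector  # assumes the Snowflake Python connector is installed

# Connection details are placeholders
conn = snowflake.connector.connect(
    account="my_account", user="etl_user", password="...",
    warehouse="etl_wh", database="ml_db", schema="staging",
)
cur = conn.cursor()

# 1. A pipe that auto-ingests the scored result files, created and then paused
cur.execute("""
    create pipe if not exists results_pipe
      auto_ingest = true
      as copy into results_raw from @results_stage file_format = (type = 'csv')
""")
cur.execute("alter pipe results_pipe set pipe_execution_paused = true")

# 2. A scheduled task that calls a (hypothetical) stored procedure; the procedure
#    would check the logging table and, once all 26 transform jobs are finished,
#    run: alter pipe results_pipe set pipe_execution_paused = false
cur.execute("""
    create task if not exists check_transform_jobs
      warehouse = etl_wh
      schedule = '5 minute'
    as
      call resume_results_pipe_if_done()
""")
cur.execute("alter task check_transform_jobs resume")
```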