Stream enrichment with Flink SQL
In today's world, real-time data processing is essential for businesses that want to remain competitive and responsive. The ability to obtain results…
Read moreAs we probably know, the biggest strength of Apache Nifi is the large amount of ready-to-use components. There are, of course, situations where we need something custom that we can’t do with the use of available processors or it will make our flow unreadable, messy, ugly, the kind that you wouldn’t like to introduce to your parents. If you have any experience with Nifi then you have probably found some ways to handle this particular problem and (some of them are presented in the article here). The solution depends largely on the complexity of the processing. Available external components are also an important factor, but today we will focus on the internal features of Nifi. If it’s something simple, you could do with one or two bash commands then the ExecuteStreamCommand processor will do the rest of the job for you. It's very simple, it usually only requires one line of code and you’re good to go. If that’s not enough, it’s time to get out the big guns. ExecuteGroovyScript or ExecuteScript both provide a way to insert code directly into the processor and make it do whatever we want it to. It’s almost the perfect solution… Almost.
Even though script executors are great tools, they do have some downsides. We can more or less separate them into two kinds, based on their nature, explicitness and phase of development in which they appear.
These are some that you can encounter while designing your flow or in the implementation, to name a few:
As we can see, these are strictly related to the development phase. If we want to have more relationships, we need to set some attribute with status and later route the flowfile with RouteOnAttribute processor and as a rule we would like to keep the size of our flow to a minimum. If we want to update dependency jars, we also need to invalidate the processors that use them, otherwise they will use a cached (old) version of the jar. Dynamic properties make it impossible for us to use sensitive properties. Finally, the manner of working is problematic if we want to achieve functionality of e.g. a controller service.
It’s possible to make workarounds for those but they’re more like hacks rather than solutions.
Here instead of clear and explicit lack in functionality, we can see secondary consequences of the design. While they may not seem that important to the developer that creates the first PoC version of the flow, they definitely can make someone trying to make changes in the flow a year later question the qualifications and sanity of the creator. Among others:
The first one is quite obvious, we want to test our changes and do regression tests. Frameworks for automatic tests are a blessing that we often don't appreciate enough in normal programming. Two of the following require more explanation.
As we know the Nifi is full of implicit values that we pass on throughout the flow in the form of attributes. Using attributes is convenient, but we need to remember that if we create a custom component, we need to make it visible somehow, that we are in fact using them.
Let’s imagine a situation where you are a maintainer of the flow and you want to make some change which will modify the value of one attribute. You need to check if it's not used anywhere in the flow, fortunately each processor has documentation that contains the information which attributes it uses… except for those scripts. You need to find all the scripts that use the attribute inside the script body. Even more-so, they might call some method that has flowfile reference as an argument. Then you need to find the dependency code and check there.
It can happen with any custom component in Nifi but in the case of scripts, there is no incentive to make values explicit right away. What’s more, taking value from attributes is in fact the most convenient way to get it.
Imagine a different scenario; you want to use a script someone else created in your flow. The creator was reasonable enough to take all the values from dynamic properties, so you can see which ones are used right away, but what for? The script is big and properties have generic names. Guess it’s time to study the code and figure out the author's intentions. That’s usually not a pleasant experience.
Apache Nifi provides API for all types of components, so the user can easily create custom processors, controller services and so on. To put it bluntly, they solve all the problems mentioned above. The question is then, why don’t we use them by default?
We can divide this question into two separate questions that are easier to answer. First of all - why use scripts in the first place and secondly, why not migrate to custom components?
The reason is simple, they’re faster to implement. To write a custom processor you need to create a project, compile it and make sure all the libraries are added correctly. It’s quite a lot of hassle. On the other hand if it’s something simple, the script will be more than enough. The problem is, we start with scripts because the logic they implement is simple, but later it gets more and more complicated and we have already made the decision to use scripts. The only way then, is migration.
Migrations to better solutions and non-functional improvements are something we would all like to have done in our project. I could say that we all would like to do it but let's face it, we don’t necessarily want to… It will not improve functionality of the solution and will take a lot of time, the only people who will see this are the developers, so a business will not notice. What’s more, they want new functionalities so we have a thousand items in the backlog. If that wasn’t enough, a new solution means changes in deployment so it’s not only a job for developers but for devopses as well… and we don’t even know whether it will be worth it or not.
Some of the factors mentioned above will not change, but if we could create a quick PoC that works, we would have more arguments to push for such migration.
Nifi provides several components that can be a half-way solution that is able to solve a few of the migration issues.
These components work in a straightforward way. You provide custom implementation in the component body and then it transforms accordingly.
Let's go with the example of the processor that has two properties and two relationships.
class ExampleProc implements Processor {
public static final PropertyDescriptor REQUIRED_PROPERTY = new PropertyDescriptor.Builder()
.name("required property")
.displayName("Required Property")
.description("Description of the required property, can be as detailed as we want it to be")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build()
public static final PropertyDescriptor OPTIONAL_PROPERTY = new PropertyDescriptor.Builder()
.name("optional property")
.displayName("Optional Property")
.description("Description of the optional property, can be as detailed as we want it to be")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build()
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Description of the success relationship, can be as detailed as we want it to be")
.build()
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Description of the failure relationship, can be as detailed as we want it to be")
.build()
@Override
void initialize(ProcessorInitializationContext processorInitializationContext) {}
@Override
Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
@Override
void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
}
@Override
Collection<ValidationResult> validate(ValidationContext validationContext) {
return validationContext.getProperties().entrySet().stream()
.map{e -> e.getKey().validate(e.getValue(), validationContext)}
.collect(Collectors.toSet())
}
@Override
PropertyDescriptor getPropertyDescriptor(String s) {
return getPropertyDescriptors().find {p -> p.getName().equalsIgnoreCase(s)}
}
@Override
void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {}
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
return Arrays.asList(REQUIRED_PROPERTY, OPTIONAL_PROPERTY)
}
@Override
String getIdentifier() {
return "Example Processor"›
}
}
This is the code of a processor that does literally nothing, but if we paste it into a Script Body property of InvokeScriptedProcessor we will see this:
So what are the changes? All the properties defined in the code are visible in the processor, they are not dynamic and have documentation in the Nifi UI. We can also see relationships defined in the code, also with documentation.
If we assume that scripted processors are placed somewhere between scripts and custom components, then testing frameworks are between scripted and custom components. It’s because you can’t do them in the processor's body, you have to set up a project with unit tests. Example:
class ExampleProcSpec extends Specification {
def setup() {
runner = TestRunners.newTestRunner(new ExampleProcessor())
runner.setProperty(ExampleProcessor.REQUIRED_PROPERTY, “some-value”)
}
def "test"(){
given:
runner.enqueue(input.getBytes("UTF-8"))
when:
runner.run(1)
then:
runner.getFlowFilesForRelationship(ExampleProc.REL_SUCCESS).size() == 1
}
}
Ok, looks good but now what? So as stated previously this is a step towards custom components, for it to be good it has to have two features:
There are few benefits on the migration side. Let’s go through them
Let’s also have a look at what improvements can be made compared to using scripts.
The moment you think that maybe InvokeScripted* could replace the custom components, well… here is your bucket of cold water.
In conclusion, if you feel like your scripts could use an upgrade, there you have few arguments to back it up. Remember to take into consideration that scripts are in general faster to create and have their place in the Nifi ecosystem. In the end, every project has its own specification and the decision rests on yours and your teammates shoulders. Cheers!
Would you like to read something more about Apache NiFi? Check out our blog series NiFi Ingestion Blog Series
In today's world, real-time data processing is essential for businesses that want to remain competitive and responsive. The ability to obtain results…
Read moreThe Kubeflow Pipelines project has been growing in popularity in recent years. It's getting more prominent due to its capabilities - you can…
Read moreOne of the biggest challenges of today’s Machine Learning world is the lack of standardization when it comes to models training. We all know that data…
Read moreIn this episode of the RadioData Podcast, Adama Kawa talks with Jonas Björk from Acast. Mentioned topics include: analytics use cases implemented at…
Read moreWelcome to the next instalment of the “Power of Big Data” series. The entire series aims to make readers aware of how much Big Data is needed and how…
Read moreThe need for a unified format for geospatial data In recent years, a lot of geospatial frameworks have been created to process and analyze big…
Read moreTogether, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.
What did you find most impressive about GetInData?