Building an ETL Workflow Using Apache NiFi and Hive
The objective of this article is to design an ETL workflow in Apache NiFi that, with almost no code, scrapes a web page to obtain an API endpoint, extracts and transforms the dataset, and loads the transformed data into a Hive table.
Problem Statement
One potential use case for a data pipeline is capturing district-level COVID-19 information from the COVID19-India API website, which is updated daily. The aim is to create a flow that collates the dataset and loads it into a warehouse system used by various downstream applications for further analysis, while remaining easily configurable for future changes.
Prerequisites
Before we start, you should have a basic understanding of Apache NiFi, and having it installed on your system is a great starting point for this article. If you do not have it installed, please follow these quick steps. Apache Hive is also part of this architecture, and it in turn requires a fully functional Hadoop framework. For this article, I am using Hive on a single-node cluster installed locally, but you can use a remote Hive connection as well.
Basic Terminologies
Apache NiFi is an ETL tool built around flow-based programming, with a web UI that provides an easy, drag-and-drop way to handle data flows in real time. It also supports powerful and scalable data routing and transformation, and it can run on a single server or in clustered mode across many servers.
A NiFi workflow consists of processors, the rectangular components that can process, verify, filter, join, split, or adjust data. They exchange pieces of information called FlowFiles through queues named connections, and the FlowFile Controller manages the resources between those components.
Web scraping is the automated extraction and collection of structured web data. It involves parsing the underlying HTML code, typically with CSS selectors, and storing the extracted data in a database.
Apache Hive is a warehouse system built on top of Hadoop used for data summarization, query, and ad-hoc analysis.
Steps for ETL Workflow
The above flow comprises multiple processors, each performing a different task at a different stage of the pipeline. The stages are Collect (InvokeHTTP - API Web Page, InvokeHTTP - Download District Data), Filter (GetHTMLElement, ExtractEndPoints, RouteOnAttribute - District API, QueryRecord), Transform (ReplaceHeaders, ConvertJSONToSQL), Load (PutHiveQL), and Logging (LogAttribute). The processors are connected through different relationship connections and are triggered on success until the data is loaded into the table. The entire flow is scheduled to run daily.
So, let’s dig into each step to understand the flow better.
1. Get the HTML document using the Remote URL
The flow starts with an InvokeHTTP processor that sends an HTTP GET request to the COVID19-India API URL and returns an HTML page in the response queue for further inspection. The processor can be used to invoke multiple HTTP methods (GET, PUT, POST, or PATCH) as well.
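For reference, a minimal configuration of this first processor could look like the sketch below; the URL shown is the API page address at the time of writing and should be treated as an assumption.
HTTP Method: GET
Remote URL: https://api.covid19india.org/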
2. Extract listed endpoints
In the second step, the GetHTMLElement processor targets the HTML table rows in the response where all the endpoints are listed inside anchor tags, using the CSS selector tr > td > a, and extracts the matching elements into FlowFiles.
After the success of the previous step, the ExtractText processor evaluates regular expressions against the content of the FlowFile to extract the URLs, which are then assigned to a FlowFile attribute named data_url.
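As an illustration, ExtractText can be configured with a single dynamic property named data_url whose value is a regular expression along the lines of the one below; the exact pattern is an assumption and depends on how the endpoints appear in the extracted content.
data_url: (https?://[^"\s]+\.csv)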
Note: The layout of the web page may change over time. If it has changed by the time you read this, adjust the configuration of the above processors accordingly.
3. Pick districts API and Download the dataset
Here, the RouteOnAttribute processor routes the district-level API forward and ignores the other APIs using the Apache NiFi Expression Language, since we are only interested in district.csv.
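For example, the routing rule can be a single dynamic property holding an Expression Language condition like the one below; the property name and the match string are assumptions for illustration.
district_api: ${data_url:contains('district')}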
This time, the InvokeHTTP processor downloads the dataset using the extracted API endpoint by referencing the data_url attribute in Expression Language form, i.e., ${data_url}, and the response data is in CSV format.
4. Transform and Filter the dataset
In this stage, the header of the response data is changed to lowercase using the ReplaceText processor with the Literal Replace strategy, and the first field name is changed from date to recorded_date to avoid a reserved database keyword.
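A possible configuration is sketched below; the original header shown is an assumption based on the district dataset at the time of writing, so adjust it to the actual CSV header.
Replacement Strategy: Literal Replace
Search Value: Date,State,District,Confirmed,Recovered,Deceased,Other,Tested
Replacement Value: recorded_date,state,district,confirmed,recovered,deceased,other,tested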
Since the data is updated daily on an incremental basis, we only extract the previous day’s records using the QueryRecord processor. It also converts the CSV data into a JSON FlowFile using the CSVReader and JsonRecordSetWriter controller services.
Please note that both the CSVReader and JsonRecordSetWriter services can be left at their default settings for our use case. You can check out this blog for more reading on controller services.
As mentioned, QueryRecord evaluates the query below against the FlowFile to get the previous day’s data and passes the result to the next processor.
select * from FlowFile where recorded_date='${now():toNumber():minus(86400000):format('yyyy-MM-dd')}'
5. Establish JDBC connection pool for Hive and create a table
Let’s set up the Hive JDBC connection for the NiFi flow using the HiveConnectionPool controller service with the required local/remote configuration (database connection URL, user, and password). The Hive Configuration Resources property expects the path to the Hive configuration file, i.e., hive-site.xml.
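A typical local setup might look like the sketch below; the host, port, credentials, and file path are assumptions, so replace them with your own values.
Database Connection URL: jdbc:hive2://localhost:10000/default
Hive Configuration Resources: /usr/local/hive/conf/hive-site.xml
Database User: hive
Password: ********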
Now, we need an empty target table into which the NiFi flow will load the data. You can create it with a DDL along the lines of the structure below:
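The table name district_covid_data and the column list are assumptions based on the district dataset fields described above; adjust them to match the actual CSV header.
-- Target table for the district-level COVID-19 data (names assumed for illustration)
CREATE TABLE IF NOT EXISTS district_covid_data (
  recorded_date STRING,
  state STRING,
  district STRING,
  confirmed INT,
  recovered INT,
  deceased INT,
  other INT,
  tested INT
);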
6. Convert JSON to SQL and load the data into Hive
In this step, the JSON-formatted FlowFile is converted into a SQL statement using ConvertJSONToSQL, which provides the SQL query as the output FlowFile. Before running the processor, we configure the HiveConnectionPool as the JDBC Connection Pool property, along with the table name and statement type. In this case, the statement type is INSERT, since we need to load the data into the table.
Also, please note that when preparing the SQL command, the SQL Parameter Attribute Prefix property should be set to hiveql. Otherwise, the next processor will not be able to identify the parameter attributes and will throw an error.
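To illustrate, with the hiveql prefix and the table assumed above, the output FlowFile carries a parameterized statement roughly like the one below, while the actual values travel in attributes such as hiveql.args.1.value, hiveql.args.1.type, and so on.
INSERT INTO district_covid_data (recorded_date, state, district, confirmed, recovered, deceased, other, tested) VALUES (?, ?, ?, ?, ?, ?, ?, ?)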
Then, on success, PutHiveQL executes the incoming SQL command and loads the data into the table. The success of this processor marks the end of the workflow, and the data can be verified by querying the target table.
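For a quick sanity check once the flow has run, you can count the newly loaded rows; the table name below follows the assumed DDL.
SELECT COUNT(*) FROM district_covid_data WHERE recorded_date = CAST(DATE_SUB(CURRENT_DATE, 1) AS STRING);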
7. Schedule the flow for daily updates
You can schedule the entire flow to run at any given time using different NiFi scheduling strategies. Since the first InvokeHTTP is the initiator of this flow, we can configure it to run daily at 2 AM.
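For example, with the CRON driven scheduling strategy on the first InvokeHTTP processor, the Quartz expression below triggers the flow every day at 2 AM.
Scheduling Strategy: CRON driven
Run Schedule: 0 0 2 * * ?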
8. Log Management
Almost every processor is connected to the LogAttribute processor through a failure/success queue, which writes the state and the values of all used attributes into the NiFi log file, logs/nifi-app.log. By checking this file, we can debug and fix issues in case of any failure. To extend this even further, we could also set up a flow that captures error logs with Apache Kafka and sends notifications over email.
9. Consume data for analysis
You can use various open-source visualization tools to start exploratory data analysis on the data stored in the Hive table.
You can download the template covid_etl_workflow.xml and run it on your machine for reference.
Future Scope
There are many ways to build such a workflow, and this was one of them. You can take it further by ingesting additional datasets from the list (state_wise, test_datasets) using different combinations of processors and controller services in the flow.
You can also try scraping the product listing pages of multiple e-commerce websites to compare goods and prices, or extract movie reviews and ratings from the IMDb website and use them to build recommendations for users.
Conclusion
In this article, we discussed Apache NiFi and created a workflow to extract, filter, transform, and load data for analysis. If you prefer building logic visually and want to focus on the architecture with less code, then Apache NiFi is the tool for you.