Author: Rahul Ramesh

  • How Apache Airflow Optimizes Complex Workflows in DataWeave’s Technology Platform

    How Apache Airflow Optimizes Complex Workflows in DataWeave’s Technology Platform

    As successful businesses grow, they add a large number of people, customers, tools, technologies, etc. and roll out processes to manage the ever-increasingly complex landscape. Automation ensures that these processes are run in a smooth, efficient, swift, accurate, and cost-effective manner. To this end, Workflow Management Systems (WMS) aid businesses in rolling out an automated platform that manages and optimizes business processes at large scale.

    While workflow management, in itself, is a fairly intricate undertaking, the eventual improvements in productivity and effectiveness far outweigh the effort and costs.

    At DataWeave, on a normal day, we collect, process and generate business insights on terabytes of data for our retail and brand customers. Our core data pipeline ensures consistent data availability for all downstream processes including our proprietary AI/ ML layer. While the data pipeline itself is generic and serves standard workflows, there has been a steady surge in customer-specific use case complexities and the variety of product offerings over the last few years.

    A few months ago, we recognized the need for an orchestration engine. This engine would serve to manage the vast volumes of data received from customers, capture data from competitor websites (which can range in complexity and from 2 to 130+ in number), run the required data transformations, execute the product matching algorithm through our AI systems, process the output through a layer of human verification, generate actionable business insights, feed the insights to reports and dashboards, and more. In addition, this engine would be required to help us manage the diverse customer use cases in a consistent way.

    As a result, we launched a hunt for a suitable WMS. We needed the system to satisfy several criteria:

    • Ability to manage our complex pipeline, which has several integrations and tech dependencies
    • Extendable system that enables us to operate with multiple types of databases, internal apps, utilities, and APIs
    • Plug and play interfaces to execute custom scripts, and QA options at each step
    • Operates with all cloud services
    • Addresses the needs of both ‘Batch’ and ‘Near Real Time’ processes
    • Generates meaningful feedback and stats at every step of the workflow
    • Helps us get away with numerous crontabs, which are hard to manage
    • Execute workflows repeatedly in a consistent and precise manner
    • Ability to combine multiple complex workflows and conditional branching of workflows
    • Provides integrations with our internal project tracking and messaging tools such as, Slack and Jira, for immediate visibility and escalations
    • A fallback mechanism at each step, in case of any subsystem failures.
    • Fits within our existing landscape and doesn’t mandate significant alterations
    • Should support autoscaling since we have varying workloads (the system should scale the worker nodes on-demand)

    On evaluating several WMS providers, we zeroed in on Apache Airflow. Airflow satisfies most of our needs mentioned above, and we’ve already onboarded tens of enterprise customer workflows onto the platform.

    In the following sections, we will cover our Apache Airflow implementation and some of the best practices associated with it.

    DataWeave’s Implementation

    Components

    Broker: A 3 node Rabbit-MQ cluster for high availability. There are 2 separate queues maintained, one for SubDags and one for tasks, as SubDags are very lightweight processes. While they occupy a worker slot, they don’t do any meaningful work apart from waiting for their tasks to complete.

    Meta-DB: MetaDB is one of the most crucial components of Airflow. We use RDS-MySQL for the managed database.

    Controller: The controller consists of the scheduler, web server, file server, and the canary dag. This is hosted in a public subnet.

    Scheduler and Webserver: The scheduler and webserver are part of the standard airflow services.

    File Server: Nginx is used as a file server to serve airflow logs and application logs.

    Canary DAG: The canary DAG mimics the actual load on our workers. It runs every 30 minutes and checks the health of the scheduler and the workers. If the task is not queued at all or has spent more time in the queued state than expected, then either the scheduler or the worker is not functioning as expected. This will trigger an alert.

    Workers: The workers are placed in a private subnet. A general-purpose AWS machine with two types of workers is configured, one for sub-DAGs and one for tasks. The workers are placed in an EC2-Autoscaling group and the size of the group will either grow or shrink depending on the current tasks that are executed.

    Autoscaling of workers

    Increasing the group size: A lambda is triggered in a periodic interval and it checks the length of the RMQ queue. The lambda also knows the current number of workers in the current fleet of workers. Along with that, we also log the average run time of tasks in the DAG. Based on these parameters, we either increase or decrease the group size of the cluster.

    Reducing the group size: When we decrease the number of workers, it also means any of the workers can be taken down and the worker needs to be able to handle it. This is done through termination hooks. We follow an aggressive scale-up policy and a conservative scale-down policy.

    File System: We use EFS (Elastic File System) of AWS as the file system that is shared between the workers and the controller. EFS is a managed NAS that can be mounted on multiple services. By using EFS, we have ensured that all the logs are present in one file system and these logs are accessible from the file server present in the controller. We have put in place a lifecycle policy on EFS to archive data older than 7 days.

    Interfaces: To scale up the computing platform when required, we have a bunch of hooks, libraries, and operators to interact with external systems like Slack, EMR, Jira, S3, Qubole, Athena, and DynamoDB. Standard interfaces like Jira and Slack also help in onboarding the L2 support team. The L2 support relies on Jira and Slack notifications to monitor the DAG progress.

    Deployment

    Deployment in an airflow system is fairly challenging and involves multi-stage deployments.

    Challenges:

    • If we first deploy the controller and if there are any changes in the DAG, the corresponding tasks may not be present in workers. This may lead to a failure.
    • We have to make blue-green deployments as we cannot deploy on the workers where tasks may still be running. Once the worker deployments are done, the controller deployment takes place. If it fails for any reason, both the deployments will be rolled back.

    We use an AWS code-deploy to perform these activities.

    Staging and Development

    For development, we use a docker container from Puckel-Airflow. We have made certain modifications to change the user_id and also to run multiple docker containers on the same system. This will help us to test all the new functionality at a DAG level.

    The staging environment is exactly like the development environment, wherein we have isolated our entire setup in separate VPCs, IAM policies, S3-Buckets, Athena DBs, Meta-DBs, etc. This is done to ensure the staging environment doesn’t interfere with our production systems. The staging setup is also used to test the infra-level changes like autoscaling policy, SLAs, etc.

    In Summary

    Following the deployment of Apache Airflow, we have onboarded several enterprise customers across our product suite and seen up to a 4X improvement in productivity, consistency and efficiency. We have also built a sufficient set of common libraries, connectors, and validation rules over time, which takes care of most of our custom, customer-specific needs. This has enabled us to roll out our solutions much faster and with better ROI.
    As Airflow has been integrated to our communications and project tracking systems, we now have much faster and better visibility on current statuses, issues with sub processes, and duration-based automation processes for escalations.
    At the heart of all the benefits we’ve derived is the fact that we have now achieved much higher consistency in processing large volumes of diverse data, which is one of DataWeave’s key differentiators.
    In subsequent blog posts, we will dive deeper into specific areas of this architecture to provide more details. Stay tuned!

  • Dataweave – CherryPy vs Sanic: Which Python API Framework is Faster?

    Dataweave – CherryPy vs Sanic: Which Python API Framework is Faster?

    Rest APIs play a crucial role in the exchange of data between internal systems of an enterprise, or when connecting with external services.

    When an organization relies on APIs to deliver a service to its clients, the APIs’ performance is crucial, and can make or break the success of the service. It is, therefore, essential to consider and choose an appropriate API framework during the design phase of development. Benefits of choosing the right API framework include the ability to deploy applications at scale, ensuring agility of performance, and future-proofing front-end technologies.

    At DataWeave, we provide Competitive Intelligence as a Service to retailers and consumer brands by aggregating Web data at scale and distilling them to produce actionable competitive insights. To this end, our proprietary data aggregation and analysis platform captures and compiles over a hundred million data points from the Web each day. Sure enough, our platform relies on APIs to deliver data and insights to our customers, as well as for communication between internal subsystems.

    Some Python REST API frameworks we use are:

    • Tornado — which supports asynchronous requests
    • CherryPy — which is multi-threaded
    • Flask-Gunicorn — which enables easy worker management

    It is essential to evaluate API frameworks depending on the demands of your tech platforms and your objectives. At DataWeave, we assess them based on their speed and their ability to support high concurrency. So far, we’ve been using CherryPy, a widely used framework, which has served us well.

    CherryPy

    An easy to use API framework, Cherrypy does not require complex customizations, runs out of the box, and supports concurrency. At DataWeave, we rely on CherryPy to access configurations, serve data to and from different datastores, and deliver customized insights to our customers. So far, this framework has displayed very impressive performance.

    However, a couple of months ago, we were in the process of migrating to python 3 (from python 2), opening doors to a new API framework written exclusively for python 3 — Sanic.

    Sanic

    Sanic uses the same framework that libuv uses, and hence is a good contender for being fast.

    (Libuv is an asynchronous event handler, and one of the reasons for its agility is its ability to handle asynchronous events through callbacks. More info on libuv can be found here)

    In fact, Sanic is reported to be one of the fastest API frameworks in the world today, and uses the same event handler framework as nodejs, which is known to serve fast APIs. More information on Sanic can be found here.

    So we asked ourselves, should we move from CherryPy to Sanic?

    Before jumping on the hype bandwagon, we looked to first benchmark Sanic with CherryPy.

    CherryPy vs Sanic

    Objective

    Benchmark CherryPy and Sanic to process 500 concurrent requests, at a rate of 3500 requests per second.

    Test Setup

    Machine configuration: 4 VCPUs/ 8GB RAM.
    Network Cloud: GCE
    Number of Cherrypy/Sanic APIs: 3 (inserting data into 3 topics of a Kafka cluster)
    Testing tool : apache benchmarking (ab)
    Payload size: All requests are POST requests with 2.1KB of payload.

    API Details

    Sanic: In Async mode
    Cherrypy: 10 concurrent threads in each API — a total of 30 concurrent threads
    Concurrency: Tested APIs at various concurrency levels. The concurrency varied between 10 and 500
    Number of requests: 1,00,000

    Results

    Requests Completion: A lower mean and a lower spread indicate better performance

     

    Observation

    When the concurrency is as low as 10, there is not much difference between the performance of the two API frameworks. However, as the concurrency increases, Sanic’s performance becomes more predictable, and the API framework functions with lower response times.

    Requests / Second: Higher values indicate faster performance

    Sanic clearly achieves higher requests/second because:

    • Sanic is running in Async mode
    • The mean response time for Sanic is much lower, compared to CherryPy

    Failures: Lower values indicate better reliability

    Number of non-2xx responses increased for CherryPy with increase in concurrency. In contrast, number of failed requests in Sanic were below 10, even at high concurrency values.

    Conclusion

    Sanic clearly outperformed CherryPy, and was much faster, while supporting higher concurrency and requests per second, and displaying significantly lower failure rates.

    Following these results, we transitioned to Sanic for ingesting high volume data into our datastores, and started seeing much faster and reliable performance. We now aggregate much larger volumes of data from the Web, at faster rates.

    Of course, as mentioned earlier in the article, it is important to evaluate your API framework based on the nuances of your setup and its relevant objectives. In our setup, Sanic definitely seems to perform better than CherryPy.

    What do you think? Let me know your thoughts in the comments section below.

    If you’re curious to know more about DataWeave’s technology platform, check out our website, and if you wish to join our team, check out our jobs page!