Async Report Generation with Cloud Tasks and Cloud Functions

Why you should offload long-running tasks to a distributed queue

October 27, 2021 · 9 min read

Introduction

I was previously working on a geospatial web app that dealt with big data. One feature I worked on had to generate CSV reports for datasets of up to 300k rows at a time. Users could also narrow the data with filters in the UI and extract a report of just the filtered results.

This was my initial implementation: First, the UI would send the user's selected filters to the API. The API would then send an Elasticsearch query to our index, which returned all the filtered results. Once the API had received all of them, it would write each result as a row in a CSV file. Finally, the generated file was returned as an attachment, which the browser downloaded directly.

Previous Extracts Architecture

While I was building the feature, everything worked locally. I was able to download the generated report without any issues, although the app took a few minutes to process it.

The Problem

The problem arose, however, when we deployed this to production. When you attempted to generate the report, the page would load for a very, very long time before eventually erroring out. It resulted in a poor user experience. Not good.

Behind the scenes, our backend API (hosted on Google App Engine) couldn't handle the sheer amount of data it was writing to the file. This was also a blocking operation, meaning that you wouldn't be able to do anything in the app until this finished. This was the big problem—since our API instance ran on limited resources, we quickly ran into memory issues and server errors when the feature was executed.

At first, we tried to patch this by upgrading the instance and turning on auto scaling. However, we ran into the same problems. It was clear that we needed to find a better way.

The Solution: Task Queues and Message Brokers

When I consulted my seniors for advice on how to make this feature scalable and performant, the initial suggestion was to use Celery. Celery is a distributed task queue well suited to asynchronous processing. Before this, I hadn't heard of Celery or how useful it was for these use cases. When I started planning, I found a few posts about Celery handling (1) asynchronous (2) downloads (3), and this made me feel confident that I was going in the right direction. After a lot of researching and docs-diving, I went ahead and designed the infrastructure for it.

Extracts Architecture with Celery

I found this great article by Tara Yoo and built upon her idea. First, our UI would send the needed filters to the API. The API would receive the request and call the Celery task for it. Meanwhile, so as not to block execution, the API would send a 202 response to the UI. In the background, the Celery task would query our Elasticsearch index with the given filters and collect the results. The task would then create the CSV report and upload it to Google Cloud Storage (we're using Google Cloud Platform, btw). Finally, the Celery task would save the task id alongside the needed payload (the signed URL, the datetime generated, etc.) in the database. While all of this was happening, the UI would poll an API endpoint for the result associated with the current task id. When the API returned the payload for that task id, the UI would automatically download the file from the signed URL. I went ahead and presented this to my teammates, and they loved it!
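
The request-then-poll flow described above can be sketched without Celery at all. This is a minimal in-process simulation, not the setup we ran: a thread and `queue.Queue` stand in for the Celery worker and RabbitMQ, a dict stands in for the results table, and `submit_report`, `poll_report`, `worker`, and the example URL are hypothetical names made up for illustration.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()  # stand-in for the message broker
results = {}                # stand-in for the results table, keyed by task id
results_lock = threading.Lock()


def submit_report(filters):
    """API side: enqueue the job and return immediately (the 202 response)."""
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, filters))
    return 202, {"task_id": task_id}


def poll_report(task_id):
    """API side: 202 while no result is stored yet, 200 once one exists."""
    with results_lock:
        payload = results.get(task_id)
    if payload is None:
        return 202, {"status": "PENDING"}
    return 200, payload


def worker():
    """Worker side: process jobs until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        task_id, _filters = item
        # A real task would query Elasticsearch, write the CSV and upload it here.
        payload = {
            "status": "SUCCESS",
            "download_url": f"https://example.invalid/reports/{task_id}.csv",
        }
        with results_lock:
            results[task_id] = payload
        task_queue.task_done()
```

Starting `worker` in a background thread, calling `submit_report(...)`, and then calling `poll_report(...)` with the returned task id walks through the same states the UI would see: pending first, then the payload with the download URL.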

When it came to implementation, I quickly ran into some infra-level questions:

  1. Which Google Cloud Platform (GCP) resource should I host my Celery and RabbitMQ instances on?
  2. If report generation occurs multiple times in parallel, how do I instantiate my Celery workers accordingly? Do I create a minimum number of workers?
  3. Should I keep using this or can I use GCP resources for it?

Which Google Cloud Platform (GCP) resource should I host my Celery and RabbitMQ instances on?

Choosing which GCP resources to deploy our Celery and RabbitMQ instances to was tricky (and the fact that I had very limited experience with infra work made it trickier). The initial plan was to use Cloud Run, but Cloud Run isn’t suitable for handling background tasks since its containers only start running when they are triggered. I needed something that was always available for on-the-fly generation, so I looked at Compute Engine instead. I quickly passed on Compute Engine as well, since it cost more and came with a lot of configuration overhead.

If report generation occurs multiple times in parallel, how do I instantiate my Celery workers accordingly? Do I create a minimum number of workers?

There was certainly a way to scale the Celery instance, but it required a lot of configuration. I looked around and Kubernetes seemed like the way to go. However, I only had so much time to work on this fix and this was also a huge overhead for me.

Should I keep using this or can I use GCP resources for it?

My manager, who helped me realize that we could explore other options, suggested looking into infra resources native to GCP that could help solve our problem.

The [updated] Solution

I found that the most recommended way to use Celery on Google Cloud was, wait for it, to not use it at all. A bunch of Stack Overflow answers recommended Cloud Tasks instead, since it basically does everything that Celery was doing for us, just more compatible with GCP. With this in mind, I decided to scrap my initial plan and replaced Celery with Cloud Tasks as my distributed task queue. But what about the Celery tasks? Enter Cloud Functions. These are nifty serverless functions, meaning you don't need to manage a dedicated server for them to work. You can also configure them to use up to 8GB of RAM and a timeout of up to 540 seconds per function. Scalability is not an issue as these automatically scale based on load.

Revised Architecture with GCP resources

In the new architecture, the backend API (hosted on App Engine) would send an HTTP target task to our task queue. The task queue then automatically triggers the Cloud Function, which is responsible for querying the Elasticsearch index, creating the report, uploading it to GCS, and updating the database with the relevant task id dispatched from the UI.
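
To make the "HTTP target task" a bit more concrete, here is a minimal sketch of what the App Engine side might send, assuming the `google-cloud-tasks` client library. It is not our production code: `build_report_task`, `enqueue_report_task`, the `filters` key, and every argument name are hypothetical. The task body is built by a plain function so it can be inspected without any GCP credentials.

```python
import json


def build_report_task(function_url, task_id, filename, filters):
    """Build the task body for a Cloud Tasks HTTP target.

    "task_id" and "filename" are the keys the Cloud Function reads from the
    request; "filters" is a hypothetical key for the user's selections.
    """
    payload = {"task_id": task_id, "filename": filename, "filters": filters}
    return {
        "http_request": {
            # http_method is left at the service default, which the Cloud Tasks
            # documentation gives as POST.
            "url": function_url,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload).encode("utf-8"),  # body must be bytes
        }
    }


def enqueue_report_task(project, location, queue_name, task):
    """Send the task to the queue. Needs google-cloud-tasks and GCP credentials."""
    # Imported lazily so build_report_task stays usable without the library.
    from google.cloud import tasks_v2

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue_name)
    return client.create_task(request={"parent": parent, "task": task})
```

Keeping the payload builder separate from the network call also makes the task body easy to unit test, which would have helped with the local debugging pain mentioned at the end of this post.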

The Implementation

For context, our backend API is written in Django, while the UI is written in React.

I started by creating the model for saving the task id, its result as JSON, and the date it was created.

from django.db import models


class AsyncExtractResult(models.Model):
    task_id = models.CharField(blank=False, max_length=255, null=False, db_index=True)
    result = models.TextField(blank=False)  # JSON-encoded outcome of the extract
    date_created = models.DateTimeField(auto_now_add=True)

Then, I proceeded to build the API endpoint using Django REST Framework’s ViewSet. In a nutshell, this simply tries to get the object from the database with the associated task id and returns it as a response. If it doesn't exist yet, it responds with a 202 status.

import json

from rest_framework import status, viewsets
from rest_framework.response import Response


class AsyncExtractViewSet(viewsets.ViewSet):
    serializer = AsyncExtractResultSerializer
    lookup_field = "task_id"

    def retrieve(self, request, *args, **kwargs):
        task_id = self.kwargs.get("task_id", None)
        try:
            async_result = AsyncExtractResult.objects.get(task_id=task_id)
        except AsyncExtractResult.DoesNotExist:
            # No row yet: the Cloud Function is still working on this task
            return Response(status=status.HTTP_202_ACCEPTED, data={"status": "PENDING"})

        json_body = json.loads(async_result.result)
        state = json_body.get("status", None)

        if state == "SUCCESS":
            return Response(status=status.HTTP_200_OK, data=json_body)

        # "FAILED" or an unrecognised state: always return a response,
        # never fall through and return None
        return Response(
            status=status.HTTP_500_INTERNAL_SERVER_ERROR, data=json_body
        )
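
The endpoint only inspects the `status` key of the stored JSON, and the UI reads `download_url` from the same body. Here is a minimal sketch of how that stored string might be built and interpreted. The helper names `build_result` and `http_status_for`, and the `date_generated` and `error` keys, are assumptions for illustration rather than the actual schema we used.

```python
import json
from datetime import datetime, timezone


def build_result(status, download_url=None, error=None):
    """Serialise the outcome of one extract for the `result` text column."""
    body = {
        "status": status,  # "SUCCESS" or "FAILED", as checked by the endpoint
        "date_generated": datetime.now(timezone.utc).isoformat(),
    }
    if download_url is not None:
        body["download_url"] = download_url  # signed URL the UI downloads from
    if error is not None:
        body["error"] = error  # hypothetical field for failure details
    return json.dumps(body)


def http_status_for(result_json):
    """Map a stored result (or None when no row exists yet) to an HTTP status."""
    if result_json is None:
        return 202  # still pending
    state = json.loads(result_json).get("status")
    return 200 if state == "SUCCESS" else 500
```

For example, `http_status_for(build_result("SUCCESS", download_url=...))` gives 200, while a missing row (`None`) gives 202, which is what keeps the UI polling.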

The Cloud Function would do all the heavy lifting. Here, it does a handful of things:

  1. Get the params/filters from the request.
  2. Get the Elasticsearch index we’ll be extracting the data from.
  3. Extract the index data, and return a generator function for it. We are assuming that the size of the data being extracted is large, and a generator function is perfect for it since it yields one result at a time.
  4. The result yielded from the generator function would be used to create the CSV file, until all of the results have been yielded.
  5. Upload the report to GCS.
  6. Finally, update the database with the task id.
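
def async_extract(request):
    """
    Cloud Function that processes asynchronous extractions
    """
    request_json = request.get_json()  # step 1

    task_id = request_json.get("task_id")
    filename = request_json.get("filename", "")
    index = get_es_index()  # step 2
    gen_fn = extract_index_data(request_json, index)  # step 3: generator over the results
    export_csv(task_id, filename, gen_fn)  # steps 4 and 5
    update_database(task_id, filename)  # step 6

    # Respond with a 2xx so Cloud Tasks marks the task as done instead of retrying it
    return "OK", 200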
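
The helpers called above aren't shown in this post, so here is a minimal sketch of the generator-to-CSV idea from steps 3 and 4 using only the standard library. `iter_rows` and `write_csv` are hypothetical names, and the in-memory `pages` list stands in for paginated Elasticsearch responses. The point is that only one row is held at a time while the file is being written.

```python
import csv
import io


def iter_rows(pages):
    """Yield one row at a time from an iterable of result pages."""
    for page in pages:
        for hit in page:
            yield hit


def write_csv(rows, out, fieldnames):
    """Stream rows into a file-like object and return how many were written."""
    # extrasaction="ignore" drops keys that are not in the header
    writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
    return count


# Usage with two small "pages" of results
pages = [[{"id": 1, "name": "a"}], [{"id": 2, "name": "b"}]]
buffer = io.StringIO()
written = write_csv(iter_rows(pages), buffer, ["id", "name"])
```

In the real function the output would be a file that gets uploaded to GCS rather than a `StringIO` buffer, but the streaming loop would look much the same.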

Aside from this, I also refactored an existing API endpoint so that our App Engine service would send a task to Cloud Tasks. I also configured the necessary connections in GCP so that the task queue would know which Cloud Function to trigger, some networking stuff, etc.

In the UI, we would poll the backend and ask if the report has been generated. If generation is successful (noted by the response’s status property), download the report with the generated URL. Otherwise, do something with the error. Using WebSockets is also a viable solution over polling for a specific interval, but I didn't know much about WebSockets at the time to be confident enough to implement it (teehee).

// Poll every 5 seconds while the extract is still in progress
useInterval(() => {
  if (extractStatus === ExtractStatus.IN_PROGRESS) {
    getGeneratedReport();
  }
}, 5000);

const getGeneratedReport = async () => {
  try {
    const { data } = await api.get(`/async-extract/${taskId}/`);

    if (data.status === "PENDING") {
      return;
    } else if (data.status === "SUCCESS") {
      downloadReport(data.download_url);
      setExtractStatus(ExtractStatus.SUCCESS);
    }
  } catch (error) {
    logError(error);
    setExtractStatus(ExtractStatus.FAILED);
  }
};

The Conclusion

Overall, I learned a lot about handling long-running tasks, the power of queues, and asynchronicity. When dealing with long-running tasks, it’s best to create a queue for them and let a worker do the heavy lifting asynchronously. Never let your server get clogged with a long-running task again.

Updated Insights

(07/2023) In my original post, my junior self architected this in a way that was very impractical to test and debug locally. Since the GCP resources we were using didn't have local emulation at the time, it was quite difficult to debug any issues that arose. In hindsight, I could've used Redis Queues instead! They're easier to integrate with and would have been much more debuggable than the setup I made. I could've also directly used the BigQuery SDK to interface with our data instead of extracting it from Elasticsearch (ES was just a sink for our BQ data anyway).

Also, if we were using Node, I could've implemented this using Server-Sent Events to manage HTTP resources more efficiently and to get updates in real time.