
Scheduling Background Tasks in Python with Celery and RabbitMQ

Daniel Easterman


Scheduling background tasks is an important capability for many Python applications. Background tasks let your app perform time-based or long-running operations without blocking the main thread or slowing down its user-facing functionality.

Background tasks can be used for anything from running recurring jobs like data cleanup or reporting, to sending asynchronous emails or other notifications.

In this article, we will build background tasks using Celery and RabbitMQ to create a weather notification service that delivers rain alerts via Slack.

But first, let's briefly explore why we would want to use Celery or RabbitMQ in the first place.

Why Celery?

Celery is a widely used distributed task queue framework for Python applications. Celery handles distributing background tasks to worker processes or nodes, manages their execution reliably, and can scale to hundreds or thousands of jobs per second in production environments.

Celery is especially popular in the Python ecosystem because it fits naturally with the language’s syntax, and it's commonly used in frameworks like Django or Flask. Celery uses familiar concepts — decorators for tasks, Python data structures for messages — and integrates easily with Python’s logging, error handling, and testing tools. Its configuration is also Python-based, avoiding the need to learn an entirely separate domain-specific language.

Lastly, Celery’s architecture is highly flexible. It works with different message brokers such as RabbitMQ (which we will use in this tutorial), Redis, or Amazon SQS, letting the developer pick a broker that best suits their workload and infrastructure.

Why RabbitMQ?

RabbitMQ is a robust, battle-tested message broker known for its reliability and flexibility. It supports complex routing patterns via exchanges and queues, offers strong delivery guarantees (including persistence and acknowledgments), and has excellent interoperability across languages and protocols (e.g., AMQP, MQTT, STOMP).

Despite its strengths, like Celery, RabbitMQ can be complex to set up and maintain compared to alternatives like Redis or cloud-native solutions like AWS SQS. Also, RabbitMQ stores messages on disk by default, which can introduce latency. For simple pub/sub or fire-and-forget patterns, lighter brokers like NATS or Redis Streams might perform better and could be easier to manage.

Now let's move on to building our weather notification service.

What We Will Build

In this practical example, we will create a Slack rain notification service that will check the weather forecast every day at 12pm to see whether rain is expected for the next 5 days.

We'll need to add 3 different APIs to our project: the OpenWeatherMap API (for getting our rain data), the GitHub API (which will act as a simple JSON database to ensure we don't get duplicate notifications), and the Slack API for creating messages.

In the last section of this article, we will integrate Celery and RabbitMQ into our application to create the automatic 12pm daily task schedule. Find the full code for this tutorial on GitHub.

The file structure of our Python project will look like the diagram below. main.py is the main entrypoint for our app and will include the code for making the initial request to the OpenWeatherMap API. In the apis folder, we have two separate files — github_data.py and slack.py. These handle saving data to GitHub and creating messages in Slack.

plaintext
.
├── apis
│   ├── github_data.py
│   └── slack.py
├── celerybeat-schedule
├── main.py
└── requirements.txt

Prerequisites and Requirements

  • The steps in this project have been created on macOS running Apple's M2 chip. In particular, later in the Implement Celery and RabbitMQ section, I install and verify RabbitMQ using Homebrew for Mac. But since RabbitMQ is cross-platform, you can also use Linux and Windows. See the official RabbitMQ documentation for more details on installation using other operating systems.
  • This project uses Python 3.10.7, but you can also safely use any later Python version. Everything will be installed with pip inside a virtual environment.
  • You need basic knowledge of Python, experience working with APIs in Python, and some basic knowledge of terminal commands.
  • You need knowledge of GitHub token creation and setting repository permissions.

Initial Python Project Setup

First, let's create a Python project in a new directory called python-celery-rabbitmq. We will also create the core main.py file.

Shell
mkdir python-celery-rabbitmq
cd python-celery-rabbitmq
touch main.py

Next, before we start installing our packages with pip, we need to set up our virtual environment. First run:

Shell
python -m venv .venv

Activate your virtual environment:

Shell
source .venv/bin/activate

Now we can install all our required packages:

Shell
pip install requests celery python-dotenv PyGithub slack-sdk

In the next section, we will add the code in main.py to make requests to the OpenWeatherMap API.

Query the OpenWeatherMap API with a New Python Project

Head to the OpenWeatherMap API page, create a new account, and subscribe to the cheapest pay-as-you-go plan, called "One Call API 3.0". The website lists a number of other monthly subscription and professional-tier products below the basic "One Call API 3.0" plan, but we can safely ignore those for the purposes of this article.

Also, confusingly, the OpenWeatherMap website says that the first 1000 API calls to the One Call API are free, but the Stripe payment page says only the first 100 calls are free. Either way, even with a lot of project testing, we should stay well below this limit.

Once we are subscribed, grab the API key from your dashboard, and we can start the actual coding!

At this point, it's important that we don't expose our API key. Let's add the key to our .env file and also make sure the .env file has been added to the project's .gitignore so it is not tracked in version control.
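As a sketch, the .env file at this stage would contain a single line (the key name OPEN_WEATHER_MAP_API_KEY matches what main.py reads below; the value is a placeholder):

```plaintext
OPEN_WEATHER_MAP_API_KEY=your-openweathermap-api-key-here
```

Make sure .gitignore contains a line with just `.env` so the file never reaches version control.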

In main.py, add the code below (note: this will change once we implement the Celery functionality in a later section of this post):

py
import os
import requests
from celery import Celery
from celery.schedules import crontab
from apis.github_data import check_github_json
from dotenv import load_dotenv

load_dotenv()

def weather_task(city):
    api_key = str(os.getenv('OPEN_WEATHER_MAP_API_KEY'))
    url = "https://api.openweathermap.org/data/2.5/forecast"
    params = {"q": city, "appid": api_key, "units": "metric"}
    response = requests.get(url, params=params)
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text}")
        return
    data = response.json()
    summaries = []
    for entry in data["list"]:
        if "12:00:00" in entry["dt_txt"]:
            main = entry["weather"][0]["main"]
            description = entry["weather"][0]["description"]
            if "rain" in description.lower() or "rain" in main.lower() or "rain" in entry:
                date = entry["dt_txt"].split(" ")[0]
                summaries.append({
                    "Date": date,
                    "Weather Description": description
                })
    if summaries:
        check_github_json(summaries)
    else:
        check_github_json(None)

if __name__ == "__main__":
    weather_task("London")

There are three main things to break down in this code snippet:

  1. First of all, we should note that the request is using the API's 5-day forecast endpoint. Since this endpoint returns multiple forecasts in three-hour increments over the 5 days, we need to reduce the amount of data returned. We can do this by just getting a "summary" for the forecast at 12pm, which simplifies things greatly and is sufficient for our purposes here. So we use an if statement to only filter entries containing 12:00:00.

  2. Next, we need to find any instances of rain in the API data. So we'll use another three-part if statement to look for any mention of rain in three possible parts of the data: if "rain" in description.lower() or "rain" in main.lower() or "rain" in entry. If rain is indeed found, we append the date and a plain language weather description to the summaries list.

  3. Lastly, we have one final, important if statement. If we have some rain summary data, we will pass this through to the check_github_json function. But if not, we will just pass None into that function.
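To see these three steps in isolation, here is a minimal, self-contained sketch that runs the same filtering logic over a hand-written mock of the forecast response (the mock entries are illustrative, not real API output):

```python
# Mock of the "list" field from the 5-day/3-hour forecast response
# (structure based on the fields used above; values are made up)
mock_entries = [
    {"dt_txt": "2025-06-05 09:00:00",
     "weather": [{"main": "Rain", "description": "light rain"}]},
    {"dt_txt": "2025-06-05 12:00:00",
     "weather": [{"main": "Rain", "description": "light rain"}]},
    {"dt_txt": "2025-06-06 12:00:00",
     "weather": [{"main": "Clear", "description": "clear sky"}]},
]

summaries = []
for entry in mock_entries:
    # Step 1: keep only the midday forecast for each day
    if "12:00:00" in entry["dt_txt"]:
        main = entry["weather"][0]["main"]
        description = entry["weather"][0]["description"]
        # Step 2: look for any mention of rain in the three possible places
        if "rain" in description.lower() or "rain" in main.lower() or "rain" in entry:
            # Step 3: record the date and a plain-language description
            summaries.append({"Date": entry["dt_txt"].split(" ")[0],
                              "Weather Description": description})

print(summaries)
```

Only the 12:00 entry that mentions rain survives; the 09:00 entry is filtered out in step 1 and the clear-sky entry in step 2.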

Let's see exactly what check_github_json does and how it works together with the other two associated functions in the github_data.py file.

Use the GitHub API to Automatically De-duplicate Notifications

As mentioned earlier, we will be using the GitHub API as a convenient/no-frills database for storing JSON data from the OpenWeatherMap API every time there is a new or unique rain forecast. By storing the data with GitHub, we can check any incoming forecasts from the OpenWeatherMap API and ensure that the new forecasts are actually unique. This will allow us to prevent any duplicate notifications from taking place on Slack.

Create an empty GitHub repository called weather-data (the name must match the get_repo('weather-data') call we'll write shortly). Add a folder called json, and inside it create a file called data.json. Inside data.json, add an empty array (the JSON equivalent of a Python list): [].

Next, create a GitHub personal access token for your repository and set the repository permissions.

Copy the token and add it to your .env file with the key GITHUB_PERSONAL_ACCESS_TOKEN:

plaintext
GITHUB_PERSONAL_ACCESS_TOKEN=your-long-alpha-numeric-github-token-here

Back in our code editor, let's add all the import statements and three functions, called check_github_json, load_github_json, and update_github_json, in a new file called github_data.py:

py
import os
import logging
import json
from github import Github
from datetime import datetime, timezone
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

def load_github_json():
    GITHUB_PERSONAL_ACCESS_TOKEN = str(os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN'))
    github = Github(GITHUB_PERSONAL_ACCESS_TOKEN)
    try:
        repo = github.get_user().get_repo('weather-data')
        file = repo.get_contents("json/data.json")
        db_data = json.loads(file.decoded_content.decode('utf-8'))
        return repo, file, db_data
    except Exception as e:
        logger.exception("An error occurred")

def check_github_json(forecast):
    if forecast is None:
        return
    repo, file, db_data = load_github_json()
    new_items = []
    is_first_run = len(db_data) == 0
    for item in forecast:
        if item not in db_data:
            print("Make JSON DB entry")
            db_data.append(item)
            new_items.append(item)
        else:
            print("Skip, found in DB")
    if new_items:
        if is_first_run:
            update_github_json(repo, file, db_data, slack_items=new_items)
        else:
            update_github_json(repo, file, db_data, slack_items=[new_items[-1]])
    else:
        print("No new items to commit")

def update_github_json(repo, file, updated_data, slack_items):
    bytes_data = json.dumps(updated_data).encode('utf-8')
    commit_msg = datetime.now(timezone.utc).strftime("Update weather data - %Y-%m-%d %H:%M:%S UTC")
    try:
        repo.update_file(file.path, commit_msg, bytes_data, file.sha)
        print("Github data updated!")
        # create_slack_message(slack_items)
    except Exception as e:
        logger.exception("An error occurred")

First, take a quick look at the load_github_json function. It requests our weather-data repository, gets the contents of our data.json file, and loads the file as JSON. The variables repo, file, and db_data can then be reused in the check_github_json and update_github_json functions.

In the check_github_json function, we use a for loop to iterate through the forecast data we received from the OpenWeatherMap data in main.py. First, we check if the forecast item is already in our GitHub db_data JSON. If it's not, we append to db_data and the new_items variables. For the first run, we want to send all our new_items to the update_github_json function. On subsequent runs, we just want to pass the last item (using -1) to that function.
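The de-duplication logic can be exercised without touching GitHub at all. Here is a sketch with plain Python lists standing in for the repository data (the forecast items are made up):

```python
# Existing "database" contents and an incoming forecast (illustrative data)
db_data = [{"Date": "2025-06-05", "Weather Description": "light rain"}]
forecast = [
    {"Date": "2025-06-05", "Weather Description": "light rain"},    # already stored
    {"Date": "2025-06-07", "Weather Description": "moderate rain"}, # new
]

is_first_run = len(db_data) == 0
new_items = []
for item in forecast:
    if item not in db_data:  # dict equality compares keys and values
        db_data.append(item)
        new_items.append(item)

# On the first run all new items go to Slack; afterwards, only the last one
slack_items = new_items if is_first_run else [new_items[-1]]
print(slack_items)
```

The duplicate entry is skipped, so only the 2025-06-07 forecast would trigger a notification.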

Lastly, in update_github_json, we execute the built-in update_file method, which requires a commit message like all changes to a GitHub repository. Here, we create a unique commit message each time by recording the exact time and date of the commit using Python's datetime module.
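For example, the strftime pattern used above produces a message that is unique down to the second:

```python
from datetime import datetime, timezone

# Same pattern as in update_github_json
commit_msg = datetime.now(timezone.utc).strftime(
    "Update weather data - %Y-%m-%d %H:%M:%S UTC")
print(commit_msg)  # e.g. "Update weather data - 2025-06-06 12:00:00 UTC"
```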

At this point, let's check that everything is working as intended by running python main.py in our project root.

If we are successful, first, we should see something similar to this printed in our terminal:

Shell
Make JSON DB entry
Make JSON DB entry
Make JSON DB entry
Github data updated!

This is already a pretty good sign that things have worked correctly! Now, go to your GitHub repository and click on the Raw button to see your data more clearly. Your empty array should be replaced with the new weather data:

JSON
[
  {
    "Date": "2025-06-05",
    "Weather Description": "light rain"
  },
  {
    "Date": "2025-06-07",
    "Weather Description": "light rain"
  },
  {
    "Date": "2025-06-08",
    "Weather Description": "light rain"
  }
]

In our next section on the Slack API, we will implement code to fire off a notification once we get a new and unique rain forecast.

Create Message Notifications with the Slack API

In this section, we will use the Slack WebClient SDK.

First, let's go to the Slack API page to get our "Bot" credentials so we can implement our notifications feature:

  • Go to https://api.slack.com/apps, click on Create New App and select From Scratch.
  • In the next modal window, give your app a name and select the workspace you want to deploy it to (it's a good idea to create your own private workspace dedicated to this project).
  • Click on OAuth & Permissions, and then under Bot Token Scopes, add the following three scopes: channels:join, channels:read, and chat:write.

(It will soon become clear why we need the channel permissions in addition to the more obvious chat:write permission).

  • Lastly, near the top of the page, click on Install to Workspace. This will generate your new Bot User OAuth Token. Again, copy the token and add it to your .env file with the key SLACK_BOT_USER_TOKEN:
plaintext
SLACK_BOT_USER_TOKEN=your-long-alpha-numeric-bot-token-here

Now we can create a new file in the apis folder called slack.py and add the code below:

py
import os
from slack_sdk import WebClient
from dotenv import load_dotenv

load_dotenv()

def create_slack_message(items):
    SLACK_BOT_TOKEN = str(os.getenv('SLACK_BOT_USER_TOKEN'))
    client = WebClient(token=SLACK_BOT_TOKEN)
    channel = "C0211DW58JD"
    if isinstance(items, str):
        text_to_send = items
    else:
        message_lines = [f"- {item['Date']}: {item['Weather Description']}" for item in items]
        text_to_send = "New rain alert(s):\n" + "\n".join(message_lines)
    try:
        client.conversations_join(channel=channel)
    except Exception:
        pass
    client.chat_postMessage(channel=channel, text=text_to_send)

After instantiating the WebClient with our SLACK_BOT_USER_TOKEN, we specify the Slack channel we are going to use for our notifications. This must be in ID form as the Slack SDK doesn't understand plain language channel names. So first, in order to get your own unique channel ID, you need to temporarily run the for loop below in your code (if you use the channel ID above, it will not work for your own unique installation):

py
# response = client.conversations_list()
# for channel in response['channels']:
#     print(channel['name'], channel['id'])

Next, the if isinstance(items, str) line checks if the items variable is a plain language string to simply pass through the strings: "No rain expected in the next 5 days" or "No change in the forecast found over the next 5 days". If it's not a string, the code will process the list of dictionaries and display it as a nicely-formatted message for the Slack notification.
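Here is the list-to-text branch in isolation, with illustrative input, to show exactly what ends up in the Slack message:

```python
# Illustrative rain summaries, in the same shape main.py produces
items = [
    {"Date": "2025-06-05", "Weather Description": "light rain"},
    {"Date": "2025-06-07", "Weather Description": "heavy rain"},
]

if isinstance(items, str):
    text_to_send = items
else:
    # One bullet line per forecast item, joined under a header
    message_lines = [f"- {item['Date']}: {item['Weather Description']}" for item in items]
    text_to_send = "New rain alert(s):\n" + "\n".join(message_lines)

print(text_to_send)
```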

The next try/except block is just a convenience measure to ensure that the bot is a member of the channel. If it's not, it makes sure that the bot can automatically join the channel. This saves us from having to manually add the bot as a channel member in the Slack UI (if the bot is not a channel member, we will get errors).

Lastly, with just the line client.chat_postMessage(channel=channel, text=text_to_send), we will execute the chat_postMessage and actually create the message notification.

Now let's go back to github_data.py, import the create_slack_message function, and call it in the three places inside check_github_json and update_github_json, as shown below (annotated with #New for clarity):

py
import os
import logging
import json
from github import Github
#New
from apis.slack import create_slack_message
from datetime import datetime, timezone
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

...

def check_github_json(forecast):
    if forecast is None:
        #New
        create_slack_message("No rain expected in the next 5 days.")
        return
    repo, file, db_data = load_github_json()
    new_items = []
    is_first_run = len(db_data) == 0
    for item in forecast:
        if item not in db_data:
            print("Make JSON DB entry")
            db_data.append(item)
            new_items.append(item)
        else:
            print("Skip, found in DB")
    if new_items:
        if is_first_run:
            update_github_json(repo, file, db_data, slack_items=new_items)
        else:
            update_github_json(repo, file, db_data, slack_items=[new_items[-1]])
    else:
        print("No new items to commit")
        #New
        create_slack_message("No change in the forecast found over the next 5 days.")

def update_github_json(repo, file, updated_data, slack_items):
    bytes_data = json.dumps(updated_data).encode('utf-8')
    commit_msg = datetime.now(timezone.utc).strftime("Update weather data - %Y-%m-%d %H:%M:%S UTC")
    try:
        repo.update_file(file.path, commit_msg, bytes_data, file.sha)
        print("Github data updated!")
        #New
        create_slack_message(slack_items)
    except Exception as e:
        logger.exception("An error occurred")

Also, it's a good idea to reset data.json in our GitHub weather-data repository back to an empty array, so we can run our next test from a clean starting point.

Once that's done, run python main.py again.

If everything is working smoothly, this should trigger a new Slack message notification:

Slack example showing a rain message notification

Implement Celery for Python and RabbitMQ

Before we can start work on our Celery background task system, we need to make sure RabbitMQ is installed and running on our system.

For Mac, the easiest way to install RabbitMQ is with Homebrew. To install on other operating systems, see the official RabbitMQ documentation.

First, run these two installation commands:

Shell
brew install erlang
brew install rabbitmq

To start RabbitMQ as a background service, run this command:

Shell
brew services start rabbitmq

You can also stop RabbitMQ at any time with:

Shell
brew services stop rabbitmq

Since we already installed Celery in our project with pip earlier, we can now move onto editing main.py and adding the Celery functionality. Here is our new main.py, with comments indicating where new code has been added:

py
import os
import requests
from celery import Celery
from celery.schedules import crontab
from apis.github_data import check_github_json
from dotenv import load_dotenv

load_dotenv()

# New Celery Code:
app = Celery('main', broker='amqp://localhost')
app.conf.timezone = 'Europe/London'
app.conf.broker_pool_limit = 1
app.conf.beat_schedule = {
    'check-weather-daily-at-noon': {
        'task': 'main.weather_task',
        'schedule': crontab(hour=12, minute=0),
        'args': ('London',)
    },
}

# New decorator
@app.task(name='main.weather_task')
def weather_task(city):
    api_key = str(os.getenv('OPEN_WEATHER_MAP_API_KEY'))
    url = "https://api.openweathermap.org/data/2.5/forecast"
    params = {"q": city, "appid": api_key, "units": "metric"}
    response = requests.get(url, params=params)
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text}")
        return
    data = response.json()
    summaries = []
    for entry in data["list"]:
        if "12:00:00" in entry["dt_txt"]:
            main = entry["weather"][0]["main"]
            description = entry["weather"][0]["description"]
            if "rain" in description.lower() or "rain" in main.lower() or "rain" in entry:
                date = entry["dt_txt"].split(" ")[0]
                summaries.append({
                    "Date": date,
                    "Weather Description": description
                })
    if summaries:
        check_github_json(summaries)
    else:
        check_github_json(None)

# Below used before Celery, commented-out now but kept if you want to test the API without Celery
# if __name__ == "__main__":
#     weather_task("London")

The main code that's of interest in the above snippet is the block labelled "# New Celery Code". On the first line of this block, we create a new Celery application instance and connect it to the local RabbitMQ message broker using the AMQP protocol. Under this line, I've set the timezone to 'Europe/London' (since this is where I'm based), but you should change this to your own timezone.

Next, we limit the number of RabbitMQ broker connections to 1 with app.conf.broker_pool_limit = 1. (Note: you can increase this connection number if you encounter performance issues down the line.)

With app.conf.beat_schedule, we create a configuration dictionary for our Celery scheduler. We name our overall "job" check-weather-daily-at-noon, specify that the task we want to run is the main.weather_task function, and, using crontab, set the task to run every day at 12pm. Finally, the args key passes "London" as an argument to the function, so we receive updates on London weather.
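crontab supports much more than a fixed daily time. As a sketch of a couple of alternatives you could drop into the beat_schedule config above (the job names here are made up; the keyword arguments follow Celery's crontab schedule API):

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'check-weather-every-morning': {
        'task': 'main.weather_task',
        'schedule': crontab(hour=7, minute=30),       # every day at 07:30
        'args': ('London',),
    },
    'check-weather-weekdays-only': {
        'task': 'main.weather_task',
        'schedule': crontab(hour=12, minute=0,
                            day_of_week='mon-fri'),   # weekdays at noon
        'args': ('London',),
    },
}
```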

Now, instead of running python main.py, we run our Celery beat and worker commands in one line:

Shell
celery -A main worker --loglevel=info --beat
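A note on this command: the --beat flag embeds the scheduler inside the worker process, which is convenient for development. For production, the Celery documentation recommends running beat and the worker as separate processes, along these lines:

```shell
# Run the scheduler and the worker as two separate processes (production-style)
celery -A main beat --loglevel=info
celery -A main worker --loglevel=info
```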

If everything is working correctly, you should see something similar to the following output:

Shell
 -------------- celery@Daniels-MacBook-Pro-4.local v5.5.2 (immunity)
--- ***** -----
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2025-06-06 12:14:23
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         main:0x10c163a60
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 --------------

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . main.weather_task

To confirm that the task will actually fire, you can temporarily adjust the hour and minute settings in the crontab to a time just ahead of your current time, which should trigger the message notification in Slack.

And that's it!

Wrapping Up

If you've made it this far, well done — we've done a lot of work together!

In this tutorial, we brought together Celery, RabbitMQ, and three external APIs to create an automated rain notification service. We learned how to fetch weather forecasts with OpenWeatherMap, avoid duplicate alerts using GitHub as a lightweight JSON store, and deliver notifications via the Slack API. Finally, we used Celery and RabbitMQ to run the task automatically every day at noon.

Whether you're monitoring weather, syncing data, or triggering reminders, this project can serve as a foundation to help you build more background automation tasks in the near future.

Happy automating!
