
SE Radio 502: Omer Katz on Distributed Task Queues Using Celery

Omer Katz, a software consultant and core contributor to Celery, discusses the Celery task processing framework with host Nikhil Krishna. The discussion covers, in depth: the Celery task processing framework, its architecture, and the underlying messaging protocol libraries on which it is built; how to set up Celery for your project, and the various scenarios for which Celery can be leveraged; how Celery handles task failures and scaling; the weaknesses of Celery; and what is next for the Celery project and the improvements planned for it.

This episode sponsored by OpenZiti.



Transcript

Transcript brought to you by IEEE Software magazine.
This transcript was automatically generated. To suggest improvements in the text, please contact [email protected] and include the episode number and URL.

Nikhil Krishna 00:01:05 Hello, and welcome to Software Engineering Radio. My name is Nikhil and I'm going to be your host today. Today we are going to be talking to Omer Katz. Omer is a software consultant based in Tel Aviv, Israel. A passionate open source enthusiast, Omer has been programming for over a decade and is a contributor to multiple open source projects like Celery, MongoEngine, and Oplab. Omer is currently also a committer to the Celery project and is one of the administrators of the project. He is also the founder and CEO of the Katz Consulting Group, where he supports high-tech enterprises and startups by providing solutions to software architecture problems and technical debt. Welcome to the show, Omer. Do you think I have covered your extensive resume? Or do you feel that you need to add something to it?

Omer Katz 00:02:01 Well, I'm married to a beautiful wife, Maya, and I have a two-year-old son, whom I'm very proud of, and it's very hard to work on open source projects under these conditions, with the pandemic and, you know, life.

Nikhil Krishna 00:02:24 Cool. Thanks. So, to the topic of discussion today: we are going to be talking about distributed task queues, and how Celery — which is a Python implementation of a distributed task queue — is set up, right? So, we're going to do a deep dive into how Celery works. Just so that the audience understands, can you tell us what is a distributed task queue, and for what use cases would one use a distributed task queue?

Omer Katz 00:02:54 Right. So a task queue would be a fiction, in my opinion. A task queue is just a worker that consumes messages and executes code as a consequence. It's a really weird concept to use as a type of software instead of as a type of architectural building block.

Nikhil Krishna 00:03:16 Okay. So, you mentioned it as an architectural building block. Is the task queue just another name for the job queue?

Omer Katz 00:03:27 No, naturally no, you can use a task queue to execute jobs, but you can use a message queue to publish messages that aren’t necessarily jobs. They could be just data or logs that are not actionable by themselves.

Nikhil Krishna 00:03:48 Okay. So, from a simple perspective, as a software engineer, can I think of a task queue sort of like an engine, or a means to execute tasks that are not synchronous? Can I say it's something for asynchronous execution of tasks?

Omer Katz 00:04:10 Yeah, I guess that's the right description of the architectural component, but it's not really a queue of tasks. It's not a single queue of tasks. I think the term does not really reflect what Celery or other workers do, because the complexity behind it is not just a single queue. You have one task queue when you're a startup with two people. The right term would be a "task processing framework," because Celery can process tasks from one queue or multiple queues. It can utilize the topologies that the broker allows. For example, RabbitMQ allows fan-out, so you can send the same task to different workers and each worker would do something completely different, as long as the task's name is the same. You can create topic exchanges, which also work in Redis, so you can route a task to a specific cluster of workers, which handles it differently than another cluster, just by the routing key. A routing key is essentially a string that contains namespaces in it, and a topic exchange can match routing keys as a glob, so you could exclude or include certain patterns.
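
For illustration, here is a minimal sketch of that routing idea in Celery code, assuming a local RabbitMQ broker; the exchange, queue names, and task are invented for the example:

```python
from celery import Celery
from kombu import Exchange, Queue

app = Celery("example", broker="amqp://guest:guest@localhost//")

# One topic exchange; two queues bound with glob-style routing keys,
# so different worker clusters can handle the same task name differently.
media = Exchange("media", type="topic")
app.conf.task_queues = [
    Queue("images", media, routing_key="media.image.#"),
    Queue("videos", media, routing_key="media.video.#"),
]

@app.task(name="process_media")
def process_media(url):
    ...

# The routing key decides which cluster receives the message:
# process_media.apply_async(("cat.png",), exchange="media",
#                           routing_key="media.image.resize")
```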

Nikhil Krishna 00:05:46 So let's dig into that a little bit. Just to contrast this a little bit more: when you talk about messaging, there are other models also, right? For example, the actor model, and actors that run in an actor model. Can you tell us what would be the difference between the architectural pattern of an actor model and the one we're talking about today, the task queue?

Omer Katz 00:06:14 Yes, well, the actor model has axioms, whereas a task execution platform or engine doesn't have any axioms; you can run whatever you want with it. One task can do many things or one thing. An actor maintains the single responsibility principle: it only does one thing, and actors communicate with each other. What Celery allows is to execute arbitrary code that you've written in Python, asynchronously, using a message broker. There are no real constraints or requirements on what you can or can't do, which is a problem, because people try to run their machine learning pipelines with it when there are far better tools for that task.

Nikhil Krishna 00:07:04 So, given this, can you talk about some of the advantages, or why would you actually want to use something like Celery or a distributed task queue for, say, a simple job manager or a cron job of some sort?

Omer Katz 00:07:24 Well, Celery is very, very simple to set up, which will always be the case, because I think we need a tool that can grow from the startup stage to the enterprise stage. At this point, Celery is for the startup stage and the growing-company stage, because after that, things start to fail or cause unexpected bugs, because the conditions Celery finds itself in are something it was not designed for when the project started. I mean, you have to remember, we hadn't dealt with this scale back in the day, not even in 2010.

Nikhil Krishna 00:08:07 Right. And yeah, so one of the things I noticed about Celery is that it is, as you pointed out, very easy to set up, and it is also not a single library, right? It uses a messaging protocol and a message broker to run the actual queue itself and the messaging itself. So, Celery was built on top of another library called kombu, and as I understand it, kombu is a messaging library, a wrapper around the AMQP messaging protocol, right? So, can we step back a little bit and talk about AMQP? What is AMQP, and why is it a good fit for something like what Celery does?

Omer Katz 00:08:55 Okay, AMQP is the Advanced Message Queuing Protocol, but there are two different protocols under that name: 0.9.1, which is the protocol RabbitMQ implements, and 1.0, which is a protocol that not many message brokers implement. Apache ActiveMQ does, but Celery does not support it yet. Also, Qpid Proton supports it, but we don't support that yet. So basically, we have a protocol that defines how we communicate with our queues: how do we route tasks to queues, and what happens when they are consumed? Now, that protocol is not well-defined, and that is apparent because RabbitMQ has an addendum, an errata for it. So things have changed, and what you read in the protocol isn't the reference implementation, because RabbitMQ solves problems that were not known when 0.9.1 was conceived, one example being the replication of queues. RabbitMQ introduced quorum queues very, very recently; in previous days, you could not keep RabbitMQ highly available easily.

Nikhil Krishna 00:10:19 Can we go a little bit simpler? Okay, so why is Celery using a messaging protocol, as opposed to, say, just having some entries in a database that get marked complete? Why a messaging protocol?

Omer Katz 00:10:35 So AMQP guarantees delivery, at least once. And that is a very interesting property for anyone who wants to run something asynchronously, because otherwise you'd have to take care of it yourself. TCP does not guarantee an acknowledgement at the application level. So the most fundamental thing about AMQP is that it was one of the protocols that allowed you to report on the state of the message: it is acknowledged because it's done, or it's not acknowledged, so we return it to the queue. It can also be rejected, and either re-delivered or not. And that is a useful concept, because let's say, for example, Celery wants to reject the message whenever the task fails. That is helpful, because you can then route the message to where messages go when they fail. So, let's talk a bit about exchanges in AMQP 0.9.1, and I'll explain that concept further and why it's useful.

Omer Katz 00:11:42 So exchanges are basically where tasks land and decide where to go. You have a direct exchange, which just delivers the task to the queue it is bound to. You can create bindings between exchanges and queues, and if you bind a queue to an exchange and a message is received in that exchange, the queue will get it. You can have a fan-out exchange, which is how you deliver one message to multiple queues. Now, why is this useful in general? Let's imagine you have a social network with feeds. You want everyone who is following someone to know that a new post was created, so you can refresh their feed in the cache. So, you can fan out that post to all the followers of that user from a fan-out exchange that was created just for that user, and after you're done, just delete all of the topology. That would cause the message to be consumed from each and every queue, and it would be inserted into each and every user's feed cache, for example.
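
A hedged sketch of that fan-out topology, using kombu directly against an assumed local RabbitMQ; the exchange name and follower queues are invented for the example:

```python
from kombu import Connection, Exchange, Producer, Queue

# One fan-out exchange per user; every queue bound to it receives a
# copy of each published message.
posts = Exchange("user.42.posts", type="fanout")
feeds = [Queue(f"feed.user.{uid}", exchange=posts) for uid in (1, 2, 3)]

with Connection("amqp://guest:guest@localhost//") as conn:
    channel = conn.channel()
    for q in feeds:
        q(channel).declare()        # declare and bind each feed queue
    producer = Producer(channel)
    producer.publish(
        {"post_id": 1001},          # the new post event
        exchange=posts,
        declare=[posts],
        serializer="json",
    )
```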

Nikhil Krishna 00:12:58 So that's a big point, because it shows that Celery, which is built on top of this messaging library, can also be configured to support these types of scenarios, right? You have a fan-out scenario, or a pub-sub scenario, or a queue consumption scenario. So, can we talk a little bit about the Celery library itself? Because one thing I noticed about it is that it has a plugin architecture, right? The Celery library itself has plugins like Celery beat, which is the scheduling option, and then it has kombu. You can also support multiple different types of backends. So maybe we can step back a little bit and talk about the basic components that somebody would have to install or set up in order to implement Celery.

Omer Katz 00:13:56 Well, if you implement Celery, you need a framework that maintains its different services logically. And that's what we have in Celery: we have an ad hoc framework for running different services in the same process. So, for example, Celery has its own event loop that was written to make the communication with the broker asynchronous, and that is a component. Celery has a consumer, which is also a component. It has Gossip, Mingle, et cetera, et cetera. All of these are pluggable. Now, we control the starting and stopping of components using bootsteps. You decide which steps you want to run in order, and these steps require other steps, so you basically get an initialization graph.

Nikhil Krishna 00:14:49 So we have the application, which would be a Python application we can import Celery into. And then we have this message broker. Does this message broker have to be RabbitMQ? Or what are the other types of message brokers that Celery can support?

Omer Katz 00:15:09 We have many: we have Redis, we have SQS, and we have many more which are not very well-maintained, so they're still in an experimental state, and everybody is welcome to contribute.

Nikhil Krishna 00:15:24 So RabbitMQ obviously is the AMQP message broker. And it’s probably the primary message broker. Does Redis also support AMQP or how do you actually support Redis as a backend?

Omer Katz 00:15:41 So unlike Celery, where there are a lot of design bugs and problems and abstraction problems, kombu's design is brilliant. What it does is emulate AMQP 0.9.1 logically in code. So we create a virtual transport with virtual channels and bindings. And since Redis is programmable, you can use Lua or you can just use a pipeline, and then you can implement whatever you need within Redis. Redis provides a lot of fundamental constructs for storing messages in order, or in some order, which provides you a way to implement it and emulate it. Now, do I understand the implementation? Partially, because the reality of an open source project is that some things are not well-maintained. But it works, and there are many other async task execution platforms which use Redis as the sole message broker, such as RQ; they're a lot simpler than Celery.
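
As a small illustration of the emulation, kombu's SimpleQueue API works unchanged when the connection URL points at Redis instead of RabbitMQ; a sketch, assuming a local Redis and an invented queue name:

```python
from kombu import Connection

# Same kombu API as with AMQP; the "virtual transport" emulates
# channels and acknowledgements on top of Redis data structures.
with Connection("redis://localhost:6379/0") as conn:
    queue = conn.SimpleQueue("demo")
    queue.put({"hello": "world"})
    message = queue.get(block=True, timeout=5)
    print(message.payload)   # {'hello': 'world'}
    message.ack()            # emulated acknowledgement
    queue.close()
```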

Nikhil Krishna 00:16:58 Awesome. So obviously that means that I misspoke when I said Celery supports RabbitMQ and Redis; it's basically standing on top of kombu, and kombu is the one that actually manages this. So, I think we have a reasonable idea of what the various parts of Celery are. So, can we maybe take an example? Let's say I'm trying to set up a simple online website for my shop, and I want to sell some basic clothing or some wares, right? And I want to have this feature where I send order confirmation emails and various kinds of notifications to my customers about the status of their order. So, I've built this simple website in Flask, and now for these notifications, by email and maybe by SMS, there are two or three different types of notification, and I want to use Celery. For the simple thing, maybe I've set it up in a Kubernetes cluster somewhere on a cloud, maybe Google or Amazon. And I want to implement Celery. What would you recommend as the simplest Celery setup that can be used to support this particular requirement?

Omer Katz 00:18:27 So if you’re sending out emails, you’re probably doing that by communicating with an API, because there are providers that do it for you.

Nikhil Krishna 00:18:38 Yeah, something like Twilio or maybe MailChimp or something like that. Yes.

Omer Katz 00:18:44 Something like that. So what I'd recommend is to use asynchronous I/O. Now, Celery provides concurrency by prefork workers, so you'd have multiple processes, but you can also use gevent or eventlet, which make task execution asynchronous by monkey-patching the sockets. If this is your use case and you're mostly I/O bound, what I suggest is starting multiple Celery processes in one cluster which consume from the same message broker. That way you'd have concurrency both at the CPU level and at the I/O level, so you'd be able to deliver hundreds of thousands of emails per second, because it's just calling an API, and calling an API asynchronously is very light on the system. There will be a lot of context switching between green threads, and you'd be able to utilize multiple CPUs by starting new processes.
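
A minimal sketch of that setup, assuming a local Redis broker and a hypothetical email-provider HTTP API; the worker command at the end shows the gevent pool he describes:

```python
from celery import Celery
import requests  # assumed HTTP client dependency

app = Celery("shop", broker="redis://localhost:6379/0")

@app.task(name="send_order_confirmation")
def send_order_confirmation(order_id, email):
    # Pure I/O: with a gevent/eventlet pool the socket calls below are
    # monkey-patched, so one process can keep hundreds in flight.
    requests.post(
        "https://mail.example.com/send",   # hypothetical provider URL
        json={"to": email, "order": order_id},
        timeout=10,
    )

# Start one such worker per CPU to combine process-level and
# green-thread concurrency, e.g.:
#   celery -A shop worker --pool=gevent --concurrency=500
```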

Nikhil Krishna 00:19:52 So what that means is that I'll set up maybe a new container or something in which I will run the Celery worker, and that will be reading from the message broker?

Omer Katz 00:20:02 And since you mentioned Kubernetes, you can also autoscale based on the queue size. So, let's say you have one Docker container with one process that takes one CPU, but it can only process 200 tasks at a time. Now, you set that as a threshold for the autoscaler, and it will just start new containers and process more. So if you have 350 tasks, all of them will be processed concurrently, and then it will shut down that instance once it's done.

Nikhil Krishna 00:20:36 So, as I understand it, the scaling will be on the Celery workers, right? And you will have, say, maybe one instance of RabbitMQ or Redis, the message broker that handles the queues, correct? So how do I actually post a message onto the queue? Do I have to use a Celery client, or can I just post a message somehow? Is there a particular standard that I need to use?

Omer Katz 00:21:02 Well, Celery has a protocol, an application protocol, on top of AMQP, which is passed in the message's body. You can't just publish any message to Celery and expect it to work. You need to use a Celery client. There is a client for Node.js, there is a client for PHP, there was a client for Go. A lot of things are Celery-protocol compatible, but most people have been using Celery from Python end to end.
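
For example, a minimal sketch of publishing from the web service using only the Celery client and the task's registered name; the broker URL and task name are carried over from the earlier illustrative examples:

```python
from celery import Celery

# The web container needs no worker code; it only speaks the protocol.
client = Celery("web", broker="redis://localhost:6379/0")

def on_order_created(order_id, email):
    # Publishes a message under the task's registered name; whichever
    # worker has a task by that name will consume and execute it.
    client.send_task("send_order_confirmation", args=(order_id, email))
```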

Nikhil Krishna 00:21:33 So from my Flask website container, I will install the Celery client module and then just post the task to the message broker, and then the workers will pick it up. So let's take this example one step further. Suppose I have gotten a little successful and my website is gaining traction and becoming popular, and I would like to get some analytics on, say, how many emails I am sending, or how many orders people are actually making for a particular product. So I want to do some sort of analysis, and I decide, okay, fine, we will have a separate analytics database on which I can build a solution. But now I have this asynchronous step where, in addition to creating the order in my regular database, I need to copy that data, or transform the data, or extract it to my data warehouse, right? Do you think that is something that should be done, or that can be done, with Celery? Or do you think that's something that's not very well suited for Celery, and a better solution might be a proper ETL pipeline?

Omer Katz 00:22:46 Well, you can; in simple cases, it's very, very easy. So let's say you want to send a confirmation email and then write the record to the DB that says this email was sent; so you update the order with "confirmation email sent." This is very, very typical. But performing a long ETL, or queries that take hours to complete, is simply unnecessary. What you're doing essentially is hogging the capacity of the cluster for something that won't complete for a couple of hours and is better performed elsewhere. So at the very least you occupy one coroutine, but what most users do is occupy one process, because they use prefork.

Nikhil Krishna 00:23:34 So basically what you're saying is that it is possible to run that; it's just that you will be tying up processes and locking up some of your Celery availability for this, and that might be a problem. Okay. So, let's get into failure scenarios; we've been talking about the best-case scenario so far, right? What happens when, say, for some reason there was a sale on my website, Black Friday or something, and a lot of orders came in. My orders started spinning up a lot of Celery workers, and they reached the limit I had set with my cloud provider, so my Kubernetes cluster started killing and evicting the pods. What actually happens when a Celery worker is killed externally, or runs out of memory and gets killed? What kind of recovery or retries are possible in these kinds of scenarios?

Omer Katz 00:24:40 Right. So when Celery is terminated, generally speaking, Celery enters a warm shutdown, where there is a timeout for all tasks to complete, and then it shuts down. But Celery also has a cold shutdown, which says kill all tasks and exit immediately. So it really depends on the signal you send. If you send SIGQUIT, you'll get a cold shutdown, and if you send SIGINT, a warm shutdown. If you send SIGINT twice, you'll get a cold shutdown instead, which makes sense, because usually you just press Ctrl-C twice when you want to exit Celery while it's running in the foreground. So, when Kubernetes does this, it also has a timeout for when it considers a container to have shut down gracefully. You should set that to the timeout you set for Celery to shut down, and even give it a little buffer of a few more seconds, just so you won't get alerts because these containers were shut down improperly; if you don't manage that, it will cause alert fatigue, and you won't know what's happening in your cluster.

Nikhil Krishna 00:25:55 So, what actually happens to the task? If it's a long-running task, for example, does that mean that the task can be retried? What guarantees does Celery provide?

Omer Katz 00:26:10 Yeah, it does mean it can be retried, but it really depends on how you configure Celery. Celery, by default, acknowledges tasks early. It was a reasonable choice back in 2009 and 2010, but nowadays having it the other way around, where you acknowledge late, has some merits. Late acknowledgements are very, very useful for creating tasks which can be re-queued in case of failure, or if something happened, because you acknowledge the task only when it is complete. You acknowledge early in cases where the task execution does not matter: you got the message and you acknowledged it, and then if something went wrong, you don't want it to be in the queue again.
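
A sketch of what late acknowledgement looks like in task code; the broker URL and task name are illustrative:

```python
from celery import Celery

app = Celery("app", broker="amqp://guest:guest@localhost//")

@app.task(acks_late=True, reject_on_worker_lost=True)
def import_batch(batch_id):
    # Acknowledged only after the function returns; if the worker dies
    # mid-task, the broker re-queues the message. The task must
    # therefore be idempotent, since it may execute more than once.
    ...
```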

Nikhil Krishna 00:27:04 So if it's not idempotent, that would be something for which you'd want to acknowledge early.

Omer Katz 00:27:10 Yeah. And the fact that Celery chose a default that allows tasks to be non-idempotent is, in my opinion, a bad decision, because if tasks are idempotent, they can be retried very, very easily. So I think we should encourage that by design. If you have late acknowledgement, you acknowledge the task at the end of it, whether it fails or succeeds, and that allows you to just get the message back in case it was not acknowledged. So RabbitMQ and Redis have a visibility timeout of some sort. We use different terms, but there is a window where the message is still considered delivered and not acknowledged; after that window, the broker returns the message to the queue and says you can consume it. Now, RabbitMQ also does something interesting when you just shut down a connection: when you kill it, you shut down the connection and you shut down the channel the connection was bound to. A channel is the way for RabbitMQ to multiplex messages over one connection. In AMQP you have a connection and you have a channel; you can have one TCP connection, but a channel multiplexes that connection for multiple queues. So logically, if you look at the channel, it's like a virtual private network.

Nikhil Krishna 00:28:53 So you're kind of tunneling through the same TCP connection, sharing it between multiple queues. Okay, understood.

Omer Katz 00:29:02 Yes and so when we close the channel, RabbitMQ remembers which tasks were delivered to that channel, and it immediately pops it back.

Nikhil Krishna 00:29:12 So if you have for whatever reason, if you have multiple workers on multiple machines, multiple Docker containers, and one of them is killed, then what you’re saying is that RabbitMQ knows that channel has died or closed. And it remembers the tasks that were on that channel and puts it on the other channel so that the other worker can work on it.

Omer Katz 00:29:36 Yeah. This is called a nack, where a message is negatively acknowledged; if it's not acknowledged, it's returned back to the queue it originated from.

Nikhil Krishna 00:29:46 So, you’re saying that, there is a similar visibility mechanism for Redis as well, correct?

Omer Katz 00:29:53 Yeah, not similar, because Redis does not really have channels, and we do not track which tasks we delivered where, because that could be disastrous for the scalability of the system on top of Redis. So, what we do is only provide a timeout, a maximum timeout. This is relevant in SQS as well, because both of them have the same concept of a visibility timeout: if the task does not get processed within, let's say, 360 seconds, it's returned back to the queue. So, it's a basic timeout.
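
In configuration terms, the visibility timeout he mentions is a broker transport option; a sketch with an illustrative one-hour value:

```python
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

# If a delivered message is not acknowledged within this window, the
# transport returns it to the queue for redelivery (Redis and SQS).
app.conf.broker_transport_options = {"visibility_timeout": 3600}
```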

Nikhil Krishna 00:31:07 So, is that something that a developer can configure? Say, for example, we were doing an ETL as well as a notification. Notifications usually happen quickly, while an ETL can take, say, a couple of hours. So is that a case where, with Redis, we can configure in Celery, for this type of task, an increased visibility timeout so that it doesn't…

Omer Katz 00:31:33 No, unfortunately not. Actually, that's a good idea, but what you can do is create two Celery processes which have different configurations. And I'd say actually that these are two different projects with two different code bases, in my opinion.

Nikhil Krishna 00:31:52 So basically separate them into two workers: one worker that is just handling the long-running tasks and the other worker doing the notifications. So, obviously, where there are failures and things like this, you also want to have some kind of visibility into what is happening inside the Celery worker, right? So can you talk a little bit about how we can monitor tasks, and how to handle logging in tasks?

Omer Katz 00:32:22 Currently, the only monitoring tool we have is Flower, which is another open source project that listens to the events protocol Celery publishes to the broker and gets a lot of metadata from there. But basically, the result backend is where you monitor how tasks are going. You can report the state of the task, you can provide custom states, you can provide progress, whatever context you have about the progress of the task. And that could allow you to monitor tasks within an external system that just listens to changes, like Flower does. If, for example, you have something that translates these to statsd, you could have monitoring as well. Celery is not very observable. One of the goals of Celery NextGen would be to integrate it completely with OpenTelemetry, so it will provide a lot more data about what's going on. Right now, the only monitoring we provide is through the event system. You can also use inspect to check the current status of the Celery process, so you can see how many active tasks there are; you can get that in JSON too. So if you do that periodically and push it to your logging system, you may make use of that.
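
A sketch of reporting custom states and progress through the result backend; the task, state name, and meta fields are invented for the example:

```python
from celery import Celery

app = Celery("app",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task(bind=True)
def export_report(self, rows):
    for i, row in enumerate(rows):
        ...  # process one row
        # Publish a custom state to the result backend; an external
        # monitor can poll AsyncResult.state / .info for progress.
        self.update_state(state="PROGRESS",
                          meta={"done": i + 1, "total": len(rows)})
    return {"done": len(rows), "total": len(rows)}
```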

Nikhil Krishna 00:33:48 So if you don't have that much visibility in monitoring, how does Celery handle logging? Is it possible to extend the logging of Celery so that we can add more logging to maybe get more information on what is happening from that perspective?

Omer Katz 00:34:08 Well, logging is configurable as much as Django’s logging is configurable.

Nikhil Krishna 00:34:13 Ah okay so it’s like general extension of the Python locking libraries?

Omer Katz 00:34:17 Yes, pretty much. And one of the things that Celery does is that it tries to be compatible with Django, so it can take Django's logging configuration and apply it to Celery. That's why they work the same way. As for logging more data, that's entirely possible, because Celery is very extensible where it's user-facing. So, you could just override the task class and override the hooks, before the task starts, after it returns, stuff like that. You could register for signals and log data from the signals. You could actually implement OpenTelemetry; I think in the contrib package of OpenTelemetry there is an instrumentation for Celery. I'm not sure what its state is right now. So, it is entirely possible to do that; it's just that it wasn't implemented yet.
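
For instance, a minimal sketch of logging extra data from Celery's signals:

```python
import logging
from celery.signals import task_prerun, task_postrun

logger = logging.getLogger("tasks")

@task_prerun.connect
def log_task_start(task_id=None, task=None, **kwargs):
    # Fires in the worker just before each task executes.
    logger.info("task %s[%s] starting", task.name, task_id)

@task_postrun.connect
def log_task_done(task_id=None, task=None, state=None, **kwargs):
    # Fires after the task returns, with its final state.
    logger.info("task %s[%s] finished with state %s",
                task.name, task_id, state)
```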

Nikhil Krishna 00:35:11 So it's not native to Celery per se, but it provides extension points and hooks so that you can implement it yourself as you see fit. So, moving on to how to scale a Celery implementation: earlier you mentioned that Celery is a good option for startups, but as you grow, you start seeing some of the limitations of a Celery implementation. Obviously, when you're in a startup, you want to make the most of the choice you've made, so if you made the Celery choice, you'd want to first see how far you can take it before going with another alternative. So, what are the typical bottlenecks that usually occur with Celery? What is the first thing that starts failing, one of the first warning signs that your Celery setup is not working as you thought it would?

Omer Katz 00:36:22 Well, for starters, very large workflows. Celery has a concept of canvases, which are building blocks for creating a workflow dynamically, not declaratively, but by just composing tasks together ad hoc and delaying them. Now, when you have a very large workflow, a very large canvas, that is serialized back into the message broker, things get messy, because Celery's protocol was not designed for that scale. It could just turn out to be 10 gigabytes or 20 gigabytes, and we'll try to push that to the broker. We've had an issue about it, and I just told the user to use compression. Celery supports compression of its protocol, and it's something I encourage people to use when they start growing from the startup stage to the growing stage and have requirements that are beyond what Celery was designed for.

Nikhil Krishna 00:37:21 So when you say compression, what exactly does that mean? Does that mean that I can actually take a Celery message, zip it, and send it, and it will automatically be picked up? So, if your message size becomes too large, or if you've got too many parameters in your message, like the canvas you created, a set of operations that you're trying to do, then you can zip it up and send it out. That's interesting; I did not know that.
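
Compression is a configuration switch, or a per-call option; a brief sketch with illustrative settings:

```python
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")
app.conf.task_compression = "gzip"     # compress task messages
app.conf.result_compression = "gzip"   # compress stored results

# Or per call, for an occasional oversized payload:
# some_task.apply_async(args=(big_blob,), compression="zlib")
```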

Omer Katz 00:37:51 Another thing is trying to run machine learning pipelines, because machine learning pipelines, for the most part, use prefork themselves in Python to parallelize the work, and that doesn't work well with prefork. It sometimes does, it sometimes doesn't; billiard is a mystery to me and very much not documented. Billiard is Celery's implementation of multiprocessing, a fork that allows you to support multiple Python versions in the same library, with some extensions to it that I really don't know how they work. Billiard was the component that was never, ever documented. So, the most important component of Celery right now is something we don't know what to do with.

Nikhil Krishna 00:38:53 Interesting. So billiard essentially would be something you’d want to use if you have some components that are for different portion, Python portion, or if they are not standard kind of implementations?

Omer Katz 00:39:09 Yeah. Joblib has a similar project called Loky, which does a very similar thing, and I've actually thought about dumping billiard and using their implementation, but that would require a lot of work. And given that Python now has a viable way to remove the global interpreter lock, maybe we don't need to invest that much in prefork anymore. Now, for those that don't know: Python and Ruby and Lua and Node.js and other interpreted languages have a global interpreter lock. This is a single mutex which controls the entire program. So, when two threads try to run Python bytecode, only one of them succeeds, because a lot of operations in Python are atomic. If you have a list and you append to it, you expect that to happen without an additional lock.

Nikhil Krishna 00:40:13 How does that kind of affect Celery? Is that one of the reasons why using an event loop for reading from the message queue?

Omer Katz 00:40:23 Yeah. That's one of the reasons for using an event loop for reading from the message queue, because we don't want to use a lot of CPU power to poll and block.

Nikhil Krishna 00:40:35 That's also probably why Celery implementations favor process workers over threads.

Omer Katz 00:40:46 Apparently, having one mutex is better than having an infinite number of mutexes, because for every list you create, you'd have to create a lock to ensure that all operations that are guaranteed to be atomic stay atomic, and that's at least one lock each. So removing the GIL is very hard, but someone found an approach that appears very, very promising. I'm very much hoping that Celery could, by default, work with threads, because it would simplify the code base greatly, and we could leave prefork as an extension for someone else to implement.

Nikhil Krishna 00:41:26 So we've talked about these kinds of bottlenecks, and we know that the threading approach is simpler. Other than Celery, there are other approaches to this particular task; the whole idea of message queuing and task execution is not new. We have other orchestration tools, right? There are things called workflow orchestration tools; in fact, I think some of them use Celery as well. Can you maybe talk a little bit about what the difference is between a workflow orchestration tool and a library like Celery?

Omer Katz 00:42:10 So Celery is a lower-level library. It's a building block of those tools, because, as I said, it's a task execution platform. You just say, I want this stuff to be executed, and at some point it will be, and if it won't be, you will know about it. So, these tools can use Celery as a building block for publishing their own tasks and executing something that they need to do.

Nikhil Krishna 00:42:41 On top of that.

Omer Katz 00:42:41 Yeah, on top of that.

Nikhil Krishna 00:42:43 So given that there are these options like Airflow and Luigi, which are a couple of the workflow orchestration tools, and we talked about the canvas object, where you can actually orchestrate multiple tasks: do you think that it might be better to use these higher-level tools to do that kind of orchestration? Or do you feel that it's something that can be handled by Celery as well?

Omer Katz 00:43:12 I don't think Celery was meant for workflow orchestration. The canvases were meant to be something very simple. You want each task to maintain the single responsibility principle. So, what you do is just separate the functionality we discussed, sending the confirmation email and updating the database, into two tasks, and you would launch a chain of sending the email and then updating the database. That helps because each operation can be retried separately. That's why canvases exist. They were not meant to run your daily BI batch jobs with 5,000 tasks in parallel that return one response.
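
A sketch of exactly that canvas, with illustrative task bodies: two single-responsibility tasks composed into a chain so that each link can be retried on its own:

```python
from celery import Celery, chain

app = Celery("shop", broker="redis://localhost:6379/0")

@app.task
def send_confirmation_email(order_id):
    ...  # call the email provider
    return order_id   # passed to the next task in the chain

@app.task
def mark_confirmation_sent(order_id):
    ...  # update the order row in the database

# The first task runs, then its return value feeds the second.
workflow = chain(send_confirmation_email.s(42), mark_confirmation_sent.s())
# workflow.delay()
```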

Nikhil Krishna 00:44:03 So obviously, like I said, we've talked about how machine learning is not something that is a good fit with Celery.

Omer Katz 00:44:15 Regarding Apache Airflow, did you know that it can run over Celery? So, it actually uses Celery as a building block, as a potential building block. Now, Dask is another system, related more to NumPy, that can also run on Celery, because Joblib, which is the job runner for Dask, can run tasks in Celery to process them in parallel. So many, many tools actually use Celery as a foundational building block.

Nikhil Krishna 00:44:48 So Dask, if I'm not mistaken, is also for task parallelization; let's say it's a way to break up your process or your machine learning job into multiple parallel processes that can run in parallel. So, it's interesting that it uses Celery underneath. It gives you the idea that, as we grow up and become more sophisticated in our workflows and pipelines, there are these larger constructs that you can probably build on top of Celery that handle that. So, one different thought I had when looking at Celery was the idea of event-driven architectures. There are entire architectures nowadays that are driven around this idea of: you put an event in a bus or a queue, or you have some kind of broker, and everything is events, and you basically have things resolved as you go through all these events. So maybe let's talk a little bit about whether that is something Celery can fit into, or is that something better handled by a specialized enterprise service bus or something like that?

Omer Katz 00:46:04 I don't think anyone has really thought about it, but it can. As I mentioned regarding the topologies, the message topologies that AMQP provides us, we can use those to implement an event-driven architecture using Celery. You have different workers with different projects using the same task name. So, when you just delay the task, when you send it, what will happen depends on the routing key. Because if you bind two queues to a topic exchange and you provide a routing key for each one, you'd be able to route the task in the right direction and have something that responds to an event in a certain way, just because of the routing key. You could also fan out, which is, again: a user posted something, and everybody needs to know about it. So, in essence, this task is actually an event, but it's still treated as a job.

Omer Katz 00:47:08 Instead of as an event; this is something that I intend to change. In Enterprise Integration Patterns, there are three types of messages. Enterprise Integration Patterns is a very good book about messaging in general; it's a little bit outdated, but not by very much, and it's still relevant today. It defines three types of messages: you have a command, you have an event, and you have a document. A command is a task; this is what we're doing today. An event describes what happened. Now Celery, in response to that, should execute multiple tasks: when Celery gets an event, it should publish multiple tasks to the message broker. That's what it should do. And a document message is just data. This is very common with Kafka, for example. You just push the log, the exact logline that you received, and someone else will do something with it, who knows what?

Omer Katz 00:48:13 Maybe they'll push it to Elasticsearch, maybe they'll transform it, maybe they'll run an analytic on it. You don't care; you just push the data. And that's also something Celery is missing, because with these three concepts, you can define workflows that do a lot more than what Celery can do now. If you have a document message, you essentially have a result of a task that is modeled in messaging terms. So, you can send the result to another queue, and there would be a transformer that transforms it into a task that is the next in line for execution in the workflow.

Nikhil Krishna 00:48:58 So you can basically create hierarchies of Celery workers that handle different types of things. You have one event that comes in, and that triggers a Celery worker, which broadcasts more work or more tasks, and then that is picked up by others. Okay, very interesting. So that seems to be a pretty interesting approach to implementing event-driven architectures. It sounds like something we can do very simply without having to buy or invest in a huge message queuing system or an enterprise service bus, and it sounds like a great way to try out or experiment with event-driven architecture. So, just to look back to the beginning, when we talked about the difference between actors and the Celery worker: we mentioned that an actor basically follows the single responsibility principle, does a single thing, and sends one message.

Nikhil Krishna 00:50:00 Another interesting thing about actors is the fact that they have supervisors, and they have this whole aspect where, when something happens and an actor dies, there is a way to automatically restart it. In Celery, are there any thoughts or design ideas around doing something like that? Is there a way to say, okay, I'm monitoring my Celery workers, this one goes down, this particular task is not running appropriately: can I restart it, or can I create a new worker? I know you mentioned that you can have Kubernetes do that via the worker shutdown, but that assumes that the worker is shutting down. If it's not shutting down, if it's just stuck, then how do we handle that? Say the process is stuck, maybe it's running for too long, or it's running out of memory, or something like that.

Omer Katz 00:51:01 You can limit the amount of memory each task takes, and if it exceeds that, the worker goes down. You can say how many tasks you want to execute before a worker process goes down. And we can retry tasks: if a task failed and you've configured automatic retries, or you've explicitly called retry, you can retry a task. That is entirely possible.

Nikhil Krishna 00:51:29 Within the task itself. You can kind of specify that, okay, this task needs to be a retried if it fails.

Omer Katz 00:51:35 Yeah. You can retry for certain exceptions, or explicitly call retry by binding the function: you just say bind equals true, and you get the self of the task instance, and then you can call the task class's methods on that task, so you can just call retry. There's also another thing about that I didn't mention: replacing. In 4.4, I think, someone added a feature that allows you to replace a canvas mid-flight. So, let's say you decided not to save the confirmation in the database, but instead, since everything failed and you haven't sent a single confirmation email just yet, you replace the task with another task that calls your alerting solution, for example. Or you could branch out, essentially. So, this gives you a condition: if this happens, for the rest of the canvas, run this workflow; or else run that workflow.
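
A hedged sketch of both mechanisms, automatic and explicit retries plus mid-flight replacement; the exception types, the failure check, and the alerting task are invented for the example:

```python
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(bind=True, autoretry_for=(ConnectionError,),
          retry_backoff=True, max_retries=5)
def send_email(self, order_id):
    try:
        ...  # call the provider
    except RuntimeError as exc:
        # bind=True gives us `self`, the task instance, so we can
        # call its retry() method explicitly.
        raise self.retry(exc=exc, countdown=30)

@app.task
def alert_on_call(order_id):
    ...  # hypothetical alerting task

def order_failed(order_id):
    ...  # hypothetical check against the database
    return False

@app.task(bind=True)
def confirm_order(self, order_id):
    if order_failed(order_id):
        # Swap the rest of the canvas for the alerting task (Celery 4.4+).
        raise self.replace(alert_on_call.s(order_id))
```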

Omer Katz 00:52:52 So, we were talking about actors. Celery had an attempt to write an actor framework on top of the existing framework; it's called cell. Now, it was just an attempt; no one developed it very far. But I think it's the wrong approach. Celery was designed as an ad hoc framework that has had patches over patches over the years, and it's almost actor-like, but it's not. So, what I thought was that we could just create an actor framework in Python that will be the de facto, go-to actor framework in Python, and that framework would be easy enough to use for occasional contributors to be able to contribute to Celery. Because right now, the case is that in order to contribute to Celery, you need to know a lot about the code and how it interacts. So, what we want is to replace the internals but keep the same public API. So, if we bump a major version, everything still works.

Nikhil Krishna 00:54:11 That sounds like a great approach.

Omer Katz 00:54:16 Yeah, it is a great approach. It's a project called jumpstarter; the repository can be found inside our organization, and all are welcome to contribute. Maybe I should speak a little bit more about the idea.

Nikhil Krishna 00:54:31 Absolutely. So I was just going to ask, is there a roadmap for this jump starter, or is this something that is still in the early thinking of prototyping phase?

Omer Katz 00:54:43 Well, it's still in early prototyping, but there is a direction we're going in. The focus is on observability and ergonomics. So, you need to know how to write a DSL, for example, in Python. Let me give you the basic concepts of jumpstarter. Jumpstarter is a special actor framework, because each actor is modeled by a hierarchical state machine. In a plain state machine, you have transitions from A to B and from B to C and C to E, et cetera, or from A to Z skipping all the rest, but you can't have conditions for which state can transition to another state. In a hierarchical state machine, you can have state A which can only transition to B and C, because they're child states of state A. We can have state D which cannot transition to B and C, because they're not its children states.

Nikhil Krishna 00:55:52 So it’s like a directional, almost like a directed cyclical.

Omer Katz 00:55:58 No, child states of D that was it, not A.

Nikhil Krishna 00:56:02 So, it’s almost like a directed cyclic graph, right?

Omer Katz 00:56:10 Exactly. It's like a cyclic graph that you can attach hooks on. You can attach a hook before the transition happens, after the transition happens, when you exit the state, when you enter the state, when an error occurs; so you can model the entire life cycle of the worker as a state machine. Now, the basic definition of an actor is a state machine with a lifecycle in it, and it comes with batteries included: you have the state machine already configured for starting and stopping itself, so you have a start trigger and a stop trigger. You can also change the state of the actor to healthy or unhealthy or degraded. You could restart it. And everything that happens, happens through the state machine. Now, on top of that, we add two important concepts: the concepts of actor tasks and resources. Actor tasks are tasks that extend the actor's state machine.
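
A toy sketch of the transition constraint he describes; this is illustrative model code, not jumpstarter's actual API:

```python
# Parent relationships and explicitly allowed cross-parent transitions.
PARENT = {"B": "A", "C": "A", "E": "D", "F": "D"}
ALLOWED = {("A", "B"), ("A", "C"), ("C", "D"), ("D", "E")}

def can_transition(src, dst):
    # Siblings inside the same parent may transition freely; anything
    # else needs an explicitly allowed edge. Hooks (before/after
    # transition, enter/exit state) would fire around this check.
    parent_src, parent_dst = PARENT.get(src), PARENT.get(dst)
    siblings = parent_src is not None and parent_src == parent_dst
    return siblings or (src, dst) in ALLOWED

assert can_transition("B", "C")        # siblings under A
assert not can_transition("B", "E")    # E belongs to D, not to A
```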

Omer Katz 00:57:20 You can only run one task at a time. So what that provides you is essentially a workflow where you can say: I'm polling for data, and once I am done polling for data, I'm going to transition to processing data, and then it goes back again to polling data, because you can define loops in the state machine. It's Turing complete. It's not actually a DAG; it's a graph where you can make loops and cycles and essentially model any programming logic you want. So the actor does not violate the basic three axioms of actors, which are having a single responsibility, having the ability to spawn other actors, and message passing. But it also has this new feature where you can manage the execution of the actor by defining states. So, let's say you are in a degraded state; you're in a degraded state because the actor's health check that checks S3 fails.

Omer Katz 00:58:28 So you can't do everything, but you can still process the tasks that you have. So this allows running the poll task from the degraded state, but you can't transition from degraded to processing data. So that models everything you need. Now, in addition to that, I've managed to create an API that manages resources, which are context managers, in a declarative way. You just define a function that returns a context manager or an async context manager and decorate it as a resource, and it will be available to the actor as an attribute, and it will be automatically cleaned up when the actor goes down.
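
A hypothetical sketch of that declarative-resource idea in plain Python (not jumpstarter's actual API): factories return context managers, the actor enters them on start, exposes the values as attributes, and cleans them up on stop:

```python
from contextlib import ExitStack, contextmanager

class Actor:
    """Illustrative base class: resources are context-manager
    factories, entered on start and exited on stop."""
    resources = {}   # name -> zero-argument factory

    def start(self):
        self._stack = ExitStack()
        for name, factory in self.resources.items():
            # Enter the context manager and expose its value as an
            # attribute; ExitStack unwinds everything in reverse order.
            setattr(self, name, self._stack.enter_context(factory()))

    def stop(self):
        self._stack.close()   # automatic cleanup when the actor goes down

@contextmanager
def redis_connection():
    conn = object()           # stand-in for a real connection
    try:
        yield conn
    finally:
        pass                  # close the connection here

class Poller(Actor):
    resources = {"redis": redis_connection}

# p = Poller(); p.start(); p.redis  -> the entered resource
# p.stop()                          -> resources cleaned up
```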

Nikhil Krishna 00:59:14 Okay. But one question I have is: you had mentioned that this will be built with jumpstarter without actually changing the major API of Celery, right? So how does this map onto a task? Does it mean that the task classes that we have will remain unchanged, and they now map to actors and just function?

Omer Katz 00:59:41 So Celery has a task registry, which registers all the tasks in the app, right? So, this is very easy to model. You have an actor which defines one unit of concurrency and has all the tasks Celery registered, inside the actor. Therefore, when that actor gets a message, it can process that task. And you know it's busy because of the state it's in, the state the task puts it in.

Nikhil Krishna 01:00:14 So it's almost like you're remodeling the whole framework itself: the context in which the task runs is now inside the actor, and the actor model on top then allows you to understand the state of that particular processing unit. So, is there anything else that we have not covered today that you'd like to talk about in terms of this topic?

Omer Katz 01:00:44 Yeah. It's been very, very hard to work on this project during the pandemic, and if I were doing it without the support of my clients, I'd have much less time to actually give this project the attention it needs. This project needs to be revamped, and we'd very much like you to be involved. And if you can't be involved but you use Celery, please donate. Right now, we only have a budget of $5,000 or $5,500 a year, something like that, and we would very much like to reach a budget that allows us to bring in more resources. So, if you have problems with Celery, or if you have something that you want to fix in Celery, or a feature to add, you can just contact us. We'll be very happy to help you with it.

Nikhil Krishna 01:01:41 That's a great point. How can our listeners get in touch with the Celery project? Is the donation aspect of it something that is on the main website?

Omer Katz 01:01:58 Yes, it is. You can just go to our Open Collective or to our GitHub repository; we have set up the funding links there.

Nikhil Krishna 01:02:07 In that case, when we post this onto the Software Engineering Radio website, I will make sure that these links are there so that our listeners can access them. So, thank you very much, Omer. This was a very enjoyable session. I really enjoyed speaking with you about this. Have a great day. [End of Audio]


SE Radio theme: “Broken Reality” by Kevin MacLeod (incompetech.com — Licensed under Creative Commons: By Attribution 3.0)
