Shutting down a RabbitMQ consumer gracefully
Recently, I needed to implement graceful shutdown in a project whose message queue consumer handles a large number of transactions in production and therefore can't be terminated abruptly with a simple kill command. The implementation was tricky, but it was a great chance to solidify my concurrent programming foundations.
Theory
The idea is to use Unix signals (available on Unix, Unix-like, and POSIX-compliant systems), a form of IPC (inter-process communication). Users can notify the consumer from another process with the kill() system call, or from the shell with the command kill [PID]. By default, the kill command sends SIGTERM, the signal that requests program termination, to the target. We simply follow that convention.
PHP has a PCNTL extension for working with signals. The plan is to install a signal handler in advance that sets a global flag; the main loop then frees resources and exits once it detects the flag.
The difficulty is that a system-level signal interrupts the normal execution of the current process, which resumes after the signal handler returns. This asynchronous control flow can cause synchronization problems and race conditions, and the resulting bugs may only hit the system, badly, after a long uptime.
A fairly straightforward implementation is shown below.
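To make the flag-and-loop idea concrete, here is a minimal sketch with no message queue involved. It assumes the pcntl and posix extensions are available (CLI on a Unix-like system), and it sends SIGTERM to itself with posix_kill() purely to simulate an operator running kill [PID]; the iteration counter is only there for the demonstration.

```php
<?php
// Minimal sketch: a handler that only flips a flag, and a main loop that checks it.
$running = true;

pcntl_signal(SIGTERM, function (int $signo) use (&$running): void {
    // Do as little as possible here: just record the request.
    $running = false;
});

$iterations = 0;
while ($running) {
    $iterations++;
    // ... handle one unit of work here ...

    if ($iterations === 3) {
        // Simulates `kill [PID]` from another process (demo only).
        posix_kill(posix_getpid(), SIGTERM);
    }

    // Run any queued PHP-level signal handlers.
    pcntl_signal_dispatch();
}

// Free resources here, then exit.
echo "stopped after {$iterations} iterations\n"; // prints "stopped after 3 iterations"
```

The handler never does the cleanup itself; the loop finishes its current iteration and then leaves on its own terms.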
Version 1
$running = true;
The callback function calls pcntl_sigprocmask() to block all signals, or just SIGTERM (depending on the arguments), during the critical section, and unblocks them afterwards. This prevents a kill signal from interrupting the transaction.
The pending signal is not processed until pcntl_signal_dispatch() is called. An alternative is to use declare(ticks=1) to check for signals periodically.
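The blocking behaviour can be checked in isolation. The following is a sketch under my own assumptions, not the consumer code itself: handleMessage() is a hypothetical wrapper standing in for the message callback, and the process signals itself with posix_kill() to simulate a kill arriving in the middle of a transaction.

```php
<?php
// Sketch: SIGTERM is held back while the critical section runs.
$received = false;
pcntl_signal(SIGTERM, function () use (&$received): void {
    $received = true;
});

// Hypothetical wrapper standing in for the consumer's message callback.
function handleMessage(callable $transaction): void
{
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);       // enter critical section
    try {
        $transaction();
    } finally {
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]); // leave critical section
    }
}

$seenInside = null;
handleMessage(function () use (&$received, &$seenInside): void {
    posix_kill(posix_getpid(), SIGTERM); // simulated kill during the transaction
    pcntl_signal_dispatch();             // nothing to dispatch: the signal is still blocked
    $seenInside = $received;
});

pcntl_signal_dispatch();                 // now the handler runs

var_dump($seenInside); // bool(false)
var_dump($received);   // bool(true)
```

The signal is not lost while it is blocked; it stays pending and is delivered once the mask is lifted, and the PHP handler runs at the next dispatch.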
However, when there are no messages to handle, the consumer sits idle, waiting for a new one. If we signal the program at this point, we may get the following warning:
stream_select(): unable to select [4]: Interrupted system call (max_fd=7)
The reason is that the $channel->wait() call eventually invokes the Unix select() function, which can be interrupted by SIGTERM.
We could let the warning slip past, of course. However, for a robust application, it is good practice to treat all PHP warnings as exceptions (as some PHP frameworks, such as Laravel and Swoole, do).
Assume that an ErrorException is thrown when stream_select() is interrupted. Here is the remedy:
Version 2
// ...
As there is no specific exception type for the stream_select() interruption, an unfriendly workaround is to check the message with a regular expression.
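A rough sketch of that check follows. The helper name isInterruptedSelect() and the exact pattern are my own assumptions, and the error handler is a bare-bones version of what frameworks do when they turn warnings into ErrorException; the first sample message is the warning quoted earlier, the second is made up for contrast.

```php
<?php
// Turn PHP warnings and notices into ErrorException.
set_error_handler(function (int $severity, string $message, string $file, int $line): bool {
    throw new ErrorException($message, 0, $severity, $file, $line);
});

// Hypothetical helper: recognise the "interrupted select" case by its message.
function isInterruptedSelect(ErrorException $e): bool
{
    return preg_match('/stream_select\(\).*Interrupted system call/i', $e->getMessage()) === 1;
}

$interrupted = new ErrorException(
    'stream_select(): unable to select [4]: Interrupted system call (max_fd=7)',
    0,
    E_WARNING
);
$other = new ErrorException('some unrelated warning', 0, E_WARNING);

var_dump(isInterruptedSelect($interrupted)); // bool(true)
var_dump(isInterruptedSelect($other));       // bool(false)
```

In the consumer loop, a catch (ErrorException $e) block would swallow the exception when the helper returns true and rethrow it otherwise, so unrelated warnings still surface.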
Looks great! Now go get ready for launch.
Eh, Houston, we still have a problem.
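A race condition exists right after the while condition test and right before the wait invocation. If SIGTERM arrives in that window, it has already been delivered by the time wait() blocks, so nothing interrupts the call and the consumer will not notice the shutdown request until the next message arrives.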
while (count($channel->callbacks) && $running) {
A Unix-style solution is to use the pselect() function.
// pseudocode: pselect_wait() does not exist
Here, pselect() is equivalent to atomically executing the following calls:
pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
Unfortunately, there is no pselect() or pselect_wait() in PHP, so this code is infeasible. We have no choice but to fall back on an inelegant mechanism: passing a timeout parameter to the $channel->wait() method.
Version 3
$running = true;
The $channel->wait() call will time out periodically and throw an AMQPTimeoutException. We just ignore it and continue with the next loop iteration if no termination signal has been received. Of course, because it is a general timeout exception, it might be raised for other reasons too; handle those cases as your project requires.
Also, if SIGTERM arrives just before $channel->wait() is called, there will be an unavoidable delay of up to the timeout before the consumer exits normally.
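The shape of the timeout loop can be tried out without a broker. In the sketch below, everything AMQP-related is replaced by stand-ins that I made up for illustration: waitForMessage() plays the role of $channel->wait() with a timeout, WaitTimeoutException plays the role of AMQPTimeoutException, and a local socket pair plays the role of the connection. The self-sent SIGTERM simulates an operator's kill.

```php
<?php
// Hypothetical stand-in for AMQPTimeoutException.
class WaitTimeoutException extends RuntimeException {}

// Hypothetical stand-in for $channel->wait() called with a timeout.
function waitForMessage($stream, int $timeoutSec): string
{
    $read = [$stream];
    $write = null;
    $except = null;
    $ready = @stream_select($read, $write, $except, $timeoutSec);
    if ($ready === false || $ready === 0) {
        // Interrupted or timed out: either way the caller re-checks the flag.
        throw new WaitTimeoutException('no message within timeout');
    }
    return (string) fgets($stream);
}

$running = true;
pcntl_signal(SIGTERM, function () use (&$running): void {
    $running = false;
});

[$consumerEnd, $producerEnd] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
fwrite($producerEnd, "job-1\n"); // one message is already waiting

$handled = [];
$timeouts = 0;
while ($running) {
    try {
        $handled[] = trim(waitForMessage($consumerEnd, 1));
    } catch (WaitTimeoutException $e) {
        $timeouts++;
        posix_kill(posix_getpid(), SIGTERM); // simulated kill while idle (demo only)
    }
    pcntl_signal_dispatch(); // the flag is only updated here
}

echo implode(',', $handled), " handled, {$timeouts} timeout(s)\n"; // prints "job-1 handled, 1 timeout(s)"
```

The timeout bounds how long the loop can stay blind to the flag, which is the trade-off described above: a shorter timeout means a faster exit but more wake-ups while idle.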
Epilogue
I implemented this feature based on several references, including one project using this mechanism. Still, I won’t say this is the best or perfect solution. If you have any better ideas or find any problems, feel free to reply.
References
https://github.com/php-amqplib/php-amqplib/issues/165
https://linux.die.net/man/2/pselect
https://en.wikipedia.org/wiki/Signal_(IPC)