Shutting down a RabbitMQ consumer gracefully

Recently, I needed to implement graceful shutdown in a project where a message queue consumer handles a large number of transactions in production and can't be terminated abruptly by a simple kill command. The implementation was tricky, but it was a great chance to solidify my concurrent programming foundation.

Theory

The idea is to use Unix signals (available on Unix-like, POSIX-compliant systems), a form of inter-process communication (IPC). Another process can notify the consumer with the kill() system call, or a user can run the command kill [PID]. By default, the kill command sends SIGTERM, the signal that requests program termination, to the target. We just follow that convention.
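To see the default behaviour that the rest of this post works around, here is a minimal sketch (not part of the consumer) in which a parent process sends SIGTERM to a child that has no handler installed. It assumes the pcntl and posix extensions are loaded and that PHP runs from the CLI; all variable names are illustrative.

```php
<?php
// Sketch only: requires the pcntl and posix extensions (CLI).
$pid = pcntl_fork();
if ($pid === -1) {
    fwrite(STDERR, "fork failed\n");
    exit(1);
}

if ($pid === 0) {
    // Child: SIGTERM keeps its default disposition, so the process
    // is terminated as soon as the signal is delivered.
    sleep(30);
    exit(0);
}

// Parent: the programmatic equivalent of running `kill [PID]`.
posix_kill($pid, SIGTERM);
pcntl_waitpid($pid, $status);

// True when the child was ended by SIGTERM rather than by a normal exit.
$killedBySigterm = pcntl_wifsignaled($status)
    && pcntl_wtermsig($status) === SIGTERM;

echo $killedBySigterm ? "Child was killed by SIGTERM\n" : "Child exited normally\n";
```

Without a handler, the child gets no chance to finish what it was doing, which is exactly what a consumer in the middle of a transaction must avoid.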

PHP has a PCNTL extension to manipulate signals. The idea is to install a signal handler that sets a global flag; the main loop then frees its resources and exits once it detects the flag.
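Stripped of everything RabbitMQ-related, the flag pattern described above might look like the following sketch. The loop body is a stand-in for real work, and the process signals itself with posix_kill() only to make the example self-contained; both are assumptions for illustration.

```php
<?php
// Sketch only: requires the pcntl and posix extensions (CLI).
$running = true;

// The handler only records the request; cleanup happens in the main flow.
pcntl_signal(SIGTERM, function (int $signo) use (&$running): void {
    $running = false;
});

$iterations = 0;
while ($running) {
    $iterations++;

    // Stand-in for real work. On the third pass, simulate `kill [PID]`.
    if ($iterations === 3) {
        posix_kill(posix_getpid(), SIGTERM);
    }

    // Run the PHP handlers of any signals received so far.
    pcntl_signal_dispatch();
}

// Resources would be released here before exiting.
echo "Stopped after {$iterations} iterations\n";
```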

The difficulty is that a system-level signal interrupts the normal execution of the current process, which resumes after the signal handler returns. This asynchronous control flow can cause synchronization problems and race conditions. Such bugs may surface only after a long uptime and then hit the system hard.

A fairly straightforward implementation is shown below.

Version 1

use PhpAmqpLib\Connection\AMQPStreamConnection;

$running = true;

$connection = new AMQPStreamConnection(...);
$channel = $connection->channel();

$callback = function ($message) {
    // Hold back SIGTERM while the message is being processed
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);

    // Critical section entry
    // ...
    // Critical section exit

    pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
    // Run the handler for a SIGTERM that arrived in the meantime
    pcntl_signal_dispatch();
};

// Initialization code
// ...
$channel->basic_consume(..., $callback);

$signalHandler = function ($signo) use (&$running) {
    $running = false;
    echo "Received SIGTERM signal\n";
};

// Install signal handler
pcntl_signal(SIGTERM, $signalHandler);

while (count($channel->callbacks) && $running) {
    $channel->wait();
    pcntl_signal_dispatch();
}

// Restore the default handler to avoid unexpected interruption
pcntl_signal(SIGTERM, SIG_DFL);

$channel->close();
$connection->close();

The callback calls pcntl_sigprocmask() to block the signals listed in its second argument (here only SIGTERM) for the duration of the critical section, and unblocks them afterwards. A signal sent in the meantime stays pending, so a kill signal cannot break the transaction.
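The effect of the mask can be checked in isolation. The following sketch, which assumes the pcntl and posix extensions and uses illustrative variable names, sends SIGTERM to the current process while the signal is blocked and shows that the handler only runs after the signal has been unblocked and dispatched.

```php
<?php
// Sketch only: requires the pcntl and posix extensions (CLI).
$received = false;

pcntl_signal(SIGTERM, function (int $signo) use (&$received): void {
    $received = true;
});

// Enter the "critical section": SIGTERM is held back by the kernel.
pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);

posix_kill(posix_getpid(), SIGTERM); // stays pending while blocked
pcntl_signal_dispatch();             // nothing has been delivered yet
$seenWhileBlocked = $received;

// Leave the critical section: the pending signal is delivered now.
pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
pcntl_signal_dispatch();             // the PHP handler runs here
$seenAfterUnblock = $received;

var_dump($seenWhileBlocked, $seenAfterUnblock);
```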

The PHP handler of a pending signal does not run until pcntl_signal_dispatch() is called. An alternative is to use declare(ticks=1), which checks for pending signals periodically.
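For comparison, a minimal sketch of the ticks alternative is shown below. It assumes the pcntl and posix extensions; the self-signal and the variable names are only there to keep the example self-contained. With declare(ticks=1), PHP checks for pending signals after each statement in the file, so no explicit pcntl_signal_dispatch() call is needed.

```php
<?php
// Sketch only: requires the pcntl and posix extensions (CLI).
declare(ticks=1);

$received = false;

pcntl_signal(SIGTERM, function (int $signo) use (&$received): void {
    $received = true;
});

posix_kill(posix_getpid(), SIGTERM);
$unused = 0; // any following statement gives PHP a tick to run the handler

$seen = $received;
var_dump($seen);
```

The price is a small check after every statement, which is why the versions in this post call pcntl_signal_dispatch() at chosen points instead.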

However, when no message is being handled, the consumer sits idle, waiting for a new one. If we signal the program at this point, we may get the following warning:

stream_select(): unable to select [4]: Interrupted system call (max_fd=7)

The reason is that the $channel->wait() call eventually invokes the Unix select() function, which can be interrupted by SIGTERM.

We can let the warning slip past, of course. For a robust application, however, it's good practice to treat every PHP warning as an exception (as some PHP frameworks do, such as Laravel and Swoole).
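If your project does not already convert warnings, a common way to do it is an error handler that throws ErrorException. The sketch below follows that pattern; the warning is simulated with trigger_error(), since reproducing a real interrupted stream_select() needs a live connection.

```php
<?php
// Sketch only: turn PHP warnings and notices into ErrorException.
set_error_handler(
    function (int $severity, string $message, string $file, int $line): bool {
        if (!(error_reporting() & $severity)) {
            // Respect the current error_reporting level (and the @ operator).
            return false;
        }
        throw new ErrorException($message, 0, $severity, $file, $line);
    }
);

$caught = null;
try {
    // Simulated warning with the same text as the one shown above.
    trigger_error(
        'stream_select(): unable to select [4]: Interrupted system call (max_fd=7)',
        E_USER_WARNING
    );
} catch (ErrorException $ex) {
    $caught = $ex->getMessage();
}

echo "Caught: {$caught}\n";
```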

Assume that the warning is converted into an ErrorException when stream_select() is interrupted. Here is the remedy:

Version 2

// ...

while (count($channel->callbacks) && $running) {
    try {
        $channel->wait();
    } catch (ErrorException $ex) {
        if (!preg_match('~stream_select\\(\\)~i', $ex->getMessage())) {
            throw $ex;
        }
    }

    pcntl_signal_dispatch();
}

// ...

As there is no specific exception type for a stream_select() interruption, an unfriendly workaround is to check the message with a regular expression.
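The message check can be pulled into a small helper so that the loop stays readable. The function name isInterruptedSelect() is hypothetical, and the sketch assumes the warning text contains both "stream_select()" and "Interrupted system call", as in the message quoted earlier; the second string comes from the operating system and may differ on your platform.

```php
<?php
// Sketch only: isInterruptedSelect() is a hypothetical helper name.
function isInterruptedSelect(Throwable $ex): bool
{
    if (!$ex instanceof ErrorException) {
        return false;
    }

    $message = $ex->getMessage();

    // Both fragments are taken from the warning shown in this post.
    return stripos($message, 'stream_select()') !== false
        && stripos($message, 'Interrupted system call') !== false;
}

$interrupted = new ErrorException(
    'stream_select(): unable to select [4]: Interrupted system call (max_fd=7)'
);
$unrelated = new ErrorException('Undefined variable $foo');

var_dump(isInterruptedSelect($interrupted)); // bool(true)
var_dump(isInterruptedSelect($unrelated));   // bool(false)
```

Matching on the second fragment as well keeps other stream_select() failures from being swallowed silently.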

Looks great! Now go get ready for launch.

Eh, Houston, we still have a problem.

A race condition remains in the window between the while condition test and the wait() invocation.

while (count($channel->callbacks) && $running) {
    // A signal arriving here goes unnoticed until wait() returns
    $channel->wait();

    // ...

A Unix-style solution is to use the pselect() function.

// Pseudocode: pselect_wait() does not exist
while (count($channel->callbacks)) {

    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);
    pcntl_signal_dispatch();

    if (!$running) {
        break;
    }

    $channel->pselect_wait([SIGTERM]);

    // ...

Here, pselect() is equivalent to atomically executing the following calls:

pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL);

Unfortunately, there is no pselect() or pselect_wait() in PHP, so this code is infeasible. We have no choice but to fall back on an inelegant mechanism: passing a timeout parameter to the $channel->wait() method.

Version 3

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;

$running = true;

$connection = new AMQPStreamConnection(...);
$channel = $connection->channel();

$callback = function ($message) {
    // Hold back SIGTERM while the message is being processed
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);

    // Critical section entry
    // ...
    // Critical section exit

    pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
    // Run the handler for a SIGTERM that arrived in the meantime
    pcntl_signal_dispatch();
};

// Initialization code
// ...
$channel->basic_consume(..., $callback);

$signalHandler = function ($signo) use (&$running) {
    $running = false;
    echo "Received SIGTERM signal\n";
};

// Install signal handler
pcntl_signal(SIGTERM, $signalHandler);

while (count($channel->callbacks) && $running) {
    try {
        // Wake up at least every 15 seconds to re-check $running
        $channel->wait(null, false, 15);
    } catch (ErrorException $ex) {
        // Ignore only the interrupted stream_select() warning
        if (!preg_match('~stream_select\\(\\)~i', $ex->getMessage())) {
            throw $ex;
        }
    } catch (AMQPTimeoutException $ex) {
        // Periodic timeout: nothing to do, fall through to the dispatch
    }

    pcntl_signal_dispatch();
}

// Restore the default handler to avoid unexpected interruption
pcntl_signal(SIGTERM, SIG_DFL);

$channel->close();
$connection->close();

The $channel->wait() call now times out periodically and throws an AMQPTimeoutException. We simply ignore it and continue with the next iteration if no termination signal has arrived. Of course, the exception may be raised for other reasons, as it is a general timeout exception; handle those cases as your project requires.

Also, if SIGTERM arrives just before the $channel->wait() call, there is an unavoidable delay of up to the timeout (15 seconds here) before the normal exit.
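This delay can be reproduced without RabbitMQ. In the sketch below, a plain stream_select() on an idle socket pair stands in for $channel->wait(), the timeout is shortened to one second, and the process signals itself right before the wait; all of these are assumptions made to keep the example self-contained (it needs the pcntl and posix extensions).

```php
<?php
// Sketch only: stream_select() on an idle socket stands in for $channel->wait().
$running = true;

pcntl_signal(SIGTERM, function (int $signo) use (&$running): void {
    $running = false;
});

[$readEnd, $writeEnd] = stream_socket_pair(
    STREAM_PF_UNIX,
    STREAM_SOCK_STREAM,
    STREAM_IPPROTO_IP
);

$timeoutSeconds = 1;
$loops = 0;
$start = microtime(true);

while ($running) {
    $loops++;

    if ($loops === 1) {
        // Worst case: SIGTERM arrives after the loop test, before the wait.
        posix_kill(posix_getpid(), SIGTERM);
    }

    $read = [$readEnd];
    $write = null;
    $except = null;
    // Nothing is ever written, so this returns only when the timeout expires.
    stream_select($read, $write, $except, $timeoutSeconds);

    pcntl_signal_dispatch();
}

$elapsed = microtime(true) - $start;
printf("Exited after %d loop(s), %.1f s after the signal\n", $loops, $elapsed);
```

The loop still exits cleanly, but only after the full timeout has passed, so the timeout value is a trade-off between shutdown latency and how often an idle consumer wakes up.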

Epilogue

I implemented this feature based on several references, including one project using this mechanism. Still, I won’t say this is the best or perfect solution. If you have any better ideas or find any problems, feel free to reply.

References

https://github.com/php-amqplib/php-amqplib/issues/165
https://linux.die.net/man/2/pselect
https://en.wikipedia.org/wiki/Signal_(IPC)