Shutting down a RabbitMQ consumer gracefully

Recently, I needed to implement graceful shutdown in a message queue consumer that handles a large number of transactions in production and can’t be terminated immediately by a simple kill command. The implementation was tricky, but it was a great chance to solidify my concurrent programming foundations.

Theory

The basic idea is to use Unix (more precisely, Unix-like and POSIX-compliant) signals, a form of IPC (inter-process communication). A user can notify the consumer through the kill() system call from another process or with the command kill [PID]. By default, the kill command sends SIGTERM, the signal that requests program termination, to the target. We simply follow that convention.

PHP has the PCNTL extension for handling signals. The plan is to install a signal handler that sets a global flag; once the main loop detects the flag, it frees resources and exits.
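
The flag-based idea can be sketched in isolation, without any message queue. This is only a minimal sketch, assuming the pcntl and posix extensions are available on the CLI. The process sends SIGTERM to itself with posix_kill() as a stand-in for an external kill [PID], and the loop body and the $iterations counter are illustrative placeholders rather than part of the real consumer.

```php
<?php
// Global flag flipped by the signal handler and polled by the main loop.
$running = true;

pcntl_signal(SIGTERM, function (int $signo) use (&$running) {
    $running = false;
});

$iterations = 0;
while ($running) {
    $iterations++;                    // placeholder for real work
    if ($iterations === 3) {
        // Stand-in for `kill [PID]` issued from another process.
        posix_kill(posix_getpid(), SIGTERM);
    }
    pcntl_signal_dispatch();          // run handlers for any queued signals
}

// Free resources here, then exit normally.
echo "Stopped after {$iterations} iterations\n";
```

The handler does nothing but flip the flag; all cleanup stays in the main flow, where it cannot interleave with half-finished work.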

The difficulty is that a system-level signal interrupts the normal execution of the current process, which resumes after the signal handler returns. This concurrent execution flow can cause synchronization problems and race conditions, and buggy code may only hurt the system, badly, after a long uptime.

A fairly straightforward first version is shown below:

Version 1

use PhpAmqpLib\Connection\AMQPStreamConnection;

$running = true;
$connection = new AMQPStreamConnection(...);
$channel = $connection->channel();

$callback = function ($message) {
    // Keep SIGTERM pending while the message is being processed
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);
    // Critical section entry
    // ...
    // Critical section exit
    pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
    pcntl_signal_dispatch();
};

// Initialization code
// ...
$channel->basic_consume(..., $callback);

$signalHandler = function ($signo) use (&$running) {
    $running = false;
    echo "Received SIGTERM signal\n";
};

// Install signal handler
pcntl_signal(SIGTERM, $signalHandler);

while (count($channel->callbacks) && $running) {
    $channel->wait();
    pcntl_signal_dispatch();
}

// Restore the default handler to avoid unexpected interruption during cleanup
pcntl_signal(SIGTERM, SIG_DFL);
$channel->close();
$connection->close();

The callback function calls pcntl_sigprocmask() to block SIGTERM (or a wider set of signals, depending on the scenario) during the critical section, and unblocks it afterwards. This prevents a termination signal from breaking the transaction.
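
To see the masking in action without a broker, here is a small sketch under the same assumptions (CLI with the pcntl and posix extensions). The $log array and the self-sent signal are illustrative only; they show that a SIGTERM sent while the signal is blocked stays pending until the mask is lifted.

```php
<?php
$log = [];
pcntl_signal(SIGTERM, function (int $signo) use (&$log) {
    $log[] = 'handler';
});

pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);
$log[] = 'critical section start';

// The signal is generated here but stays pending because it is blocked.
posix_kill(posix_getpid(), SIGTERM);
pcntl_signal_dispatch();              // nothing to dispatch yet

$log[] = 'critical section end';
pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
pcntl_signal_dispatch();              // the pending signal is handled now

print_r($log);
```

The handler entry lands after both critical-section entries, which is the ordering the callback in Version 1 relies on.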

Pending signals are not processed until pcntl_signal_dispatch() is called. An alternative is to use declare(ticks=1), which checks for signals periodically.
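
For comparison, the ticks-based alternative might look like the sketch below. It assumes the same CLI setup with pcntl and posix; the bounded $spins loop is a hypothetical placeholder so the example always terminates.

```php
<?php
declare(ticks=1);   // let PHP check for pending signals as statements execute

$running = true;
pcntl_signal(SIGTERM, function (int $signo) use (&$running) {
    $running = false;
});

posix_kill(posix_getpid(), SIGTERM);   // stand-in for an external `kill [PID]`

$spins = 0;
while ($running && $spins < 1000) {    // bounded so the sketch always ends
    $spins++;
}

echo $running ? "still running\n" : "handler ran without an explicit dispatch\n";
```

Ticks add a small cost to every statement in the declaring file, which is one reason to prefer explicit pcntl_signal_dispatch() calls at well-chosen points.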

However, when no message is being handled, the consumer sits idle, waiting for a new one. If we signal the program at this point, there is a good chance of receiving the following warning:

stream_select(): unable to select [4]: Interrupted system call (max_fd=7)

The reason is that the $channel->wait() call eventually invokes the Unix select() function, which is interruptible by SIGTERM.

We could let the warning slip past, of course. However, for a robust application, it’s good practice to treat all PHP warnings as exceptions (as PHP frameworks such as Laravel and Swoole do).
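
A common way to get that behavior outside a framework is a global error handler that rethrows warnings as ErrorException. The sketch below is an assumption about how such a conversion could be wired, not the code any particular framework uses; trigger_error() with E_USER_WARNING merely simulates the stream_select() warning shown above.

```php
<?php
// Convert every reported PHP warning or notice into an ErrorException.
set_error_handler(function (int $severity, string $message, string $file, int $line): bool {
    if (!(error_reporting() & $severity)) {
        return false;   // respect the current error_reporting level
    }
    throw new ErrorException($message, 0, $severity, $file, $line);
});

$caught = null;
try {
    // Simulated warning; E_USER_WARNING stands in for the real E_WARNING.
    trigger_error(
        'stream_select(): unable to select [4]: Interrupted system call (max_fd=7)',
        E_USER_WARNING
    );
} catch (ErrorException $ex) {
    $caught = $ex;
}

restore_error_handler();
echo get_class($caught), ': ', $caught->getMessage(), "\n";
```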

Assume that the warning is converted into an ErrorException when stream_select() is interrupted. Here is the remedy:

Version 2

// ...
while (count($channel->callbacks) && $running) {
    try {
        $channel->wait();
    } catch (ErrorException $ex) {
        // Swallow only the interrupted stream_select(); rethrow anything else
        if (!preg_match('~stream_select\\(\\)~i', $ex->getMessage())) {
            throw $ex;
        }
    }
    pcntl_signal_dispatch();
}
// ...

As there is no specific exception type for the stream_select() interruption, the unfriendly workaround is to check the message with a regular expression.
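
If the check is needed in more than one place, it could be wrapped in a small predicate. The helper name isInterruptedSelect() is hypothetical, and matching on the "Interrupted system call" text is an extra assumption: that wording comes from the operating system and may differ between platforms.

```php
<?php
/**
 * Hypothetical helper: true if the exception looks like an interrupted stream_select().
 */
function isInterruptedSelect(Throwable $ex): bool
{
    return $ex instanceof ErrorException
        && stripos($ex->getMessage(), 'stream_select()') !== false
        && stripos($ex->getMessage(), 'Interrupted system call') !== false;
}

$interrupted = new ErrorException(
    'stream_select(): unable to select [4]: Interrupted system call (max_fd=7)'
);
$other = new ErrorException('fwrite(): send of 12 bytes failed');

var_dump(isInterruptedSelect($interrupted)); // bool(true)
var_dump(isInterruptedSelect($other));       // bool(false)
```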

Seems great! Now go get ready for launch.

Ehh, Houston, we still have a problem.

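A race condition lurks between the while condition test and the wait() invocation: a signal that arrives in that window is queued, but nothing dispatches it until wait() returns, which can take arbitrarily long on an idle queue.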

while (count($channel->callbacks) && $running) {
    // A signal arriving here is not handled until wait() returns
    $channel->wait();
    // ...

A Unix-style solution is to use the pselect() function:

while (count($channel->callbacks)) {
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);
    pcntl_signal_dispatch();
    if (!$running) {
        break;
    }
    $channel->pselect_wait([SIGTERM]);
    // ...

pselect() is equivalent to atomically executing the following calls:

pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL);

Unfortunately, PHP has no pselect() or pselect_wait(), so this code is infeasible. We have no choice but to implement a less elegant mechanism: passing a timeout parameter to the $channel->wait() method.

Version 3

use PhpAmqpLib\Connection\AMQPStreamConnection;

$running = true;
$connection = new AMQPStreamConnection(...);
$channel = $connection->channel();

$callback = function ($message) {
    // Keep SIGTERM pending while the message is being processed
    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM]);
    // Critical section entry
    // ...
    // Critical section exit
    pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM]);
    pcntl_signal_dispatch();
};

// Initialization code
// ...
$channel->basic_consume(..., $callback);

$signalHandler = function ($signo) use (&$running) {
    $running = false;
    echo "Received SIGTERM signal\n";
};

// Install signal handler
pcntl_signal(SIGTERM, $signalHandler);

while (count($channel->callbacks) && $running) {
    try {
        // Wake up at least every 15 seconds to re-check $running
        $channel->wait(null, false, 15);
    } catch (ErrorException $ex) {
        // Swallow only the interrupted stream_select(); rethrow anything else
        if (!preg_match('~stream_select\\(\\)~i', $ex->getMessage())) {
            throw $ex;
        }
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
        // No message within the timeout; fall through and loop again
    }
    pcntl_signal_dispatch();
}

// Restore the default handler to avoid unexpected interruption during cleanup
pcntl_signal(SIGTERM, SIG_DFL);
$channel->close();
$connection->close();

$channel->wait() now times out periodically and throws an AMQPTimeoutException. We simply ignore it and continue with the next iteration if no termination signal has been issued. Of course, since it’s a general timeout exception, it might be raised for other reasons; inspect it further if necessary.

Also, if SIGTERM arrives just before $channel->wait() is called, there will be an unavoidable delay of up to the timeout before the consumer exits normally.
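
That worst-case delay can be reproduced without RabbitMQ. In the sketch below, a stream_select() call on an idle socket pair stands in for $channel->wait(); the one-second timeout, the socket pair, and the $iterations and $elapsed variables are all illustrative assumptions. The signal is deliberately sent in the window between the loop test and the wait.

```php
<?php
$running = true;
pcntl_signal(SIGTERM, function (int $signo) use (&$running) {
    $running = false;
});

// An idle socket pair: nothing is ever written, like a queue with no messages.
[$reader, $writer] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$timeout = 1;        // seconds; Version 3 uses 15
$iterations = 0;
$start = microtime(true);

while ($running) {
    if ($iterations === 0) {
        // The signal lands after the loop test but before the wait.
        posix_kill(posix_getpid(), SIGTERM);
    }
    $read = [$reader];
    $write = null;
    $except = null;
    @stream_select($read, $write, $except, $timeout);   // stand-in for $channel->wait()
    $iterations++;
    pcntl_signal_dispatch();
}

$elapsed = microtime(true) - $start;
printf("Exited after %d iteration(s), %.1f s\n", $iterations, $elapsed);

fclose($reader);
fclose($writer);
```

The loop exits after a single pass, but only once the wait has timed out, so a shorter timeout trades extra wake-ups for a quicker shutdown.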

Epilogue

I implemented this feature based on several references, including one project that uses this mechanism. Still, I don’t think it is an elegant or perfect solution. If you have better ideas, or spot remaining problems, feel free to leave a comment below.

References