RabbitMQ NACK on kubernetes setup for single user

Hi everyone,

Recently, a user running AiiDA in my k8s cluster reported an error.

Whenever he runs or submits a task, it gets stuck in the “Created” state, with error messages that I suspect are related to RabbitMQ.

For example, using the script from the AiiDA tutorial:

from aiida import engine, load_profile, orm, plugins

load_profile()

ArithmeticAdd = plugins.CalculationFactory('core.arithmetic.add')
code = orm.load_code(label='add@local')
x = orm.Int(5)
y = orm.Int(10)

result = engine.run(ArithmeticAdd, code=code, x=x, y=y)

Here, add@local is the code running on the local computer.

The last line produces:

---------------------------------------------------------------------------
DeliveryError                             Traceback (most recent call last)
/tmp/ipykernel_2134/917019779.py in <cell line: 0>()
----> 1 result = engine.run(ArithmeticAdd, code=code, x=x, y=y)

/opt/conda/lib/python3.11/site-packages/aiida/engine/launch.py in run(process, inputs, **kwargs)
     46         runner = manager.get_manager().get_runner()
     47 
---> 48     return runner.run(process, inputs, **kwargs)
     49 
     50 

/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in run(self, process, inputs, **kwargs)
    275         :return: the outputs of the process
    276         """
--> 277         result, _ = self._run(process, inputs, **kwargs)
    278         return result
    279 

/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in _run(self, process, inputs, **kwargs)
    243 
    244         with utils.loop_scope(self.loop):
--> 245             process_inited = self.instantiate_process(process, **inputs)
    246 
    247             def kill_process(_num, _frame):

/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in instantiate_process(self, process, **inputs)
    172         from .utils import instantiate_process
    173 
--> 174         return instantiate_process(self, process, **inputs)
    175 
    176     def submit(self, process: TYPE_SUBMIT_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any):

/opt/conda/lib/python3.11/site-packages/aiida/engine/utils.py in instantiate_process(runner, process, **inputs)
     84         raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
     85 
---> 86     process = process_class(runner=runner, inputs=inputs)
     87 
     88     return process

/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in __call__(cls, *args, **kwargs)
    203         """
    204         inst: StateMachine = super().__call__(*args, **kwargs)
--> 205         inst.transition_to(inst.create_initial_state())
    206         call_with_super_check(inst.init)
    207         return inst

/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, **kwargs)
    355                 raise
    356             self._transition_failing = True
--> 357             self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
    358         finally:
    359             self._transition_failing = False

/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in transition_failed(self, initial_state, final_state, exception, trace)
   1093         # If we are creating, then reraise instead of failing.
   1094         if final_state == process_states.ProcessState.CREATED:
-> 1095             raise exception.with_traceback(trace)
   1096 
   1097         new_state = self._create_state_instance(

/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, **kwargs)
    341 
    342             try:
--> 343                 self._enter_next_state(new_state)
    344             except StateEntryFailed as exception:
    345                 new_state = exception.state

/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in _enter_next_state(self, next_state)
    412         next_state.do_enter()
    413         self._state = next_state
--> 414         self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)
    415 
    416     def _create_state_instance(self, state_cls: Hashable, **kwargs: Any) -> State:

/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in _fire_state_event(self, hook, state)
    309     def _fire_state_event(self, hook: Hashable, state: Optional[State]) -> None:
    310         for callback in self._event_callbacks.get(hook, []):
--> 311             callback(self, hook, state)
    312 
    313     @super_check

/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in <lambda>(_s, _h, from_state)
    359                 cast(process_states.State, state)
    360             ),
--> 361             state_machine.StateEventHook.ENTERED_STATE: lambda _s, _h, from_state: self.on_entered(
    362                 cast(Optional[process_states.State], from_state)
    363             ),

/opt/conda/lib/python3.11/site-packages/aiida/engine/processes/process.py in on_entered(self, from_state)
    491         # is possible that they will read the old process state and outputs that they check may not yet have been
    492         # attached.
--> 493         super().on_entered(from_state)
    494 
    495     @override

/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in on_entered(self, from_state)
    754             self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
    755             try:
--> 756                 self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
    757             except (ConnectionClosed, ChannelInvalidStateError):
    758                 message = 'Process<%s>: no connection available to broadcast state change from %s to %s'

/opt/conda/lib/python3.11/site-packages/plumpy/communications.py in broadcast_send(self, body, sender, subject, correlation_id)
    178         correlation_id: Optional['ID_TYPE'] = None,
    179     ) -> futures.Future:
--> 180         return self._communicator.broadcast_send(body, sender, subject, correlation_id)
    181 
    182     def is_closed(self) -> bool:

/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/threadcomms.py in broadcast_send(self, body, sender, subject, correlation_id)
    230     def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    231         self._ensure_open()
--> 232         result = self._loop_scheduler.await_(
    233             self._communicator.broadcast_send(body=body, sender=sender, subject=subject, correlation_id=correlation_id)
    234         )

/opt/conda/lib/python3.11/site-packages/pytray/aiothreads.py in await_(self, awaitable, name)
    162         """
    163         try:
--> 164             return self.await_submit(awaitable).result(timeout=self.task_timeout)
    165         except concurrent.futures.TimeoutError as exc:
    166             # Try to get a reasonable name for the awaitable

/opt/conda/lib/python3.11/concurrent/futures/_base.py in result(self, timeout)
    454                     raise CancelledError()
    455                 elif self._state == FINISHED:
--> 456                     return self.__get_result()
    457                 else:
    458                     raise TimeoutError()

/opt/conda/lib/python3.11/concurrent/futures/_base.py in __get_result(self)
    399         if self._exception:
    400             try:
--> 401                 raise self._exception
    402             finally:
    403                 # Break a reference cycle with the exception in self._exception

/opt/conda/lib/python3.11/asyncio/tasks.py in __step(***failed resolving arguments***)
    277                 result = coro.send(None)
    278             else:
--> 279                 result = coro.throw(exc)
    280         except StopIteration as exc:
    281             if self._must_cancel:

/opt/conda/lib/python3.11/site-packages/pytray/aiothreads.py in coro()
    176 
    177         async def coro():
--> 178             res = await awaitable
    179             if asyncio.isfuture(res):
    180                 future = ThreadFuture()

/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/communicator.py in broadcast_send(self, body, sender, subject, correlation_id)
    518     async def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    519         publisher = await self.get_message_publisher()
--> 520         result = await publisher.broadcast_send(body, sender, subject, correlation_id)
    521         return result
    522 

/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/communicator.py in broadcast_send(self, msg, sender, subject, correlation_id)
     62         )
     63         # Send as mandatory=False because we don't expect the message to be routable to anyone
---> 64         return await self.publish(message, routing_key=defaults.BROADCAST_TOPIC, mandatory=False)
     65 
     66 

/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/messages.py in publish(self, message, routing_key, mandatory)
    206         :return:
    207         """
--> 208         result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
    209         return result
    210 

/opt/conda/lib/python3.11/site-packages/aio_pika/exchange.py in publish(self, message, routing_key, mandatory, immediate, timeout)
    197 
    198         channel = await self.channel.get_underlay_channel()
--> 199         return await channel.basic_publish(
    200             exchange=self.name,
    201             routing_key=routing_key,

/opt/conda/lib/python3.11/site-packages/aiormq/channel.py in basic_publish(self, body, exchange, routing_key, properties, mandatory, immediate, timeout, wait)
    697                 return None
    698 
--> 699         return await countdown(confirmation)
    700 
    701     async def basic_qos(

/opt/conda/lib/python3.11/site-packages/aiormq/tools.py in __call__(self, coro)
     93 
     94         if self.deadline is None and not timeout:
---> 95             return await coro
     96         return await asyncio.wait_for(coro, timeout=timeout)
     97 

/opt/conda/lib/python3.11/asyncio/futures.py in __await__(self)
    285         if not self.done():
    286             self._asyncio_future_blocking = True
--> 287             yield self  # This tells Task to wait for completion.
    288         if not self.done():
    289             raise RuntimeError("await wasn't used with future")

/opt/conda/lib/python3.11/asyncio/tasks.py in __wakeup(self, future)
    347     def __wakeup(self, future):
    348         try:
--> 349             future.result()
    350         except BaseException as exc:
    351             # This may also be a cancellation.

/opt/conda/lib/python3.11/asyncio/futures.py in result(self)
    201         self.__log_traceback = False
    202         if self._exception is not None:
--> 203             raise self._exception.with_traceback(self._exception_tb)
    204         return self._result
    205 

DeliveryError: (None, <Basic.Nack object at 0x7f85d9eaf570>)

Here is the output of verdi status:

 ✔ version:     AiiDA v2.7.1
 ✔ config:      /home/jovyan/.aiida
 ✔ profile:     default
 ✔ storage:     Storage for 'default' [open] @ postgresql+psycopg://***
 ✔ broker:      RabbitMQ v3.11.26 @ amqp://default_user_Cv34FrkZjt8eVjH0d3g:ZD0-85mad5Us2OFxmvsr7FS7EbblevAh@rabbitmq-cluster.rabbitmq:5672?heartbeat=600
 ✔ daemon:      Daemon is running with PID 2548
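
For reference, basic TCP reachability of the broker from a pod can be checked with a stdlib-only sketch like this (the function name and URL are illustrative, not part of AiiDA). Note that a NACK implies the connection itself works and the publish was rejected afterwards, so this only rules out plain network problems:

```python
import socket
from urllib.parse import urlparse

def broker_reachable(amqp_url: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the broker host:port succeeds.

    This only tests reachability, not the AMQP handshake or credentials.
    """
    parsed = urlparse(amqp_url)
    host = parsed.hostname or 'localhost'
    port = parsed.port or 5672
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (hypothetical URL, mirroring the form shown by verdi status):
# broker_reachable('amqp://user:password@rabbitmq-cluster.rabbitmq:5672')
```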

Can anyone help me figure out what the problem is? It happens only for this particular user; other users in my cluster do not experience it.

Thank you very much.

Hung Dang

Thanks for the detailed error report. It looks like the message is rejected directly by the RabbitMQ service (a negative acknowledgement) when it is published. The error message contains no information about why it was rejected (AFAIK the Basic.Nack frame does not carry a reason that could be unwrapped), which makes this very hard to debug. Is it possible to get the log of the RabbitMQ service? Maybe there is more information there about why it was rejected. You can find the location of the log with rabbitmqctl status:

[...]
Log file(s)

 * /.../var/log/rabbitmq/rabbit@m2air.log
[...]
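
Since the broker here runs in Kubernetes (service rabbitmq-cluster.rabbitmq), it is usually easier to reach the logs via kubectl than via a log file on disk. A sketch, assuming the namespace and pod names below (list the pods first and substitute your own):

```shell
# Assumption: RabbitMQ runs in the "rabbitmq" namespace; the pod name
# below is illustrative -- list the pods first and substitute yours.
kubectl -n rabbitmq get pods
kubectl -n rabbitmq logs rabbitmq-cluster-server-0 --tail=200
# rabbitmqctl can also be run inside the broker pod:
kubectl -n rabbitmq exec rabbitmq-cluster-server-0 -- rabbitmqctl status
```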

The database has been locked again, although this error is not exactly the same as the previous one: "sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked [SQL: PRAGMA main.table_info("alembic_version")]". This time only "Unable to connect to broker" is reported; the storage check is fine. Sorry, it seems I forgot to save the log of rabbitmqctl status, but this problem looks somewhat easier to solve than the previous one. Thank you very much for your prompt reply.

Thank you for your reply.

A week after my post, I made a mistake while upgrading Calico (a container network interface for Kubernetes) to the wrong version. After I redeployed the previous version of Calico, the NACK error was gone.

I still don’t know what happened to this particular user, as he was the only one who experienced the error. I did try rabbitmqctl status while debugging, but didn’t see anything suspicious. Unfortunately, I cannot reproduce the error now, so it will remain a mystery.

@dongxinyue45-design Could you please open a new topic? This is a very different error. A new topic also gives you the opportunity to have more people look at it, since people decide from the initial issue whether they can help or not. It just looks like AiiDA cannot find RabbitMQ at that address. In the new topic, please let us know how you started the RabbitMQ service so we can help you set it up.

@hungdt Hmm… it might also be something related to the interaction of RabbitMQ and Kubernetes. It looked to me like a permission problem, but it is hard to say. I’ll keep the topic up in case new information becomes available, but I’ll update the title.
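
To follow up on the permission idea: RabbitMQ per-vhost permissions can be inspected directly with rabbitmqctl. A sketch (run inside the broker pod or wherever rabbitmqctl is available; the default vhost "/" is an assumption):

```shell
# List users and their per-vhost permissions (configure/write/read regexes).
rabbitmqctl list_users
rabbitmqctl list_permissions -p /
# Show open connections with the user each one authenticated as:
rabbitmqctl list_connections user peer_host state
```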

Thank you for the reminder. The problem was solved by reopening the terminal, and everything seems normal now. Thank you very much for your suggestions and the discussion. I will continue to explore and use AiiDA.
