Hi everyone,
Recently, one of the users of AiiDA on my Kubernetes (k8s) cluster has been running into an error.
Whenever he runs or submits a process, it gets stuck in the "Created" state, with error messages that I suspect are related to RabbitMQ.
For example, with the following script from the AiiDA tutorial:
from aiida import engine, orm, plugins
from aiida.orm import load_node
from aiida import load_profile
load_profile()
ArithmeticAdd = plugins.CalculationFactory('core.arithmetic.add')
code = orm.load_code(label='add@local')
x = orm.Int(5)
y = orm.Int(10)
result = engine.run(ArithmeticAdd, code=code, x=x, y=y)
Here, `add@local` is a code configured to run on the local computer.
The last line raises:
---------------------------------------------------------------------------
DeliveryError Traceback (most recent call last)
/tmp/ipykernel_2134/917019779.py in <cell line: 0>()
----> 1 result = engine.run(ArithmeticAdd, code=code, x=x, y=y)
/opt/conda/lib/python3.11/site-packages/aiida/engine/launch.py in run(process, inputs, **kwargs)
46 runner = manager.get_manager().get_runner()
47
---> 48 return runner.run(process, inputs, **kwargs)
49
50
/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in run(self, process, inputs, **kwargs)
275 :return: the outputs of the process
276 """
--> 277 result, _ = self._run(process, inputs, **kwargs)
278 return result
279
/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in _run(self, process, inputs, **kwargs)
243
244 with utils.loop_scope(self.loop):
--> 245 process_inited = self.instantiate_process(process, **inputs)
246
247 def kill_process(_num, _frame):
/opt/conda/lib/python3.11/site-packages/aiida/engine/runners.py in instantiate_process(self, process, **inputs)
172 from .utils import instantiate_process
173
--> 174 return instantiate_process(self, process, **inputs)
175
176 def submit(self, process: TYPE_SUBMIT_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any):
/opt/conda/lib/python3.11/site-packages/aiida/engine/utils.py in instantiate_process(runner, process, **inputs)
84 raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
85
---> 86 process = process_class(runner=runner, inputs=inputs)
87
88 return process
/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in __call__(cls, *args, **kwargs)
203 """
204 inst: StateMachine = super().__call__(*args, **kwargs)
--> 205 inst.transition_to(inst.create_initial_state())
206 call_with_super_check(inst.init)
207 return inst
/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, **kwargs)
355 raise
356 self._transition_failing = True
--> 357 self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
358 finally:
359 self._transition_failing = False
/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in transition_failed(self, initial_state, final_state, exception, trace)
1093 # If we are creating, then reraise instead of failing.
1094 if final_state == process_states.ProcessState.CREATED:
-> 1095 raise exception.with_traceback(trace)
1096
1097 new_state = self._create_state_instance(
/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, **kwargs)
341
342 try:
--> 343 self._enter_next_state(new_state)
344 except StateEntryFailed as exception:
345 new_state = exception.state
/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in _enter_next_state(self, next_state)
412 next_state.do_enter()
413 self._state = next_state
--> 414 self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)
415
416 def _create_state_instance(self, state_cls: Hashable, **kwargs: Any) -> State:
/opt/conda/lib/python3.11/site-packages/plumpy/base/state_machine.py in _fire_state_event(self, hook, state)
309 def _fire_state_event(self, hook: Hashable, state: Optional[State]) -> None:
310 for callback in self._event_callbacks.get(hook, []):
--> 311 callback(self, hook, state)
312
313 @super_check
/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in <lambda>(_s, _h, from_state)
359 cast(process_states.State, state)
360 ),
--> 361 state_machine.StateEventHook.ENTERED_STATE: lambda _s, _h, from_state: self.on_entered(
362 cast(Optional[process_states.State], from_state)
363 ),
/opt/conda/lib/python3.11/site-packages/aiida/engine/processes/process.py in on_entered(self, from_state)
491 # is possible that they will read the old process state and outputs that they check may not yet have been
492 # attached.
--> 493 super().on_entered(from_state)
494
495 @override
/opt/conda/lib/python3.11/site-packages/plumpy/processes.py in on_entered(self, from_state)
754 self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
755 try:
--> 756 self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
757 except (ConnectionClosed, ChannelInvalidStateError):
758 message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
/opt/conda/lib/python3.11/site-packages/plumpy/communications.py in broadcast_send(self, body, sender, subject, correlation_id)
178 correlation_id: Optional['ID_TYPE'] = None,
179 ) -> futures.Future:
--> 180 return self._communicator.broadcast_send(body, sender, subject, correlation_id)
181
182 def is_closed(self) -> bool:
/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/threadcomms.py in broadcast_send(self, body, sender, subject, correlation_id)
230 def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
231 self._ensure_open()
--> 232 result = self._loop_scheduler.await_(
233 self._communicator.broadcast_send(body=body, sender=sender, subject=subject, correlation_id=correlation_id)
234 )
/opt/conda/lib/python3.11/site-packages/pytray/aiothreads.py in await_(self, awaitable, name)
162 """
163 try:
--> 164 return self.await_submit(awaitable).result(timeout=self.task_timeout)
165 except concurrent.futures.TimeoutError as exc:
166 # Try to get a reasonable name for the awaitable
/opt/conda/lib/python3.11/concurrent/futures/_base.py in result(self, timeout)
454 raise CancelledError()
455 elif self._state == FINISHED:
--> 456 return self.__get_result()
457 else:
458 raise TimeoutError()
/opt/conda/lib/python3.11/concurrent/futures/_base.py in __get_result(self)
399 if self._exception:
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
/opt/conda/lib/python3.11/asyncio/tasks.py in __step(***failed resolving arguments***)
277 result = coro.send(None)
278 else:
--> 279 result = coro.throw(exc)
280 except StopIteration as exc:
281 if self._must_cancel:
/opt/conda/lib/python3.11/site-packages/pytray/aiothreads.py in coro()
176
177 async def coro():
--> 178 res = await awaitable
179 if asyncio.isfuture(res):
180 future = ThreadFuture()
/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/communicator.py in broadcast_send(self, body, sender, subject, correlation_id)
518 async def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
519 publisher = await self.get_message_publisher()
--> 520 result = await publisher.broadcast_send(body, sender, subject, correlation_id)
521 return result
522
/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/communicator.py in broadcast_send(self, msg, sender, subject, correlation_id)
62 )
63 # Send as mandatory=False because we don't expect the message to be routable to anyone
---> 64 return await self.publish(message, routing_key=defaults.BROADCAST_TOPIC, mandatory=False)
65
66
/opt/conda/lib/python3.11/site-packages/kiwipy/rmq/messages.py in publish(self, message, routing_key, mandatory)
206 :return:
207 """
--> 208 result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
209 return result
210
/opt/conda/lib/python3.11/site-packages/aio_pika/exchange.py in publish(self, message, routing_key, mandatory, immediate, timeout)
197
198 channel = await self.channel.get_underlay_channel()
--> 199 return await channel.basic_publish(
200 exchange=self.name,
201 routing_key=routing_key,
/opt/conda/lib/python3.11/site-packages/aiormq/channel.py in basic_publish(self, body, exchange, routing_key, properties, mandatory, immediate, timeout, wait)
697 return None
698
--> 699 return await countdown(confirmation)
700
701 async def basic_qos(
/opt/conda/lib/python3.11/site-packages/aiormq/tools.py in __call__(self, coro)
93
94 if self.deadline is None and not timeout:
---> 95 return await coro
96 return await asyncio.wait_for(coro, timeout=timeout)
97
/opt/conda/lib/python3.11/asyncio/futures.py in __await__(self)
285 if not self.done():
286 self._asyncio_future_blocking = True
--> 287 yield self # This tells Task to wait for completion.
288 if not self.done():
289 raise RuntimeError("await wasn't used with future")
/opt/conda/lib/python3.11/asyncio/tasks.py in __wakeup(self, future)
347 def __wakeup(self, future):
348 try:
--> 349 future.result()
350 except BaseException as exc:
351 # This may also be a cancellation.
/opt/conda/lib/python3.11/asyncio/futures.py in result(self)
201 self.__log_traceback = False
202 if self._exception is not None:
--> 203 raise self._exception.with_traceback(self._exception_tb)
204 return self._result
205
DeliveryError: (None, <Basic.Nack object at 0x7f85d9eaf570>)
Here is the output of `verdi status`:
✔ version: AiiDA v2.7.1
✔ config: /home/jovyan/.aiida
✔ profile: default
✔ storage: Storage for 'default' [open] @ postgresql+psycopg://***
✔ broker: RabbitMQ v3.11.26 @ amqp://***:***@rabbitmq-cluster.rabbitmq:5672?heartbeat=600
✔ daemon: Daemon is running with PID 2548
Can anyone help me figure out what the problem is? It happens only for this particular user; other users on my cluster do not experience this issue.
Thank you very much.
Hung Dang