Excepted workchains, due to strange error from kiwipy/plumpy?

Hello,

While running a workflow which calls sub-workflows (in particular PwBaseWorkChain), the calculation excepts due to a very weird behaviour (see error below).

Here the versions:

Python 3.9.18
aiida-core 2.4.2
aiida-hyperqueue 0.1.1
aiida-pseudo 1.4.0
aiida-quantumespresso 4.4.0
kiwipy 0.7.8
plumpy 0.21.10
RabbitMQ 3.7.28

Also looking at the workflow, something stranged happens for the PwBaseWorkChain, where to WorkChains have been called at the same time (?).

    └── PwBaseWorkChain<3345273> Excepted [2:while_(should_run_process)(1:run_process)]
        β”œβ”€β”€ create_kpoints_from_distance<3345274> Finished [0]
        β”œβ”€β”€ PwCalculation<3345278> Finished [410]
        β”œβ”€β”€ PwCalculation<3345356> Waiting
        └── PwCalculation<3345369> Created
2023-12-08 17:00:53 [842570 |  REPORT]: [3345273|PwBaseWorkChain|inspect_process]: PwCalculation<3345278> failed but a handler dealt with the problem, restarting
2023-12-08 17:00:53 [842571 | WARNING]: Process<3345273>: no connection available to broadcast state change from running to running
2023-12-08 17:00:53 [842572 | WARNING]: Process<3345273>: no connection available to broadcast state change from running to running
2023-12-08 17:00:53 [842573 |  REPORT]: [3345273|PwBaseWorkChain|on_except]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
    result = self.run_fn(*self.args, **self.kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/workchains/workchain.py", line 314, in _do_step
    finished, stepper_result = self._stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 538, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 246, in step
    return True, self._fn(self._workchain)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/workchains/restart.py", line 197, in run_process
    node = self.submit(self.process_class, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 544, in submit
    return self.runner.submit(process, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 183, in submit
    process_inited = self.instantiate_process(process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 169, in instantiate_process
    return instantiate_process(self, process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/utils.py", line 64, in instantiate_process
    process = process_class(runner=runner, inputs=inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 195, in __call__
    call_with_super_check(inst.init)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 188, in init
    super().init()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 16, in wrapper
    wrapped(self, *args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 309, in init
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 141, in add_rpc_subscriber
    return self._communicator.add_rpc_subscriber(converted, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 215, in add_rpc_subscriber
    return self._loop_scheduler.await_(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 483, in add_rpc_subscriber
    identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 124, in add_rpc_subscriber
    rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_channel.py", line 173, in declare_queue
    queue = await super().declare_queue(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py", line 325, in declare_queue
    await queue.declare(timeout=timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/queue.py", line 92, in declare
    self.declaration_result = await asyncio.wait_for(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 442, in wait_for
    return await fut
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 703, in queue_declare
    return await self.rpc(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

2023-12-08 17:00:53 [842574 | WARNING]: Process<3345273>: no connection available to broadcast state change from running to excepted
2023-12-08 17:00:53 [842575 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
    cleanup()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
    return self._communicator.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 221, in remove_rpc_subscriber
    return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 488, in remove_rpc_subscriber
    await msg_subscriber.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 141, in remove_rpc_subscriber
    await rpc_queue.cancel(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_queue.py", line 140, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/queue.py", line 264, in cancel
    return await asyncio.wait_for(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 442, in wait_for
    return await fut
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 395, in basic_cancel
    return await self.rpc(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

Ok, it seems this is related to Dependencies: Update requirement `kiwipy~=0.8.2` by sphuber Β· Pull Request #243 Β· aiidateam/plumpy Β· GitHub.

Yes, the problem seems to be that the daemon worker loses its connection to RabbitMQ. The aio-pika library has functionality to automatically restore it, but apparently that is either not working, or comes in too late. We are stuck on quite an outdated version. The PR you link updates all of this, but I haven’t been able to merge because there is a single test failing. The test seems inconsequential for normal use, so I think the branch may actually improve the problem you are seeing, but I haven’t had the courage yet to actually merge and release it. If you would be interested to test the branch, that would be great. But unfortunately this exception is difficult to reproduce consistently.

Update: neither of the following solutions are successful:

  • Update with the new branch
  • Use RabbitMQ with older version (3.7.*) through Docker
  • Use RabbitMQ on the same computer where AiiDA runs with more recent version (3.9.13) after applying the changes to the consumer timeout

The exceptions appear randomly, e.g.

SelfConsistentHubbardWorkChain<3353736> Finished [404] [1:while_(should_run_iteration)(6:inspect_hp)]
    β”œβ”€β”€ structure_reorder_kinds<3353754> Finished [0]
    β”œβ”€β”€ PwBaseWorkChain<3353770> Finished [0] [3:results]
    β”‚   β”œβ”€β”€ create_kpoints_from_distance<3353772> Finished [0]
    β”‚   └── PwCalculation<3353787> Finished [0]
    β”œβ”€β”€ PwBaseWorkChain<3353963> Finished [0] [3:results]
    β”‚   β”œβ”€β”€ create_kpoints_from_distance<3353964> Finished [0]
    β”‚   β”œβ”€β”€ PwCalculation<3353968> Finished [463]
    β”‚   β”œβ”€β”€ PwCalculation<3354036> Finished [463]
    β”‚   └── PwCalculation<3354228> Finished [0]
    └── HpWorkChain<3354260> Finished [300] [2:inspect_workchain]
        β”œβ”€β”€ create_kpoints_from_distance<3354262> Finished [0]
        └── HpParallelizeAtomsWorkChain<3354264> Excepted [3:inspect_atoms]
            β”œβ”€β”€ HpBaseWorkChain<3354266> Finished [0] [3:results]
            β”‚   └── HpCalculation<3354269> Finished [0]
            β”œβ”€β”€ HpBaseWorkChain<3354316> Finished [0] [3:results]
            β”‚   └── HpCalculation<3354322> Finished [0]
            β”œβ”€β”€ HpBaseWorkChain<3354319> Finished [0] [3:results]
            β”‚   └── HpCalculation<3354325> Finished [0]
            └── HpBaseWorkChain<3354625> Waiting [2:while_(should_run_process)(1:run_process)]
                └── HpCalculation<3354628> Waiting

The report message is as follows:

2023-12-13 14:37:34 [848870 | REPORT]: [3354264|HpParallelizeAtomsWorkChain|on_except]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 324, in transition_to
    self._enter_next_state(new_state)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 388, in _enter_next_state
    self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 300, in _fire_state_event
    callback(self, hook, state)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 334, in <lambda>
    lambda _s, _h, from_state: self.on_entered(cast(Optional[process_states.State], from_state)),
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 434, in on_entered
    super().on_entered(from_state)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 717, in on_entered
    self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 175, in broadcast_send
    return self._communicator.broadcast_send(body, sender, subject, correlation_id)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 232, in broadcast_send
    result = self._loop_scheduler.await_(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 520, in broadcast_send
    result = await publisher.broadcast_send(body, sender, subject, correlation_id)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 64, in broadcast_send
    return await self.publish(message, routing_key=defaults.BROADCAST_TOPIC, mandatory=False)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/messages.py", line 208, in publish
    result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/exchange.py", line 190, in publish
    return await self.channel.basic_publish(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 638, in basic_publish
    async with countdown.enter_context(self.lock):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 142, in lock
    raise ChannelInvalidStateError("%r closed" % self)
aiormq.exceptions.ChannelInvalidStateError: <Channel: "4" at 0x146dad6b9130> closed

Update: I now integrated the commit of GitHub - aiidateam/plumpy at fix/aio-pika, hopefully they will help.

Thanks @bastonero . I am actually thinking of making a branch that simply catches this exception and then just emits a warning. The code is just trying to broadcast a state changed, and that fails because the connection is not available. The broadcast is not critical though. The next time it changes state, it will simply emit it again. Even if it was the final state change and the process is terminating, there is still no problem. The calling process has a polling backup mechanism just for this case.

Actually, I just had a look, and I already added this on the main branch of plumpy: Catch `ChannelInvalidStateError` in process state change (#278) Β· aiidateam/plumpy@fda244a Β· GitHub

I haven’t released it yet, but perhaps you could try using that branch.

1 Like

Update: the warning appeared, but the workchain still excepted with the following traceback

  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
    result = self.run_fn(*self.args, **self.kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/workchains/workchain.py", line 317, in _do_step
    finished, stepper_result = self._stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 538, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 438, in step
    finished, retval = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 246, in step
    return True, self._fn(self._workchain)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida_quantumespresso_hp/workflows/hubbard.py", line 431, in run_relax
    running = self.submit(PwRelaxWorkChain, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 542, in submit
    return self.runner.submit(process, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 185, in submit
    process_inited = self.instantiate_process(process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 171, in instantiate_process
    return instantiate_process(self, process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/utils.py", line 64, in instantiate_process
    process = process_class(runner=runner, inputs=inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 195, in __call__
    call_with_super_check(inst.init)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 188, in init
    super().init()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 16, in wrapper
    wrapped(self, *args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 309, in init
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 141, in add_rpc_subscriber
    return self._communicator.add_rpc_subscriber(converted, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 189, in add_rpc_subscriber
    return self._loop_scheduler.await_(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 480, in add_rpc_subscriber
    identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 121, in add_rpc_subscriber
    rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_channel.py", line 193, in declare_queue
    queue: RobustQueue = await super().declare_queue(   # type: ignore
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py", line 324, in declare_queue
    channel=self.channel,
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py", line 131, in channel
    raise aiormq.exceptions.ChannelInvalidStateError(
aiormq.exceptions.ChannelInvalidStateError: Channel has been closed

2023-12-14 11:28:19 [851419 | WARNING]: Process<3357906>: no connection available to broadcast state change from running to excepted
2023-12-14 11:28:19 [851420 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
    cleanup()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
    return self._communicator.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
    return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 485, in remove_rpc_subscriber
    await msg_subscriber.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 138, in remove_rpc_subscriber
    await rpc_queue.cancel(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_queue.py", line 145, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/queue.py", line 255, in cancel
    return await self.channel.basic_cancel(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 484, in basic_cancel
    return await self.rpc(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/abc.py", line 40, in __inner
    return await self.task
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 163, in rpc
    lock = self.lock
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 142, in lock
    raise ChannelInvalidStateError("%r closed" % self)
aiormq.exceptions.ChannelInvalidStateError: <Channel: "3" at 0x14b5b05ffb80> closed

An other interesting one. As above, but with a new unseen error after that:

2023-12-14 12:20:25 [851579 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 91, in _get_node_by_link_label
    node = attribute_dict[label]
KeyError: 'remote_folder'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 140, in __getattr__
    return self._get_node_by_link_label(label=name)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 119, in _get_node_by_link_label
    raise NotExistent from exception
aiida.common.exceptions.NotExistent

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 333, in transition_to
    call_with_super_check(self.on_terminated)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida_quantumespresso_hp/workflows/hp/parallelize_atoms.py", line 141, in on_terminated
    called_descendant.outputs.remote_folder._clean()  # pylint: disable=protected-access
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 146, in __getattr__
    raise NotExistentAttributeError(
aiida.common.exceptions.NotExistentAttributeError: Node<3359304> does not have an output with link label 'remote_folder'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 443, in on_terminated
    self.runner.persister.delete_checkpoint(self.pid)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/persistence.py", line 152, in delete_checkpoint
    calc.delete_checkpoint()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/process/process.py", line 515, in delete_checkpoint
    self.base.attributes.delete(self.CHECKPOINT_KEY)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/attributes.py", line 152, in delete
    self._node._check_mutability_attributes([key])  # pylint: disable=protected-access
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/mixins.py", line 211, in _check_mutability_attributes
    raise exceptions.ModificationNotAllowed('attributes of a sealed node are immutable')
aiida.common.exceptions.ModificationNotAllowed: attributes of a sealed node are immutable

Thanks @bastonero , can you verify that 3359304 really doesn’t have the remote_folder output. What is its final status? Did it finish nominally? And then what is the full report for the parallelize_atoms workchain that raised the exception?

The calculatiion has the remote_folder, I guess probably the parallel_atoms workchian excepted before, hence cleaning the remote folders, so probably that’s why it raised this further exceptions (?).

Re the parallelize_atoms, here it is:

2023-12-14 12:01:44 [851504 |  REPORT]: [3359246|HpParallelizeAtomsWorkChain|run_init]: launched initialization HpBaseWorkChain<3359248>
2023-12-14 12:01:45 [851505 |  REPORT]:   [3359248|HpBaseWorkChain|run_process]: launching HpCalculation<3359251> iteration #1
2023-12-14 12:04:16 [851512 |  REPORT]:   [3359248|HpBaseWorkChain|results]: work chain completed after 1 iterations
2023-12-14 12:04:16 [851513 |  REPORT]: [3359246|HpParallelizeAtomsWorkChain|run_atoms]: launched HpBaseWorkChain<3359259> for atomic site 1 of kind Fe
2023-12-14 12:04:17 [851514 |  REPORT]:   [3359259|HpBaseWorkChain|run_process]: launching HpCalculation<3359262> iteration #1
2023-12-14 12:04:18 [851515 |  REPORT]:   [3359248|HpBaseWorkChain|on_terminated]: cleaned remote folders of calculations: 3359251
2023-12-14 12:04:20 [851516 |  REPORT]:   [3359248|HpBaseWorkChain|on_terminated]: cleaned remote folders of calculations: 3359251
2023-12-14 12:20:05 [851569 |  REPORT]:   [3359259|HpBaseWorkChain|results]: work chain completed after 1 iterations
2023-12-14 12:20:07 [851570 |  REPORT]:   [3359259|HpBaseWorkChain|on_terminated]: cleaned remote folders of calculations: 3359262
2023-12-14 12:20:09 [851571 |  REPORT]:   [3359259|HpBaseWorkChain|on_terminated]: cleaned remote folders of calculations: 3359262
2023-12-14 12:20:09 [851572 |  REPORT]: [3359246|HpParallelizeAtomsWorkChain|run_final]: launched HpBaseWorkChain<3359301> to collect matrices
2023-12-14 12:20:10 [851573 |  REPORT]:   [3359301|HpBaseWorkChain|run_process]: launching HpCalculation<3359304> iteration #1
2023-12-14 12:20:22 [851574 | WARNING]: Process<3359246>: no connection available to broadcast state change from waiting to running
2023-12-14 12:20:22 [851575 | WARNING]: Process<3359246>: no connection available to broadcast state change from running to running
2023-12-14 12:20:22 [851576 |  REPORT]: [3359246|HpParallelizeAtomsWorkChain|on_except]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
    result = self.run_fn(*self.args, **self.kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/workchains/workchain.py", line 317, in _do_step
    finished, stepper_result = self._stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/workchains.py", line 246, in step
    return True, self._fn(self._workchain)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida_quantumespresso_hp/workflows/hp/parallelize_atoms.py", line 112, in run_final
    node = self.submit(HpBaseWorkChain, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 542, in submit
    return self.runner.submit(process, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 185, in submit
    process_inited = self.instantiate_process(process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/runners.py", line 171, in instantiate_process
    return instantiate_process(self, process, **inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/utils.py", line 64, in instantiate_process
    process = process_class(runner=runner, inputs=inputs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 195, in __call__
    call_with_super_check(inst.init)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 188, in init
    super().init()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 16, in wrapper
    wrapped(self, *args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 309, in init
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 141, in add_rpc_subscriber
    return self._communicator.add_rpc_subscriber(converted, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 189, in add_rpc_subscriber
    return self._loop_scheduler.await_(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 480, in add_rpc_subscriber
    identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 121, in add_rpc_subscriber
    rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_channel.py", line 193, in declare_queue
    queue: RobustQueue = await super().declare_queue(   # type: ignore
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py", line 324, in declare_queue
    channel=self.channel,
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py", line 131, in channel
    raise aiormq.exceptions.ChannelInvalidStateError(
aiormq.exceptions.ChannelInvalidStateError: Channel has been closed

2023-12-14 12:20:22 [851577 | WARNING]: Process<3359246>: no connection available to broadcast state change from running to excepted
2023-12-14 12:20:22 [851578 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
    cleanup()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
    return self._communicator.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
    return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 178, in coro
    res = await awaitable
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 485, in remove_rpc_subscriber
    await msg_subscriber.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 138, in remove_rpc_subscriber
    await rpc_queue.cancel(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_queue.py", line 145, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aio_pika/queue.py", line 255, in cancel
    return await self.channel.basic_cancel(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 484, in basic_cancel
    return await self.rpc(
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/abc.py", line 40, in __inner
    return await self.task
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 163, in rpc
    lock = self.lock
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py", line 142, in lock
    raise ChannelInvalidStateError("%r closed" % self)
aiormq.exceptions.ChannelInvalidStateError: <Channel: "3" at 0x146ed0da0950> closed

2023-12-14 12:20:25 [851579 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 91, in _get_node_by_link_label
    node = attribute_dict[label]
KeyError: 'remote_folder'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 140, in __getattr__
    return self._get_node_by_link_label(label=name)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 119, in _get_node_by_link_label
    raise NotExistent from exception
aiida.common.exceptions.NotExistent

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py", line 333, in transition_to
    call_with_super_check(self.on_terminated)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida_quantumespresso_hp/workflows/hp/parallelize_atoms.py", line 141, in on_terminated
    called_descendant.outputs.remote_folder._clean()  # pylint: disable=protected-access
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 146, in __getattr__
    raise NotExistentAttributeError(
aiida.common.exceptions.NotExistentAttributeError: Node<3359304> does not have an output with link label 'remote_folder'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 443, in on_terminated
    self.runner.persister.delete_checkpoint(self.pid)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/persistence.py", line 152, in delete_checkpoint
    calc.delete_checkpoint()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/process/process.py", line 515, in delete_checkpoint
    self.base.attributes.delete(self.CHECKPOINT_KEY)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/attributes.py", line 152, in delete
    self._node._check_mutability_attributes([key])  # pylint: disable=protected-access
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/mixins.py", line 211, in _check_mutability_attributes
    raise exceptions.ModificationNotAllowed('attributes of a sealed node are immutable')
aiida.common.exceptions.ModificationNotAllowed: attributes of a sealed node are immutable

2023-12-14 13:13:18 [851700 |  REPORT]:   [3359307|HpBaseWorkChain|run_process]: launching HpCalculation<3359340> iteration #1
2023-12-14 13:17:28 [851733 |  REPORT]:   [3359307|HpBaseWorkChain|results]: work chain completed after 1 iterations
2023-12-14 13:17:28 [851734 |  REPORT]:   [3359307|HpBaseWorkChain|on_terminated]: remote folders will not be cleaned
2023-12-14 13:17:28 [851735 |  REPORT]:   [3359307|HpBaseWorkChain|on_terminated]: remote folders will not be cleaned

Ok thanks, I see the problem here is again due to a connection problem to RabbitMQ but this time not because a process is trying to send a broadcast, but because it is trying to add itself as an RPC subscriber. Like the previous fix, we could also catch and just log this exception. It is not critical for the actual completion of the process, but it will not be able to respond to play, pause and kill instructions, which are RPCs (remote procedure calls). I think that is nowhere near as bad as having the entire workchain stack fallover though. I will update plumpy with a similar fix and you can try that.

The exception due to the missing output folder is indeed a consequence then. Due to the exception, it is going to on_terminated which contains the cleanup, but it is done too soon before the subprocesses have finished. Solving the first issue will solve this issue.

Thanks @sphuber ! Indeed, also the first example I reported tried to do that before excepting:

2023-12-14 11:28:19 [851416 | WARNING]: Process<3357906>: no connection available to broadcast state change from running to running
2023-12-14 11:28:19 [851417 | WARNING]: Process<3357906>: no connection available to broadcast state change from running to running
2023-12-14 11:28:19 [851418 |  REPORT]: [3357906|SelfConsistentHubbardWorkChain|on_except]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
[...]

Made the update in this branch: GitHub - aiidateam/plumpy at fix/channel-invalid-state

Would be great if you could try it. Hopefully that catches most locations where these connection exceptions can crop up.

1 Like

Thanks again @sphuber for the amazing support. I installed the update, and now running again. So, in principle if all the exception are correctly caught, you said that the processes will be unresponsive: does this mean that the process won’t be able to proceed further, correct? So, one should revive the processes via the verdi devel rabbitmq tasks analyze --fix? Or will it proceed to completion either way?

Not quite. The tasks fix command is there if the task for the process got lost in the process queue and so it is not going to be run by any daemon worker. But these exceptions we are seeing are when the process is actually running and while being run, fails to connect to RabbitMQ. These actions are often not crucial for the process eventually finishing, so we don’t want them to fully except the process. We will just β€œignore” (and log) them and continue without them.

I essentially put try/except statements for closed connection problems around 4 types of code during the processes’ lifetime:

  • registering itself as an RPC subscriber
  • registering itself as a broadcast subscriber
  • unregistering itself as an RPC subscriber
  • unregistering itself as a broadcast subscriber

The first two are done when the process is started in Process.init and the other two are the β€œcleanups”, which are essentially undoing the registration.

The RPC subscriber is necessary for the process to be able to receive and execute requests to play, pause or kill itself. So if that subscription fails, you won’t be able to do that.

The broadcast subscriber is there such that it can receive broadcast messages, for example when a child process broadcasts that it has changed its state. The process can listen for this in order to determine when it should continue, but it has a polling backup mechanism just for this case where it misses the broadcast.

So concretely, I think ignoring these errors don’t have any real downsides, except that if it is the RPC subscriber, the play/pause/kill commands won’t find their way to the process. But even this could be fixed if really need be by restarting the daemon workers. This will force the process to be reinited, where it will run the same code again to subscribe it as an RPC and broadcast subscriber.

Hope that clears things up.

1 Like

Thanks again for the clarification - not really into the design of the daemon, but after reading the SI of the paper I start having a gist of it.

Unfortunately, the changes are not enough. I sent you over Slack the full report in a txt file - it seems I cannot attach files (?).

Ok, I decided to really start and try to attack the problem at the roots and deal with the ChannelInvalidStateError directly. If a command sent over a channel fails, the channel is closed. If a client keeps sending frames over a closed channel to RabbitMQ, it will close the connection. Since RabbitMQ operates mutliple channels over a single connection, aio-pika protects the connection by raising the ChannelInvalidStateError when it is used in a closed state. Apparently, this use case is not automatically taken care of by the RobustChannel to automatically reopen and retry the failed command, so we have to take care of this ourselves.

After a lot of digging and testing, I have something that might work. I tested it with a very simple minimal working example where I forcibly kill the connection myself, and it automatically recovers from the channel error. However, it is difficult to see whether this will work when integrated into AiiDA. The branch is here: GitHub - aiidateam/kiwipy at fix/auto-recover-channel-invalid-state-error
It is based off of the last released version of kiwipy, which I think you were already on, so I think you should be able to checkout the branch, install it and run it.

It would be great if you could give it a spin since you seem to have a system where it is easy to reproduce the problem. Let me know how it goes

1 Like

Here I am! I think the new code improved a lot the situation, nevertheless I got this exception from time to time:

Warning: You are currently using a post release development version of AiiDA: 2.4.0.post0
Warning: Be aware that this is not recommended for production and is not officially supported.
Warning: Databases used with this version may not be compatible with future releases of AiiDA
Warning: as you might not be able to automatically migrate your data.

*** 3367869: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 19 LOG MESSAGES:
+-> WARNING at 2023-12-16 04:19:24.196921+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:19:26.046277+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:20:06.422318+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:20:09.747224+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:20:48.272721+01:00
 | Process<3367869>: sending broadcast of state change from waiting to running timed out
+-> WARNING at 2023-12-16 04:20:50.883049+01:00
 | Process<3367869>: sending broadcast of state change from waiting to running timed out
+-> WARNING at 2023-12-16 04:19:25.594462+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:20:06.624977+01:00
 | Process<3367869>: sending broadcast of state change from waiting to waiting timed out
+-> WARNING at 2023-12-16 04:20:48.252557+01:00
 | Process<3367869>: sending broadcast of state change from waiting to running timed out
+-> REPORT at 2023-12-16 04:20:48.293784+01:00
 | [3367869|PwCalculation|on_except]: Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
 |     result = self.run_fn(*self.args, **self.kwargs)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/calcjobs/calcjob.py", line 660, in parse
 |     retrieved = self.node.outputs.retrieved
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 140, in __getattr__
 |     return self._get_node_by_link_label(label=name)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 89, in _get_node_by_link_label
 |     attribute_dict = self._construct_attribute_dict(self._incoming)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 61, in _construct_attribute_dict
 |     return AttributeDict(links.nested())
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/links.py", line 372, in nested
 |     raise KeyError(f"duplicate label '{port_name}' in namespace '{'.'.join(port_namespaces)}'")
 | KeyError: "duplicate label 'retrieved' in namespace ''"
+-> WARNING at 2023-12-16 04:20:58.316668+01:00
 | Process<3367869>: sending broadcast of state change from running to excepted timed out
+-> REPORT at 2023-12-16 04:20:48.303827+01:00
 | [3367869|PwCalculation|on_except]: Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
 |     result = self.run_fn(*self.args, **self.kwargs)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/calcjobs/calcjob.py", line 660, in parse
 |     retrieved = self.node.outputs.retrieved
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 140, in __getattr__
 |     return self._get_node_by_link_label(label=name)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 89, in _get_node_by_link_label
 |     attribute_dict = self._construct_attribute_dict(self._incoming)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 61, in _construct_attribute_dict
 |     return AttributeDict(links.nested())
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/links.py", line 372, in nested
 |     raise KeyError(f"duplicate label '{port_name}' in namespace '{'.'.join(port_namespaces)}'")
 | KeyError: "duplicate label 'retrieved' in namespace ''"
+-> WARNING at 2023-12-16 04:20:58.328951+01:00
 | Process<3367869>: sending broadcast of state change from running to excepted timed out
+-> ERROR at 2023-12-16 04:21:08.341994+01:00
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 448, in result
 |     raise TimeoutError()
 | concurrent.futures._base.TimeoutError
 | 
 | The above exception was the direct cause of the following exception:
 | 
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
 |     cleanup()
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
 |     return self._communicator.remove_rpc_subscriber(identifier)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
 |     return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 168, in await_
 |     raise concurrent.futures.TimeoutError(
 | concurrent.futures._base.TimeoutError: remove_rpc_subscriber after 10 seconds
+-> REPORT at 2023-12-16 04:20:50.930874+01:00
 | [3367869|PwCalculation|on_except]: Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/process_states.py", line 228, in execute
 |     result = self.run_fn(*self.args, **self.kwargs)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/calcjobs/calcjob.py", line 660, in parse
 |     retrieved = self.node.outputs.retrieved
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 140, in __getattr__
 |     return self._get_node_by_link_label(label=name)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 89, in _get_node_by_link_label
 |     attribute_dict = self._construct_attribute_dict(self._incoming)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/managers.py", line 61, in _construct_attribute_dict
 |     return AttributeDict(links.nested())
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/links.py", line 372, in nested
 |     raise KeyError(f"duplicate label '{port_name}' in namespace '{'.'.join(port_namespaces)}'")
 | KeyError: "duplicate label 'retrieved' in namespace ''"
+-> WARNING at 2023-12-16 04:21:00.966886+01:00
 | Process<3367869>: sending broadcast of state change from running to excepted timed out
+-> ERROR at 2023-12-16 04:21:10.981476+01:00
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 448, in result
 |     raise TimeoutError()
 | concurrent.futures._base.TimeoutError
 | 
 | The above exception was the direct cause of the following exception:
 | 
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
 |     cleanup()
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
 |     return self._communicator.remove_rpc_subscriber(identifier)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
 |     return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 168, in await_
 |     raise concurrent.futures.TimeoutError(
 | concurrent.futures._base.TimeoutError: remove_rpc_subscriber after 10 seconds
+-> ERROR at 2023-12-16 04:21:11.010073+01:00
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 443, in on_terminated
 |     self.runner.persister.delete_checkpoint(self.pid)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/persistence.py", line 152, in delete_checkpoint
 |     calc.delete_checkpoint()
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/process/process.py", line 515, in delete_checkpoint
 |     self.base.attributes.delete(self.CHECKPOINT_KEY)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/attributes.py", line 152, in delete
 |     self._node._check_mutability_attributes([key])  # pylint: disable=protected-access
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/mixins.py", line 211, in _check_mutability_attributes
 |     raise exceptions.ModificationNotAllowed('attributes of a sealed node are immutable')
 | aiida.common.exceptions.ModificationNotAllowed: attributes of a sealed node are immutable
+-> ERROR at 2023-12-16 04:21:08.337683+01:00
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 448, in result
 |     raise TimeoutError()
 | concurrent.futures._base.TimeoutError
 | 
 | The above exception was the direct cause of the following exception:
 | 
 | Traceback (most recent call last):
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
 |     cleanup()
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
 |     return self._communicator.remove_rpc_subscriber(identifier)
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
 |     return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
 |   File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 168, in await_
 |     raise concurrent.futures.TimeoutError(
 | concurrent.futures._base.TimeoutError: remove_rpc_subscriber after 10 seconds

I also get the following for many processes:

2023-12-15 18:31:55 [856650 |  REPORT]: [3368816|PhBaseWorkChain|run_process]: launching PhCalculation<3369264> iteration #1
2023-12-16 11:42:07 [858193 | WARNING]: Process<3368816>: sending broadcast of state change from waiting to running timed out
2023-12-16 11:42:11 [858194 | WARNING]: Process<3368816>: sending broadcast of state change from waiting to running timed out
2023-12-16 11:42:17 [858195 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:21 [858196 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:27 [858197 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:31 [858198 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:37 [858199 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:37 [858200 |  REPORT]: [3368816|PhBaseWorkChain|results]: work chain completed after 1 iterations
2023-12-16 11:42:41 [858201 | WARNING]: Process<3368816>: sending broadcast of state change from running to running timed out
2023-12-16 11:42:41 [858202 |  REPORT]: [3368816|PhBaseWorkChain|results]: work chain completed after 1 iterations
2023-12-16 11:42:47 [858203 | WARNING]: Process<3368816>: sending broadcast of state change from running to finished timed out
2023-12-16 11:42:51 [858204 | WARNING]: Process<3368816>: sending broadcast of state change from running to finished timed out
2023-12-16 11:42:57 [858205 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
    cleanup()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
    return self._communicator.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
    return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 168, in await_
    raise concurrent.futures.TimeoutError(
concurrent.futures._base.TimeoutError: remove_rpc_subscriber after 10 seconds

2023-12-16 11:42:58 [858206 |  REPORT]: [3368816|PhBaseWorkChain|on_terminated]: remote folders will not be cleaned
2023-12-16 11:43:01 [858207 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 164, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py", line 888, in on_close
    cleanup()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py", line 144, in remove_rpc_subscriber
    return self._communicator.remove_rpc_subscriber(identifier)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 195, in remove_rpc_subscriber
    return self._loop_scheduler.await_(self._communicator.remove_rpc_subscriber(identifier))
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py", line 168, in await_
    raise concurrent.futures.TimeoutError(
concurrent.futures._base.TimeoutError: remove_rpc_subscriber after 10 seconds

2023-12-16 11:43:01 [858208 |   ERROR]: Traceback (most recent call last):
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py", line 443, in on_terminated
    self.runner.persister.delete_checkpoint(self.pid)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/engine/persistence.py", line 152, in delete_checkpoint
    calc.delete_checkpoint()
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/process/process.py", line 515, in delete_checkpoint
    self.base.attributes.delete(self.CHECKPOINT_KEY)
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/nodes/attributes.py", line 152, in delete
    self._node._check_mutability_attributes([key])  # pylint: disable=protected-access
  File "/home/bastonero/.conda/envs/aiida/lib/python3.9/site-packages/aiida/orm/utils/mixins.py", line 211, in _check_mutability_attributes
    raise exceptions.ModificationNotAllowed('attributes of a sealed node are immutable')
aiida.common.exceptions.ModificationNotAllowed: attributes of a sealed node are immutable

2023-12-16 11:43:01 [858209 |  REPORT]: [3368816|PhBaseWorkChain|on_terminated]: remote folders will not be cleaned

I am running 48 workchains from an aiida-core-with-services container. One of the calculations in the workchain is excepted for some of the 48 workchains but finished for the others. The cause of exception is

 |   File "/opt/conda/lib/python3.9/site-packages/aiormq/channel.py", line 121, in rpc
 |     raise ChannelInvalidStateError("writer is None")
 | aiormq.exceptions.ChannelInvalidStateError: writer is None
+-> ERROR at 2024-02-26 09:00:47.123195+00:00
 | Traceback (most recent call last):
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/engine/utils.py", line 202, in exponential_backoff_retry
 |     result = await coro()
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/engine/processes/calcjobs/tasks.py", line 145, in do_submit
 |     return execmanager.submit_calculation(node, transport)
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/engine/daemon/execmanager.py", line 378, in submit_calculation
 |     calculation.set_job_id(result)
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/orm/nodes/process/calculation/calcjob.py", line 306, in set_job_id
 |     return self.base.attributes.set(self.SCHEDULER_JOB_ID_KEY, str(job_id))
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/orm/nodes/attributes.py", line 117, in set
 |     self._node._check_mutability_attributes([key])
 |   File "/opt/conda/lib/python3.9/site-packages/aiida/orm/utils/mixins.py", line 207, in _check_mutability_attributes
 |     raise exceptions.ModificationNotAllowed('attributes of a sealed node are immutable')
 | aiida.common.exceptions.ModificationNotAllowed: attributes of a sealed node are immutable

I am running 4 daemons to avoid overwhelming of daemons. My environment has

aiida-core 2.5.1
plumpy 0.21.10
kiwipy 0.7.7

This issue seems similar to the one in this thread. I was wondering if a solution was finalized for it?