Await_condition

Hi all,
Do you have any suggestions for the deprecated await_condition?
When two workchains are running, can the second workchain be made to wait until some steps of the first workchain have been completed?
When the necessary steps of the first workchain are done, some data become available in a database and the second workchain can continue…
Thank you.

Hi Hossein,

just to be sure that I correctly understand your use case. You want to submit two WorkChains, let’s say A and B (with steps a1, a2,... and b1, b2, ...), as outlined below. Is this correct?

A
  - a1
  - a2
  - a3


B
  - b1
  - b2
  - b3
  - b4

It’s of course possible to make the steps within each WorkChain to wait for each other, e.g., a2 for a1, but I assume this is not what you’re asking for.
On the other hand, if you submit A and B as sub-WorkChains within another top-level one (or even “independently”), to the best of my knowledge, it’s not possible to achieve that b2 will wait at some point for a1 to finish. Again, I hope I understood your use-case correctly.
I think that you would have to define all the steps that have to potentially wait for each other as individual blocks that are part of one common top level WorkChain, then you should be able to orchestrate this.

Our more recent AiiDA WorkGraph (Quick Start - AiiDA WorkGraph documentation) provides more flexibility with the concepts of tasks that you can combine and link. Maybe it’s possible (if it’s not possible with the WorkChain or at least simpler) to use a WorkGraph for this purpose. Pinging @Xing since he will know it.

Hi @t-reents , thanks for looping me in! I agree with your points on the WorkChains.
For WorkGraph, one can achieve the “wait-for-a1-before-b2” behavior in a couple of ways:

Please install the latest WorkGraph from the source

git clone https://github.com/aiidateam/aiida-workgraph
cd aiida-workgraph
pip install -e .

Option 1: Wrap WorkChain B in a WorkGraph

If you’d rather leave your existing WorkChains untouched, you can:

  • Wrap WorkChain B as a task in a WorkGraph.
  • Add a monitor task that polls for the output of WorkChain A (or whatever data you need).
  • Chain the monitor to task B so that B only runs once the monitor returns True.

Here is a pseudo-code example of how you might implement this:

from aiida_workgraph import WorkGraph, task
from aiida.workflows.arithmetic.multiply_add import MultiplyAddWorkChain
from aiida import load_profile

load_profile()

@task.monitor
def is_data_avaiable(pk: int, output_name: str) -> bool:
    """Return True when WorkChain A (PK={pk}) has produced the given output."""
    from aiida.orm import load_node
    node = load_node(pk)
    # here we assume the output is a Data node, you can modify this logic based on your needs
    return output_name in node.outputs

with WorkGraph("test_wait") as wg:
    # Replace with actual pk of WorkChain A, and output name you want to check
    ready = is_data_avaiable(pk=pk, output_name='product')
    # This is WorkChain B, I use MultiplyAddWorkChain as an example
    inputs = {"x": 3, "y": 4, "z": 5, "code": code}
    task_b = task(MultiplyAddWorkChain)(**inputs)
    # set the dependency
    # ">>" means task B will only run if is_data_avaiable returns True
    ready >> task_b

wg.submit()

Please refer to Monitor external events as a task for more details.

Option 2: Convert your WorkChains to WorkGraphs

If you’re willing to refactor, you can convert your existing WorkChains to WorkGraphs. That way each step becomes a task block, and you can express arbitrary “step-to-step” dependencies directly in the workflow graph.

Please refer to the Convert WorkChain to WorkGraph

Note: the online AiiDA WorkGraph documentation is currently undergoing a major refactoring. If you encounter any issues or broken links, please feel free to reach out directly. We are happy to help troubleshoot or provide the latest guidance.

1 Like

Thank you Timo and Xing for you replies!
These two WorkChains are independent and b2 has to wait for a1 to finish. But it seems it is not possible with my current WorkChains.
I will take a look at WorkGraph.
Am I right in thinking that the only way to make a WorkChain wait for an external event is to convert it to a WorkGraph?

Maybe other people have a better approach.
Here, I provide a way to wait for an external event inside a WorkChain. The idea is that while waiting for the external event, it should not block the event loop of the AiiDA runner (that run the WorkChain).

There are two ways to achieve this:

  • Use an asynchronous function to monitor the external state; however, WorkChain does not support running custom asynchronous functions yet.
  • Use a normal function to monitor the external state, but run the function in a separate process, thus not blocking the AiiDA runner. In this case, you can use the PythonJob to do this.

Here is an example function,

def is_data_avaiable(pk: int, output_name: str, timeout: int=3600) -> bool:
    """Check if data is available for WorkChain A with pk {pk}.
    Implement your logic here.
    If data is available, return True, otherwise False.
    """
    from aiida import load_profile, orm
    import time
    load_profile()

    node = orm.load_node(pk)
    tstart = time.time()
    # because this function run in another process, so it will not block the AiiDA runner.
    while output_name not in node.outputs:
        # Wait for the output to be available
        time.sleep(1)
        if time.time() - tstart > timeout:
            return

Inside your WorkChain B, you submit this function as an PythonJob in the step before b1.

B
  - submit pythonjob step
  - b1
  - b2
  - b3
  - b4

Here is an example how to submit the function as a PythonJob.

from aiida_pythonjob import PythonJob, prepare_pythonjob_inputs

inputs = prepare_pythonjob_inputs(
    is_data_avaiable,
    function_inputs={"pk": nodeA.pk, "output_name": "result", "timeout": 3600},
)
process = self.submit(PythonJob, inputs=inputs)
self.to_context(process=process)

In order to let the function fetch your AiiDA database, you need to run the PythonJob on the localhost where your AiiDA is running, and activate the same Python environment (where your AiiDA is running) if needed. For more details, please refer to the PythonJob docs.

1 Like