Workflow optimization: starting a process without waiting for the previous one

The following is the outline of a parent workflow that triggers two child workflows: one that transfers data and one that runs a simulation. The outline loops over multiple days.

The transfer_data step must run in a blocking manner each day (the workflow has to wait for the transfer of one day to finish before starting the transfer of the next one), whereas run_sim can be submitted without waiting for it to finish.

Question:

Ideally, once the transfer for one day has completed, AiiDA would submit run_sim and immediately start the data transfer for the next day without waiting for run_sim to finish (in other words, without adding run_sim to the context). Can this be done in AiiDA?

spec.outline(
    while_(cls.dates)(
        cls.transfer_data,
        cls.run_sim,
    ),
    cls.finalize,
)

Hi @LucR31, I am not sure I fully understand the use case. Do you mean that run_sim starts a calculation that runs for multiple days on end, and that each day you want to run a separate transfer_data job that takes the output data of run_sim and moves/copies it somewhere else? How would the workflow know when the next transfer_data should be run? And how is run_sim going to be ended?

Hi @sphuber, thanks for the reply!
So, run_sim does the calculation for one day at a time, and transfer_data prepares the data of one day to be used by the subsequent run_sim.

run_sim needs to wait for the transfer of that day to be done, but the next day's transfer doesn't need to wait for the run_sim of the previous day. However, the transfers cannot be launched in parallel.

Here is an example of what I mean: imagine we want to do 3 days, then:

transfer day 1, once that’s done, run_sim day 1 and transfer day 2 starts, after transfer day 2 is finished, then run_sim of day 2 and transfer day 3 starts…
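To make the desired scheduling concrete, here is a plain-Python analogy using asyncio; it is not AiiDA code. Transfers are awaited one after another, while each simulation is launched as a background task and only collected at the end. The functions transfer and run_sim and the durations are hypothetical stand-ins for the real jobs.

```python
import asyncio

# Hypothetical durations in seconds, standing in for the real jobs.
TRANSFER_TIME = 0.02
SIM_TIME = 0.1

events = []  # event labels, in the order they happen


async def transfer(day: int) -> None:
    events.append(f"transfer {day} start")
    await asyncio.sleep(TRANSFER_TIME)
    events.append(f"transfer {day} end")


async def run_sim(day: int) -> None:
    events.append(f"sim {day} start")
    await asyncio.sleep(SIM_TIME)
    events.append(f"sim {day} end")


async def pipeline(days: int) -> None:
    sims = []
    for day in range(1, days + 1):
        # Transfers are awaited: day N+1 only starts after day N is done.
        await transfer(day)
        # The simulation is launched but not awaited, so the loop
        # moves straight on to the next transfer.
        sims.append(asyncio.create_task(run_sim(day)))
    # The workflow only ends once every simulation has finished.
    await asyncio.gather(*sims)


asyncio.run(pipeline(3))
print(events)
```

In the printed order, "sim 2 start" appears before "sim 1 end": the simulations overlap while the transfers stay strictly sequential.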

The workflow ends when all calculations are done.

What does the transfer task need to wait for? If it does not need to wait for any other task, you can run all days in parallel.

Thanks, that clarifies some things. But a few more questions remain open:

  • Are the run_sim calculations independent? i.e. as long as the relevant transfer is done, they can run in parallel?
  • How long does the transfer operation take typically, in absolute terms or compared to the run_sim duration?

Hi @Xing, unfortunately, for various reasons, the transfers depend on each other and cannot run in parallel.

@sphuber:

  • run_sim are independent and can indeed run in parallel.
  • It is hard to say because it varies from day to day, but usually run_sim takes longer.

Ok, thanks for the additional information, everything is clear now. Unfortunately, what you are trying to do is currently not possible in the way you suggest. The WorkChain API only actually starts running the subprocesses submitted in a workchain step after that step has returned.

If you are looping over the dates (i.e. number of independent runs) as you suggest:

spec.outline(
    while_(cls.dates)(
        cls.transfer_data,
        cls.run_sim
    ),
    cls.finalize,
)

The transfer job you launch in transfer_data will only actually start once transfer_data returns, and the workchain will only move on to run_sim once that job has finished.
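A rough way to see the cost of this blocking loop is to compare its total runtime with the pipelined schedule the question asks for. The sketch below is a back-of-the-envelope model in plain Python (not AiiDA); the durations are made up and queue waiting times are ignored.

```python
def blocking_loop_makespan(transfers, sims):
    """Current outline: every iteration waits for transfer_i, then for sim_i."""
    total = 0.0
    for tr, sim in zip(transfers, sims):
        total += tr + sim
    return total


def pipelined_makespan(transfers, sims):
    """Desired behaviour: transfers stay sequential, sim_i starts when transfer_i ends."""
    transfer_end = 0.0
    finish = 0.0
    for tr, sim in zip(transfers, sims):
        transfer_end += tr  # the next transfer starts only after this one
        finish = max(finish, transfer_end + sim)  # sims overlap with later transfers
    return finish


# Hypothetical durations (hours) for three days.
transfers = [1, 2, 1]
sims = [5, 4, 6]
print(blocking_loop_makespan(transfers, sims))  # 19.0
print(pipelined_makespan(transfers, sims))      # 10.0
```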

One alternative would be to change the logic to:

spec.outline(
    cls.transfer_all_data,
    cls.run_all_sim,
    cls.finalize
)

Here, instead of looping in the outline, you call transfer_all_data once and submit all transfer jobs at once. This way they run in parallel, and once all of them are done, the workchain calls run_all_sim, where you can submit all simulation jobs. One potential downside is that if the runtime of the transfer jobs varies a lot (either intrinsically or due to waiting times in the queue), the slowest one will determine the start time of all simulation runs.
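The "slowest transfer decides" downside can be illustrated with a small plain-Python timing model (not AiiDA code). It assumes, as described above, that all transfers run in parallel and that all simulations are submitted together once the last transfer ends; the durations are hypothetical.

```python
def two_phase_schedule(transfers, sims):
    """transfer_all_data then run_all_sim: returns (sim start time, makespan)."""
    # All transfers are submitted at once, so the first phase ends with the slowest one.
    sims_start = max(transfers)
    # Every simulation is submitted at that same moment and runs in parallel.
    makespan = sims_start + max(sims)
    return sims_start, makespan


# Hypothetical durations (hours); the third transfer is unusually slow.
transfers = [1, 2, 8]
sims = [5, 4, 6]
print(two_phase_schedule(transfers, sims))  # (8, 14)
```

Even the simulation for the first day, whose data was ready after 1 hour, only starts at hour 8.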

There is a way to get around this, and that is to write a sub-workflow that handles just one date. It would have two steps: launch the transfer job, then run the simulation job. You could then have a parent workflow that loops over the dates and calls the sub-workflow:

spec.outline(
    while_(cls.dates)(
        cls.run_sub_workflow
    ),
    cls.finalize,
)

and the sub-workflow would take the date as an input and simply have the outline

spec.outline(
    cls.transfer_data,
    cls.run_sim
)

In this way, your transfer and sim jobs for each date are truly independent from one another and I think should minimize unnecessary waiting time.
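Using the same kind of plain-Python timing model (not AiiDA code, hypothetical durations), the per-date sub-workflow lets each simulation start as soon as its own transfer is done. Note that this model assumes the sub-workflows run concurrently, so transfers of different dates may overlap, which the original poster said is not allowed for their transfers.

```python
def per_date_subworkflows(transfers, sims):
    """One independent sub-workflow per date: transfer_i, then sim_i."""
    # Each simulation starts right after its own transfer, not after the slowest one.
    sim_starts = list(transfers)
    sim_ends = [tr + sim for tr, sim in zip(transfers, sims)]
    return sim_starts, max(sim_ends)


# Same hypothetical durations as in the two-phase example.
transfers = [1, 2, 8]
sims = [5, 4, 6]
print(per_date_subworkflows(transfers, sims))  # ([1, 2, 8], 14)
```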

Hopefully that helps.

WorkChain

Based on your description, you want to:

  • step 1: transfer_data for day 1
  • step 2: run_sim for day 1 and transfer_data for day 2, in parallel
  • step i+1: run_sim for day i and transfer_data for day i+1, in parallel
  • step n+1: run_sim for day n
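The step list above can be generated mechanically. The following plain-Python sketch (the function step_plan is hypothetical, not part of AiiDA) builds the plan for n days, including the rule that the last step only runs the simulation.

```python
def step_plan(n_days):
    """Return one dict per workchain step, naming which day each job handles."""
    steps = [{"transfer": 1}]  # step 1: only the first transfer
    for day in range(1, n_days + 1):
        step = {"sim": day}
        if day < n_days:  # no transfer after the last day
            step["transfer"] = day + 1
        steps.append(step)
    return steps


for step in step_plan(3):
    print(step)
# {'transfer': 1}
# {'sim': 1, 'transfer': 2}
# {'sim': 2, 'transfer': 3}
# {'sim': 3}
```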

Therefore, you can replace your run_sim step with run_sim_and_transfer_next_data, which runs the sim for day i and transfers the data for day i+1 in parallel.

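spec.outline(
    cls.transfer_data,  # transfer for day 1
    while_(cls.dates)(
        cls.run_sim_and_transfer_next_data,
    ),
    cls.finalize,
)

def run_sim_and_transfer_next_data(self):
    # Submit the simulation for day i and the transfer for day i+1 together
    # (your_sim, your_transfer_data and the input dicts are placeholders)
    process1 = self.submit(your_sim, **sim_inputs)
    process2 = self.submit(your_transfer_data, **transfer_inputs)
    # Both go into the context, so the next step waits for both to finish
    self.to_context(sim_i=process1, transfer_data_i=process2)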

Just make sure that in the last iteration you only run the sim and skip transfer_data.

In this case, each step still has to wait for the longer of sim i and transfer_data i+1, but this is a limitation of WorkChain. As far as I know, there is no better solution within WorkChain.
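The cost of this lock-step pattern can be estimated with a plain-Python timing model (not AiiDA code, hypothetical durations, queue waiting ignored): each loop step lasts as long as the slower of its two jobs.

```python
def lockstep_makespan(transfers, sims):
    """Step 1: transfer_1; step i+1: max(sim_i, transfer_{i+1}); last step: sim_n."""
    total = transfers[0]
    for i in range(len(transfers) - 1):
        # Both jobs are in the context, so the step ends with the slower one.
        total += max(sims[i], transfers[i + 1])
    total += sims[-1]
    return total


# Hypothetical durations (hours) for three days.
print(lockstep_makespan([1, 2, 1], [5, 4, 6]))  # 16
```

With these numbers the lock-step loop takes 16 hours, compared with 19 for the fully blocking loop and 10 for an ideal schedule where each sim starts as soon as its transfer ends.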

WorkGraph

You can solve this waiting problem with the new AiiDA workflow system, AiiDA-WorkGraph, which gives the best performance for your case. Here is the example code:

from aiida import load_profile
from aiida_workgraph import WorkGraph, task

load_profile()

@task()
def transfer_data():
    """Your data transfer code here"""
    pass

@task()
def sim():
    """Your simulation code here"""
    pass

# suppose you have n days
n = 5
wg = WorkGraph()
for i in range(n):
    wg.add_task(transfer_data, name=f"transfer_data_{i}")
    wg.add_task(sim, name=f"sim_{i}")
    # let sim_i wait for transfer_data_i
    wg.tasks[f"sim_{i}"].wait = [wg.tasks[f"transfer_data_{i}"]]
    # let transfer_data_i wait for transfer_data_{i-1} (the first transfer waits for nothing)
    wg.tasks[f"transfer_data_{i}"].wait = [wg.tasks[f"transfer_data_{i-1}"]] if i > 0 else []

wg.submit()

This will build the workflow as shown in the GUI:

When transfer_data_i finishes, the WorkGraph immediately submits both transfer_data_{i+1} and sim_i. This can be seen from the time used by each process (I tried the WorkGraph using a simple CalcJob as the tasks).

One more figure shows the performance
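The scheduling rule behind this (start each task as soon as everything it waits on has finished) can be reproduced in plain Python with the standard-library graphlib module. This is only a timing model of the dependency graph built in the WorkGraph example, not AiiDA-WorkGraph code; the helper names and durations are hypothetical.

```python
from graphlib import TopologicalSorter


def build_waits(n):
    """Same wait relations as the WorkGraph example: task -> set of tasks it waits on."""
    waits = {}
    for i in range(n):
        waits[f"transfer_data_{i}"] = {f"transfer_data_{i - 1}"} if i > 0 else set()
        waits[f"sim_{i}"] = {f"transfer_data_{i}"}
    return waits


def earliest_schedule(waits, durations):
    """Start every task as soon as all the tasks it waits on have finished."""
    start, end = {}, {}
    # static_order() yields each task only after all of its predecessors.
    for name in TopologicalSorter(waits).static_order():
        start[name] = max((end[dep] for dep in waits[name]), default=0)
        end[name] = start[name] + durations[name]
    return start, end


# Hypothetical durations (hours) for three days.
transfers = [1, 2, 1]
sims = [5, 4, 6]
durations = {f"transfer_data_{i}": t for i, t in enumerate(transfers)}
durations.update({f"sim_{i}": s for i, s in enumerate(sims)})

start, end = earliest_schedule(build_waits(3), durations)
print(start)
print(max(end.values()))  # 10
```

Here sim_i and transfer_data_{i+1} start at the same moment, and no step has to wait for an unrelated job.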
