How to write WorkChains that execute calculations in parallel

What is the recommendation for executing isolated calculations in parallel?

As a concrete example, I want to calculate reduction/oxidation potentials for a given molecular structure. I’ve written a WorkChain like this:

from aiida.engine import WorkChain
from aiida.orm import AbstractCode, Float, StructureData


class IonizationEnergy(WorkChain):

    @classmethod
    def define(cls, spec):
        """ Oxidation/Reduction Potential WorkChain """
        super().define(spec)
        spec.input('structure', valid_type=StructureData)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(
            cls.neutral_calc,
            cls.oxidized_calc,
            cls.reduced_calc,
            cls.calculate_ionization_potentials,
        )
        spec.output('oxidation_potential', valid_type=Float)
        spec.output('reduction_potential', valid_type=Float)

In the first three steps I’m calling a remote DFT code. The final step of the workchain uses calcfunctions to do simple arithmetic on the outputs of the first three steps, so those steps must finish before the final one. However, oxidized_calc and reduced_calc do not depend on the outputs of any previous step in the workchain, and it would be faster to execute them together.

The one solution I immediately see here is to put all of my calcjobs in one step of the workchain and submit them all at once. Is there any way to execute some steps asynchronously, or is the order defined in the spec locked to sequential execution? Any recommendation here is appreciated.

No, your analysis is correct. The only way to execute these jobs in parallel is to submit them all in the same step. But if these jobs are truly independent, there is no real downside to putting them in the same logical step, is there? If you want to separate the code into individual methods just for clarity, you can still do that: have a single “parent” step in the outline, and have that step call three different methods that each do one calculation.
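For example, something along these lines (submit_calculations and the build_* helpers are illustrative names, not an existing API; each helper is assumed to return a fully populated builder):

from aiida.engine import ToContext, WorkChain

class IonizationEnergy(WorkChain):
    # ... define() as before, but with a single "parent" step in the outline:
    #     spec.outline(cls.submit_calculations, cls.calculate_ionization_potentials)

    def submit_calculations(self):
        # self.submit() does not block, so all three jobs run concurrently.
        futures = {
            'neutral': self.submit(self.build_neutral_calc()),
            'oxidized': self.submit(self.build_ox_calc()),
            'reduced': self.submit(self.build_red_calc()),
        }
        # ToContext tells the engine to wait for all of them before the next step.
        return ToContext(**futures)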


I initially thought it would make the provenance messier, but I now see that there’s no real downside to running all the jobs in one step. Will I need as many daemon workers as I have jobs submitted in parallel, or can one worker handle multiple parallel submissions?

One worker can handle multiple jobs in parallel, yes. By default I think it will take 100 processes in parallel, but this can be configured. It might not be advisable to let it take many more than that, though, as it can get bogged down. If you need to run more in parallel, you would have to start thinking about increasing the number of workers.
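For example (assuming a recent AiiDA 2.x; check verdi config list for the exact option name in your version):

verdi config set daemon.worker_process_slots 200   # slots per worker
verdi daemon incr 2                                # or add two more workers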

Now, attempting to bundle my jobs into one step, I get unexpected behavior. Here’s my workchain:

from aiida.engine import ToContext, WorkChain
from aiida.orm import AbstractCode, Dict, Float, StructureData

# walltime, computer_resources and the neutral/oxidized/reduced parameter
# dicts are defined at module level (not shown).

class ParallelIonizationEnergy(WorkChain):
    @classmethod
    def define(cls, spec):
        """ Oxidation/Reduction Potential WorkChain """
        super().define(spec)
        spec.input('structure', valid_type=StructureData)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(
            cls.parallel_calcs,
            cls.calculate_ionization_potentials,
        )
        spec.output('oxidation_potential', valid_type=Float)
        spec.output('reduction_potential', valid_type=Float)
    
    def build_neutral_calc(self):
        structure = self.inputs.structure
        code = self.inputs.code
        builder = code.get_builder()
        builder.structure = structure
        parameters = Dict(neutral_parameters)
        # parameters.description = "Input parameters for neutral molecule ORCA calculation"
        # parameters.store()
        builder.parameters = parameters
        max_wallclock_seconds = walltime
        builder.metadata.options.resources = computer_resources
        builder.metadata.options.max_wallclock_seconds = max_wallclock_seconds
        builder.metadata.options.withmpi = True
        return builder
    
    def build_ox_calc(self):
        structure = self.inputs.structure
        code = self.inputs.code
        builder = code.get_builder()
        builder.structure = structure
        parameters = Dict(oxidized_parameters)
        # parameters.description = "Input parameters for oxidized molecule ORCA calculation"
        # parameters.store()
        builder.parameters = parameters
        max_wallclock_seconds = walltime
        builder.metadata.options.resources = computer_resources
        builder.metadata.options.max_wallclock_seconds = max_wallclock_seconds
        builder.metadata.options.withmpi = True
        return builder

    def build_red_calc(self):
        structure = self.inputs.structure
        code = self.inputs.code
        builder = code.get_builder()
        builder.structure = structure
        parameters = Dict(reduced_parameters)
        # parameters.description = "Input parameters for reduced molecule ORCA calculation"
        # parameters.store()
        builder.parameters = parameters
        max_wallclock_seconds = walltime
        builder.metadata.options.resources = computer_resources
        builder.metadata.options.max_wallclock_seconds = max_wallclock_seconds
        builder.metadata.options.withmpi = True
        return builder
    
    def parallel_calcs(self):
        # self.submit() returns immediately, so all three jobs run concurrently;
        # ToContext makes the engine wait for them before the next outline step.
        neutral_calc = self.build_neutral_calc()
        neutral_results = self.submit(neutral_calc)
        ox_calc = self.build_ox_calc()
        ox_results = self.submit(ox_calc)
        red_calc = self.build_red_calc()
        red_results = self.submit(red_calc)
        return ToContext(neutral_results=neutral_results, ox_results=ox_results, red_results=red_results)
    
    def calculate_ionization_potentials(self):
        neutral_data = self.ctx.neutral_results.outputs.output_parameters
        ox_data = self.ctx.ox_results.outputs.output_parameters
        red_data = self.ctx.red_results.outputs.output_parameters
        oxidation_potential = get_oxidation_potential(neutral_data, ox_data)
        reduction_potential = get_reduction_potential(neutral_data, red_data)
        self.out('oxidation_potential', oxidation_potential)
        self.out('reduction_potential', reduction_potential)
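
For reference, get_oxidation_potential and get_reduction_potential are calcfunctions I define elsewhere in the module. A minimal sketch of one of them, assuming the parser exposes a free_energy key in output_parameters (the actual key name depends on the plugin), would be:

from aiida.engine import calcfunction
from aiida.orm import Dict, Float

@calcfunction
def get_oxidation_potential(neutral_data: Dict, ox_data: Dict) -> Float:
    # Simple energy difference; 'free_energy' is an assumed key name.
    return Float(ox_data['free_energy'] - neutral_data['free_energy'])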

When I submit this from the verdi shell, the workchain immediately fails and shows its state as “Excepted” in the process list.

The process report on the excepted workchain shows this message:

2024-03-31 15:30:53 [105 | REPORT]: [505|ParallelIonizationEnergy|on_except]: Traceback (most recent call last):
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/process_states.py", line 228, in execute
    result = self.run_fn(*self.args, **self.kwargs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/workchains/workchain.py", line 296, in run
    return self._do_step()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/workchains/workchain.py", line 313, in _do_step
    finished, stepper_result = self._stepper.step()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/workchains.py", line 295, in step
    finished, result = self._child_stepper.step()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/workchains.py", line 246, in step
    return True, self._fn(self._workchain)
  File "/home/cooper/Research/aiida/tutorials/IonizationEnergyWorkChain.py", line 251, in parallel_calcs_2
    #     structure = self.inputs.structure
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/functions.py", line 277, in decorated_function
    result, _ = run_get_node(*args, **kwargs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/functions.py", line 249, in run_get_node
    result = process.execute()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/functions.py", line 558, in execute
    result = super().execute()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/processes.py", line 90, in func_wrapper
    return func(self, *args, **kwargs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/processes.py", line 1203, in execute
    return self.future().result()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/process_states.py", line 228, in execute
    result = self.run_fn(*self.args, **self.kwargs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/functions.py", line 606, in run
    result = self._func(*args, **kwargs)
  File "/home/cooper/Research/aiida/tutorials/IonizationEnergyWorkChain.py", line 71, in neutral_calc
    builder.metadata.options.withmpi = True
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/launch.py", line 117, in submit
    process_inited = instantiate_process(runner, process, **inputs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/utils.py", line 83, in instantiate_process
    process = process_class(runner=runner, inputs=inputs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/state_machine.py", line 194, in __call__
    inst.transition_to(inst.create_initial_state())
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/state_machine.py", line 339, in transition_to
    self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/processes.py", line 1009, in transition_failed
    raise exception.with_traceback(trace)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/state_machine.py", line 324, in transition_to
    self._enter_next_state(new_state)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/state_machine.py", line 384, in _enter_next_state
    self._fire_state_event(StateEventHook.ENTERING_STATE, next_state)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/state_machine.py", line 300, in _fire_state_event
    callback(self, hook, state)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/processes.py", line 335, in <lambda>
    lambda _s, _h, state: self.on_entering(cast(process_states.State, state)),
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/processes.py", line 689, in on_entering
    call_with_super_check(self.on_create)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/plumpy/base/utils.py", line 31, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/process.py", line 403, in on_create
    self._pid = self._create_and_setup_db_record()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/process.py", line 589, in _create_and_setup_db_record
    self._setup_db_record()
  File "/home/cooper/envs/aiida/lib/python3.10/site-packages/aiida/engine/processes/process.py", line 693, in _setup_db_record
    raise exceptions.InvalidOperation('calling processes from a calculation type process is forbidden.')
aiida.common.exceptions.InvalidOperation: calling processes from a calculation type process is forbidden.

However, when I run this directly from the shell, everything executes normally. Furthermore, when I take the second and third self.submit() calls out of the parallel_calcs step, the workchain can be submitted successfully.

I suspect I have some fundamental misunderstanding about the intended structure of a workchain and how I can bundle multiple calculations.

Did you restart the daemon after making all the changes? This reeks of old code being executed by the daemon.

I had restarted it, but this time I ran the process repair command, which found a few processes without process tasks. After restarting the daemon, killing the repaired processes, and resubmitting, it seems to be working!
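For the record, the recovery sequence was roughly the following (verdi process repair is available from AiiDA 2.3 onwards):

verdi process repair        # finds processes whose process task is missing
verdi daemon restart
verdi process kill <PK>     # for each stale process reported above

and then resubmitting the workchain.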

Thanks as always @sphuber for the fast, helpful responses.
