Exposing dynamic outputs using labels from Calculations?

Dear community,

I am facing one issue on how to expose the outputs from some calculations in the a workflow (AiiDA) ,

for example say i am running a loop of calculation (as an example)

    def run(self):

        opts = self.inputs.parameters["parameters"]
        for opt in opts:
            inputs = AttributeDict(self.exposed_inputs(Calculation, namespace="my_calc"))
            inputs.parameters = get_parameters(opt)
            inputs.metadata.label = label
            inputs.metadata.call_link_label = label
            future = self.submit(Calculation, **inputs)
            self.to_context(**{label: future})

so the loop can launch a different set of Calculations with distintive labels from the parameters.
The question i have is how i can expose the outputs , in particular i would like to expose all the outputs from the calculations but now in the namespace using label.

for example node.output.{label} (and all the output from Calculation). any suggestions ?


One solution is to create a dynamic output, and save all the Calculations outputs there.
Here is a working example,

from aiida.engine import WorkChain, submit, run
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.common import AttributeDict
from aiida import load_profile, orm


class AddMultiplyWorkChain(WorkChain):

    def define(cls, spec):
        spec.expose_inputs(ArithmeticAddCalculation, namespace='add')
        spec.output_namespace("my_outputs", dynamic=True)

    def run_add(self):
        inputs = AttributeDict(self.exposed_inputs(ArithmeticAddCalculation, namespace="add"))
        for i in range(4):
            inputs['y'] = orm.Int(i)
            label = f"add_{i}"
            future = self.submit(ArithmeticAddCalculation, **inputs)
            self.to_context(**{label: future})

    def inspect_run(self):
        """Collect outputs"""
        my_outputs = {}
        for i in range(4):
            label = f"add_{i}"
            my_outputs[label] = {}
            process = getattr(self.ctx, label)
            for key in process.outputs._get_keys():
                my_outputs[label][key] = getattr(process.outputs, key)
        self.out("my_outputs", my_outputs)

code = orm.load_code("add@localhost")
run(AddMultiplyWorkChain, **{"add": {"code": code, "x": orm.Int(1), "y": orm.Int(2)}})

You will find outputs similar to

Outputs                PK    Type
---------------------  ----  ----------
        remote_folder  5746  RemoteData
        retrieved      5750  FolderData
        sum            5754  Int
        remote_folder  5747  RemoteData
        retrieved      5751  FolderData
        sum            5755  Int
        remote_folder  5748  RemoteData
        retrieved      5752  FolderData
        sum            5756  Int
        remote_folder  5749  RemoteData
        retrieved      5753  FolderData
        sum            5757  Int

You can also try aiida-workgraph, in which you can launch a dynamic number of sub-process. In the workgraph, one does not need to expose the outputs, and you will have access to all sub-process nodes.
Here is the same example using aiida-workgraph:

from aiida_workgraph import WorkGraph

data = range(4)
wg = WorkGraph("test")
for i in data:
    add = wg.nodes.new(ArithmeticAddCalculation, name=f"add_{i}",
                       code=code, x=orm.Int(1), y=orm.Int(i)
# You can also use submit directly
# wg.submit(wait=True, timeout=100)
# get the outputs of add_0
print("outputs: ", wg.nodes["add_0"].node.outputs)

In case, you submit the job without waiting, you can load the WorkGraph later using its pk

from aiida_workgraph import WorkGraph
wg = WorkGraph.load(pk)
print("outputs: ", wg.nodes["add_0"].node.outputs)

Thank you Xing! ,

First option does the job, i will also check the other two you mentioned!