Aiida-workgraph: How to handle dynamic inputs and outputs of tasks

Using aiida-workgraph, I am trying to create a workflow that starts a (potentially variable) number of DFT calculations on different input StructureData nodes and then passes each StructureData, together with one of the output ports of its DFT calculation, to a calcfunction task. That task returns one of the StructureData nodes, depending on the outputs of the DFT calculations. So the workgraph should look something like the one below:

I have been trying to follow this example Run tasks in parallel - AiiDA WorkGraph documentation because it seems to do something similar with having a variable number of inputs into a calcfunction but I don’t understand exactly how it works and can’t seem to get it to work with StructureData instead of the Int used in the example (I get an error message about StructureData not being supported due to not being serializable). Does anyone know how I would express a workflow like this in aiida-workgraph and would someone be able to better explain how this dynamic input and output works with aiida-workgraph?


Hello @ahkole,

Yes, the whole context thing is very cumbersome. I think someone else can explain it better than I can, but there is a more understandable way to do this in WorkGraph by increasing the link limit.


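import numpy as np
from aiida import load_profile, orm
from aiida_workgraph import task, WorkGraph

load_profile()

@task.calcfunction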
def struc_generator(seed: orm.Int):
    np.random.seed(seed.value)
    return orm.StructureData(cell=np.random.rand(3,3))

@task.calcfunction
def aggregate_strucs(**collected_strucs):
    for key,value in collected_strucs.items():
        print(key,value)
    # we have to clone because aiida does not allow to return identity
    return {"result": {key: value.clone() for key, value in collected_strucs.items()}}


wg = WorkGraph("aggregate")

aggregate_strucs_task = wg.add_task(aggregate_strucs, name="aggregate_strucs_task")

# we have to increase the link limit because by default workgraph only supports one link per input socket
# (allowing multiple links per socket is still an experimental feature)
aggregate_strucs_task.inputs["collected_strucs"].link_limit = 50

n_structures = 3  # example value; keep it at or below the link limit set above
for i in range(n_structures):
    struc_generator_task = wg.add_task(
        struc_generator, name=f"struc_generator{i}", seed=orm.Int(i)
    )
    wg.add_link(
        struc_generator_task.outputs["result"],
        aggregate_strucs_task.inputs["collected_strucs"],
    )
    
#display(wg) # if you use a jupyter notebook you can use this
wg.run()
aggregate_strucs_task.outputs["result"].value

Your error seems to occur because you want to aggregate two arbitrarily sized inputs. For this, you need one calcfunction for each input that should be arbitrarily sized. If you want both inputs in the same function, you need to use a task function, as a calcfunction does not support this. The task function, however, does not track provenance, so you need to reintroduce calcfunctions to store your results in the database. This is the code:

@task.calcfunction
def struc_generator(seed: orm.Int):
    np.random.seed(seed.value)
    return orm.StructureData(cell=np.random.rand(3,3))

@task.calcfunction
def output_generator(seed: orm.Int):
    np.random.seed(seed.value)
    return orm.Int(seed.value)


@task.calcfunction
def aggregate_strucs(**collected_strucs):
    for key,value in collected_strucs.items():
        print(key,value)
    # we have to clone because aiida does not allow to return identity
    return {"result": {key: value.clone() for key, value in collected_strucs.items()}}


@task.calcfunction
def aggregate_outputs(**collected_outputs):
    for key,value in collected_outputs.items():
        print(key,value)
    # we have to clone because aiida does not allow to return identity
    return {"result": {key: value.clone() for key, value in collected_outputs.items()}}

@task
def aggregate(strucs, outputs):
    # you can now postprocess the two dicts but be aware that this does not track provenance
    # e.g. choose structure with minimal cell volume and its output
    idx = np.argmin([struc.get_cell_volume() for struc in strucs.values()])
    sel_struc = list(strucs.values())[idx]
    sel_output = list(outputs.values())[idx]
    return_identity(sel_struc) # for provenance
    return_identity(sel_output) # for provenance
    return {"struc": sel_struc, "output": sel_output}
        
@task.calcfunction
def return_identity(obj):
    return obj.clone()
   
wg = WorkGraph("aggregate_two")

aggregate_strucs_task = wg.add_task(aggregate_strucs, name="aggregate_strucs_task")
aggregate_outputs_task = wg.add_task(aggregate_outputs, name="aggregate_outputs_task")

# we have to increase the link limit because by default workgraph only supports one link per input socket
# (allowing multiple links per socket is still an experimental feature)
aggregate_strucs_task.inputs["collected_strucs"].link_limit = 50
aggregate_outputs_task.inputs["collected_outputs"].link_limit = 50


for i in range(2):
    struc_generator_task = wg.add_task(
        struc_generator, name=f"struc_generator{i}", seed=orm.Int(i)
    )
    output_generator_task = wg.add_task(
        output_generator, name=f"output_generator{i}", seed=orm.Int(i)
    )

    wg.add_link(
        struc_generator_task.outputs["result"],
        aggregate_strucs_task.inputs["collected_strucs"],
    )
    wg.add_link(
        output_generator_task.outputs["result"],
        aggregate_outputs_task.inputs["collected_outputs"],
    )
    
aggregate_task = wg.add_task(aggregate, name="aggregate_task",
                             strucs=aggregate_strucs_task.outputs["result"],
                             outputs=aggregate_outputs_task.outputs["result"])
    
#display(wg) # use this to show if in jupyter notebook 
wg.run()
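
The selection inside aggregate pairs structures and outputs by position, which only works if both dictionaries iterate in the same order. A minimal plain-Python sketch, with floats standing in for the AiiDA nodes and with made-up keys and values, shows the same selection done by key instead:

```python
# Plain-Python stand-ins: volumes instead of StructureData, ints instead of orm.Int.
# All keys and values here are invented for illustration.
volumes = {"struc_generator0": 0.42, "struc_generator1": 0.17}
outputs = {"struc_generator1": 1, "struc_generator0": 0}  # different insertion order

# Pick the key with the minimal volume, then look up both dicts by that key,
# so the result does not depend on the iteration order of either dict.
best_key = min(volumes, key=volumes.get)
selected = {"struc": volumes[best_key], "output": outputs[best_key]}
print(best_key, selected)
```

Whether the two aggregated dictionaries share keys in a real WorkGraph run depends on how the links are named, so this is only a sketch of the idea.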

You can plot the provenance graph and should see something like this

You can retrieve the result with a query builder

qb = orm.QueryBuilder()
qb.append(
    orm.StructureData,
    filters=orm.StructureData.fields.pk == 2432  # primary key of the structure data output at the end
)
qb.first(flat=True)

This has now become quite cumbersome, and the provenance graph does not really display what happened, so I think we have some work to do to support this case better (or maybe someone else finds a better solution), but I hope it solves your problem for now.


Hi @ahkole, thanks for opening this interesting topic, and thanks, @agoscinski, for the example.
Note that this is not only relevant to the aiida-workgraph package, but also to the AiiDA calcfunction in general. I will try to guess and explain the problem and discuss possible solutions.

The error message (StructureData not being supported due to not being serializable) is probably caused by passing a set of StructureData nodes to a non-dynamic input of a calcfunction.

Suppose one defines a calcfunction with the following signature:

@calcfunction
def get_structure(structures: dict[str, orm.StructureData], dft_outputs: dict[str, orm.Dict]) -> orm.StructureData:

Running the function will raise an error because structures is a non-dynamic input, and one can not pass a set of AiiDA nodes to a non-dynamic input.

In order to allow dynamic inputs, one must use a variable keyword argument **datas in the calcfunction signature.

@calcfunction
def get_structure(**datas) -> orm.StructureData:

This will declare that the input is dynamic, and one can pass any number of inputs to the calcfunction. So you can use it like this:

get_structure(structures=..., dft_outputs=...)
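
As a rough plain-Python illustration of what the variadic signature receives (no AiiDA involved; the strings and floats are placeholders for nodes), every keyword passed at the call site ends up as one entry in datas:

```python
def get_structure(**datas):
    """Return the structure whose DFT output (here: an energy) is lowest."""
    structures = datas["structures"]
    dft_outputs = datas["dft_outputs"]
    best = min(dft_outputs, key=dft_outputs.get)
    return structures[best]

# Placeholder values; in AiiDA these would be StructureData and Dict/Float nodes.
picked = get_structure(
    structures={"a": "structure_a", "b": "structure_b"},
    dft_outputs={"a": -1.0, "b": -2.5},
)
print(picked)
```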

However, when adding this calcfunction to a WorkGraph, WorkGraph will only define one input port for the task, i.e., datas.

There are two kinds of solutions to this problem:

Solution 1: Aggregate the inputs in a single dictionary

As suggested by @agoscinski, you can use a normal task to gather the inputs and pass them to the calcfunction.

Here, I provide another solution similar to the docs example you mentioned. One can use the context of the WorkGraph to store the structures and energies, and then output the context variable as an output of a graph_builder task.

Here is a working example. To run it, please install the latest aiida-workgraph version (0.3.26).

from aiida_workgraph import task, WorkGraph
from aiida import load_profile, orm
from ase.build import bulk

load_profile()

@task(outputs=[{"name": "energy"}])
def emt_calculator(structure: orm.StructureData) -> dict:
    """Calculate the energy of a structure using the EMT calculator."""
    from ase.calculators.emt import EMT

    atoms = structure.get_ase()
    atoms.calc = EMT()
    atoms.get_potential_energy()
    return {"energy": orm.Float(atoms.calc.results["energy"])}

@task.graph_builder(outputs=[{"name": "datas", "from": "context.datas"}])
def calculate_energies(structures: dict):
    """Calculate the energies of a set of structures."""
    wg = WorkGraph()
    # we store the structures in 'context.datas.structures'
    # later we can output the 'context.datas' as an output
    wg.context = {"datas.structures": structures}
    for key, structure in structures.items():
        emt1 = wg.add_task(emt_calculator, structure=structure)
        # add result of emt1 to 'context.datas.energies'
        emt1.set_context({"energy": f"datas.energies.{key}"})
    return wg

@task.calcfunction()
def get_stablest_structure(**datas):
    """Get the structure with the lowest energy.
    The input is dynamic, so we must use a variable keyword argument: **datas
    """
    structures = datas["structures"]
    energies = datas["energies"]
    min_key = min(energies, key=energies.get)
    return structures[min_key].clone()

#---------------------------------------------------------
# create a workgraph
wg = WorkGraph("find_stable_structure")
wg.add_task(calculate_energies, name="calculate_energies1")
wg.add_task(get_stablest_structure, name="get_stablest_structure1",
            datas=wg.tasks["calculate_energies1"].outputs["datas"])
#---------------------------------------------------------
# generate structures with different lattice constants
al = bulk("Al")
structures = {}
scales = [0.95, 1.0, 1.05]
for i in range(len(scales)):
    atoms = al.copy()
    atoms.set_cell(atoms.get_cell() * scales[i], scale_atoms=True)
    structure = orm.StructureData(ase=atoms)
    structures[f"Al_{i}"] = structure

# run the workgraph with the input structures
wg.run(inputs={"calculate_energies1": {"structures": structures}})
print("Most stable structure:", wg.tasks["get_stablest_structure1"].outputs["result"].value)
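
The context keys used in this example, such as datas.structures and datas.energies.{key}, can be pictured as paths into a nested dictionary. The following plain-Python sketch only models that naming; set_by_path is a hypothetical helper, not part of aiida-workgraph, and the values are placeholders:

```python
def set_by_path(context: dict, dotted_key: str, value) -> None:
    """Store value in a nested dict, creating intermediate levels as needed."""
    *parents, leaf = dotted_key.split(".")
    node = context
    for name in parents:
        node = node.setdefault(name, {})
    node[leaf] = value

context = {}
# Mirrors `wg.context = {"datas.structures": structures}` with placeholder values.
set_by_path(context, "datas.structures", {"Al_0": "s0", "Al_1": "s1"})
# Mirrors `set_context({"energy": f"datas.energies.{key}"})` for each task.
for key, energy in {"Al_0": -0.1, "Al_1": -0.3}.items():
    set_by_path(context, f"datas.energies.{key}", energy)

print(context["datas"])
```

Under this picture, the graph_builder output taken from context.datas is a single nested namespace holding both the structures and the energies.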

Solution 2: Add the input ports manually

Another solution is to add the input ports manually to the calcfunction task. By doing so, one can pass the structures and energies to the calcfunction task directly.
Again, please install the latest aiida-workgraph version (0.3.26).

# Add the inputs manually, so that we can use them to link the tasks
@task.calcfunction(inputs=[{"name": "structures"},
                           {"name": "energies"}])
def get_stabest_structure(**kwargs):
    """Get the structure with the lowest energy.
    The input is dynamic, so we must use a variable keyword argument: **kwargs
    """
    structures = kwargs["structures"]
    energies = kwargs["energies"]
    min_key = min(energies, key=energies.get)
    return structures[min_key].clone()

Here is the modified example:

from aiida_workgraph import task, WorkGraph
from aiida import load_profile, orm
from ase.build import bulk

load_profile()

@task(outputs=[{"name": "energy"}])
def emt_calculator(structure: orm.StructureData) -> dict:
    """Calculate the energy of a structure using the EMT calculator."""
    from ase.calculators.emt import EMT

    atoms = structure.get_ase()
    atoms.calc = EMT()
    atoms.get_potential_energy()
    return {"energy": orm.Float(atoms.calc.results["energy"])}

@task.graph_builder(outputs=[{"name": "energies", "from": "context.energies"}])
def calculate_energies(structures: dict):
    """Calculate the energies of a set of structures."""
    wg = WorkGraph()
    for key, structure in structures.items():
        emt1 = wg.add_task(emt_calculator, structure=structure)
        # add result of emt1 to 'context.energies'
        emt1.set_context({"energy": f"energies.{key}"})
    return wg

# define the inputs manually for connecting the tasks
@task.calcfunction(inputs=[{"name": "structures"},
                           {"name": "energies"}])
def get_stabest_structure(**kwargs):
    """Get the structure with the lowest energy.
    The input is dynamic, so we must use a variable keyword argument: **kwargs
    """
    structures = kwargs["structures"]
    energies = kwargs["energies"]
    min_key = min(energies, key=energies.get)
    return structures[min_key].clone()

#---------------------------------------------------------
# create a workgraph
wg = WorkGraph("find_stable_structure")
wg.add_task(calculate_energies, name="calculate_energies1")
wg.add_task(get_stabest_structure, name="get_stabest_structure1",
            energies=wg.tasks["calculate_energies1"].outputs["energies"])
#---------------------------------------------------------
# generate structures with different lattice constants
al = bulk("Al")
structures = {}
scales = [0.95, 1.0, 1.05]
for i in range(len(scales)):
    atoms = al.copy()
    atoms.set_cell(atoms.get_cell() * scales[i], scale_atoms=True)
    structure = orm.StructureData(ase=atoms)
    structures[f"Al_{i}"] = structure

# run the workgraph with the input structures
wg.run(inputs={"calculate_energies1": {"structures": structures},
               "get_stabest_structure1": {"structures": structures}})
print("Most stable structure:", wg.tasks["get_stabest_structure1"].outputs["result"].value)

I am happy to discuss this further if anything is unclear or if you believe there are opportunities for improvement on the aiida-workgraph side.


Interesting discussion on the aiida-workgraph. I am still familiarizing myself with it, but I wanted to already give a quick comment on calcfunctions not allowing to return one of its inputs.

The calcfunction represents a “calculation” in AiiDA’s provenance model and those can only create new nodes. If you were to return one of the input nodes, you would need a “workflow” like process, like a WorkChain or a workfunction. See the section on Provenance in the documentation

So if you have a function like get_stabest_structure that takes a set of StructureDatas as input and needs to select one of them and return it, you want a workfunction and not a calcfunction. It functions exactly as a calcfunction in syntax except it is allowed to return existing nodes. The provenance graph would look like the following:

See the section on workfunctions of the documentation for more details
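
The "create" versus "return" distinction can be mimicked with a toy model in plain Python. ToyNode and both selector functions are invented for this sketch and are not AiiDA classes or decorators; the point is only that the calculation-like function hands back a new object while the workflow-like function hands back the very object it received:

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class ToyNode:
    """Stand-in for a stored data node; not an AiiDA class."""
    label: str
    energy: float

    def clone(self) -> "ToyNode":
        return replace(self)  # a new object with the same content

def select_like_calcfunction(**nodes):
    """A 'calculation' may only create nodes, so it returns a copy."""
    return min(nodes.values(), key=lambda n: n.energy).clone()

def select_like_workfunction(**nodes):
    """A 'workflow' may return an existing node unchanged."""
    return min(nodes.values(), key=lambda n: n.energy)

nodes = {"a": ToyNode("a", -1.0), "b": ToyNode("b", -2.0)}
created = select_like_calcfunction(**nodes)
returned = select_like_workfunction(**nodes)
print(created is nodes["b"], returned is nodes["b"])
```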


Hi @agoscinski, @Xing, @sphuber, thanks for the elaborate explanations and examples! This was very helpful for identifying gaps in my knowledge (and for motivating me to have another good look at the documentation about process functions, calcfunctions and workfunctions). I think I understand things a lot better now.

It is a bit confusing that variable keywords work differently when calcfunctions are used in a WorkGraph compared to when they are used “regularly”, but I’m sure there is a good reason for that?

With all the examples given, I think I am now able to define the workflow I had in mind as a WorkGraph, but I still have some questions to better understand all the concepts involved:

(1) Is there a difference between @calcfunction and @task.calcfunction? I suspect it’s probably not possible to manually add input ports for @calcfunction using the @task.calcfunction(inputs=...) syntax? Are there other differences between using one or the other?

(2) What exactly is a @task task and how does it compare to using either calcfunctions, CalcJobs, workfunctions or WorkChains as tasks? Why doesn’t @task track provenance as @agoscinski mentioned? For which kinds of tasks would you use @task?

(3) I am also trying to understand which (kinds of) ports are generated when using different function signatures. If I understand the documentation on variadic arguments for process functions correctly (Process functions — AiiDA 2.6.2 documentation), then using a function signature def f(**kwargs) and calling it like f(x=Int(1), y=Int(2)) will generate a node with two input ports x and y. However, if you were to use the same signature for a task in a workgraph then you mention that only a single input port kwargs would be generated. What kind of input port is this? And what exactly happens if you link this context.datas to it? Do you then get some sort of nested namespace input ports? I.e., do you get the input ports kwargs.structures.key1, kwargs.structures.key2, kwargs.energies.key1, etc.? Similarly for the dynamic output ports of a @task.graph_builder. Do the generated output ports of calculate_energies form a nested namespace, i.e. do you get the output ports datas.structures.key1, datas.structures.key2, datas.energies.key1, etc.?

(4) Finally some questions about @sphuber’s comment about workfunction vs calcfunction. Is there also a @task.workfunction that you can define? And can you manually add input ports with the same syntax as for @task.calcfunction? And what kind of task is a @task.graph_builder? Is it more similar to a calculation or a workflow? I.e., is it allowed to create new data nodes? Is it allowed to return existing data nodes? Both?

@ahkole, you’ve posed some excellent questions that would be a great addition to the FAQ section of our documentation website. I’ll do my best to provide answers to each of them.

BTW, what @sphuber suggested is very useful.

(1) Is there a difference between @calcfunction and @task.calcfunction? …

They are the same if you don’t pass any arguments to @task.calcfunction. However, if the calcfunction returns multiple AiiDA data nodes, then you need to define the outputs explicitly:

@task.calcfunction(outputs=[{"name": "sum"}, {"name": "diff"}])
def add_minus(x, y):
    return {"sum": x + y, "diff": x - y}

WorkGraph requires users to explicitly define ports to establish connections between tasks. This strict requirement reduces flexibility, but it clearly defines dependencies, enhancing the workflow’s readability and robustness.
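
Because links are made by port name, the declared output names and the keys of the returned dictionary need to agree. The following plain-Python sketch makes that check explicit; check_outputs is a hypothetical helper written for this illustration, and whether aiida-workgraph performs a comparable validation is not stated in this thread:

```python
def check_outputs(declared: list[dict], returned: dict) -> list[str]:
    """Return the declared output names that are missing from the returned dict."""
    names = [port["name"] for port in declared]
    return [name for name in names if name not in returned]

declared = [{"name": "sum"}, {"name": "diff"}]

def add_minus(x, y):
    # Keys match the declared output names exactly.
    return {"sum": x + y, "diff": x - y}

missing = check_outputs(declared, add_minus(3, 2))
print(missing)

# A mismatched key leaves a declared output without a value.
print(check_outputs(declared, {"sum": 5, "difference": 1}))
```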

(2) What exactly is a @task task and … For which kinds of tasks would you use @task?

When a @task task, i.e., a normal task, runs in the engine, it is just a normal Python function and does not store any provenance information. If you are familiar with writing workflows using WorkChain, compare it with the condition of a while_ in the outline of a workchain:

spec.outline(
        cls.setup,
        while_(cls.should_run_process)(
            cls.run_process,
            cls.inspect_process,
        ),
        cls.results,
    )


# example
def should_run_process(self):
    # an outline step is an instance method and reads the context via self.ctx
    return self.ctx.some_force < 0.01

This cls.should_run_process does some calculation (it compares a value) but does not need to store provenance information. Similarly, in WorkGraph you can use @task to define a task that does not need to store provenance information. Actually, none of the methods listed in the outline generate new AiiDA data nodes; they are used to gather the results and transfer them to the next step through the context variable.
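
The role of such bookkeeping steps can be sketched in plain Python without any AiiDA machinery. The context object, the update rule and the threshold below are all invented for this toy example (the loop here continues while the force is above the threshold); the point is that the condition and the state hand-over create no data of their own:

```python
from types import SimpleNamespace

# A toy stand-in for the workchain context.
ctx = SimpleNamespace(some_force=1.0, iterations=0)

def should_run_process() -> bool:
    # Pure bookkeeping: compares a number, creates no data.
    return ctx.some_force > 0.01

def run_process() -> None:
    # Stand-in for submitting a calculation; here the force is simply halved.
    ctx.some_force *= 0.5
    ctx.iterations += 1

while should_run_process():
    run_process()

print(ctx.iterations, ctx.some_force)
```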

(3) I am also trying to understand which (kinds of) ports are generated when using different function signatures. …

For a single input port kwargs, because it holds variable keyword arguments, the engine passes **kwargs to the function when running it, e.g.,

f(**{"x": Int(1), "y": Int(2)})

This is equivalent to f(x=Int(1), y=Int(2)). You can run the example code, and use verdi process show pk to show the ports of a process. Here is the info:

Inputs      PK      Type
----------  ------  -------------
energies
    Al_0    134994  Float
    Al_1    134995  Float
    Al_2    134996  Float
structures
    Al_0    134988  StructureData
    Al_1    134989  StructureData
    Al_2    134990  StructureData

Here is the info for the graph_builder:

Outputs          PK      Type
---------------  ------  -------------
datas
    structures
        Al_0     134988  StructureData
        Al_1     134989  StructureData
        Al_2     134990  StructureData
    energies
        Al_0     134994  Float
        Al_1     134995  Float
        Al_2     134996  Float
execution_count  134993  Int

The datas output is a nested namespace. When it is passed to the kwargs port of the next task, the engine runs get_stablest_structure(**datas), thus generating the inputs shown before.
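
How unpacking a nested namespace leads to the input listing shown earlier can be pictured with a small plain-Python sketch. flatten and show_inputs are hypothetical helpers with placeholder values; they only model the naming of the resulting ports, not how the engine stores links:

```python
def flatten(namespace: dict, prefix: str = "") -> dict:
    """Map a nested dict to dotted names, e.g. {'a': {'b': 1}} -> {'a.b': 1}."""
    flat = {}
    for key, value in namespace.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

def show_inputs(**kwargs):
    """Receive the unpacked namespace and list the resulting input names."""
    return flatten(kwargs)

datas = {
    "structures": {"Al_0": "s0", "Al_1": "s1"},
    "energies": {"Al_0": -0.1, "Al_1": -0.3},
}
print(show_inputs(**datas))
```

Note that the name of the variadic argument itself does not show up as a prefix, which matches the verdi output above where the inputs start at energies and structures.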

Is there also a @task.workfunction that you can define? And…

Yes, you can use @task.workfunction to define a workfunction task.
Yes, you can add input ports with the same syntax.

And what kind of task is a @task.graph_builder? Is it more similar to a calculation or a workflow? I.e., is it allowed to create new data nodes? Is it allowed to return existing data nodes? Both?

I would say it is a normal task: it does not store any provenance information on its inputs and outputs. The graph_builder function never returns an AiiDA node; it always returns a WorkGraph object. Similarly, you can pass any data type (AiiDA data node, Python object, etc.) to the graph_builder function. That’s why it is very flexible, and you can create dynamic (complex) nested workflows using graph_builder.

Again, let’s compare it with the WorkChain. For example, let’s say we have a WorkChain that calculates the energies of a set of structures and then finds the minimum energy.


class ExampleWorkChain(WorkChain):
    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input_namespace('structures', valid_type=StructureData)
        spec.outline(
            cls.run_dft,
            cls.inspect_dft_result,
            cls.find_minimum,
            cls.results
        )
        spec.output('result')

    def run_dft(self):
        processes = {}
        for key, structure in self.inputs.structures.items():
            process = self.submit(SomeCalclation, inputs = {'structure': structure})
            processes[key] = process
        self.to_context(**processes)

    def inspect_dft_result(self):
        dft_outputs = {}
        for key in self.inputs.structures:
            process = self.ctx[key]
            dft_outputs[key] = process.outputs.some_output
        self.ctx.dft_outputs = dft_outputs

    def find_minimum(self):
        self.ctx.minimum = find_minimum(**self.ctx.dft_outputs)

    def results(self):
        self.out('result', self.ctx.minimum)

Using WorkGraph and @task.graph_builder:

@task.graph_builder(outputs=[{"name": "dft_outputs", "from": "context.dft_outputs"}])
def run_dft(structures):
    """Run DFT calculation for a set of structures."""
    wg = WorkGraph()
    for key, structure in structures.items():
        dft_task = wg.add_task(SomeCalclation, inputs = {'structure': structure})
        dft_task.set_context({"some_output": f"dft_outputs.{key}"})
    # return the workgraph
    return wg

wg = WorkGraph()
run_dft_task = wg.add_task(run_dft, structures=structures)
# no separate inspect step is needed: the graph_builder already collects
# the results into its "dft_outputs" output
wg.add_task(find_minimum, inputs=run_dft_task.outputs["dft_outputs"])

The run_dft graph_builder task has similar functionality to the run_dft and inspect_dft_result methods in the WorkChain. But do we lose the provenance information by using @task.graph_builder? No: the key is the WorkGraph object that is returned by the graph_builder. When the returned WorkGraph object is launched, it creates a new process node, which stores all the provenance information.

But is there any risk of losing provenance information when using @task.graph_builder? Yes: if you create an AiiDA data node inside the graph_builder function and pass it into the returned WorkGraph object, then you lose the provenance information for that node.

The same risk exists in the WorkChain: if you create an AiiDA data node inside a method, e.g., inspect_dft_result, and pass it to the next step, then you lose the provenance information. I found this to be very common in WorkChains! For example, the PdosWorkChain generates a Dict node and passes it as an input node to the dos calculation. The developer (or user) needs to decide whether to keep strict provenance information or not; it is a trade-off between flexibility and provenance information.


Hi @Xing, thank you for taking the time to answer all my questions! I think I am starting to understand things a lot better now. There is one thing I don’t yet completely understand about the combination of variadic arguments and manually adding input ports:

What exactly is the difference between (1) manually adding input ports for a variadic argument, i.e.

@task.calcfunction(inputs=[{"name": "structures"},
                           {"name": "energies"}])
def get_stablest_structure(**kwargs):
     pass

and (2) defining input ports through the arguments of the function, i.e.

@task.calcfunction()
def get_stablest_structure(structures, energies):
    pass

How do the input ports differ in these two cases? Would it still be possible to assign a dictionary of structures and energies to the parameters in case (2)? Or is it only possible to assign AiiDA data nodes to the inputs if you are in case (2)?

And finally, just to check if I understand the concepts correctly. Is the reason that provenance information is lost in the PdosWorkChain example because you have no record of what/who created the Dict node in the _generate_dos_inputs step? I.e., there is no record that this Dict node was generated as part of a PdosWorkChain?

I.e., there is no record that this Dict node was generated as part of a PdosWorkChain?

There is no explicit record of it in the provenance graph. That is, there is no calculation node that creates it; it simply seems to appear out of nowhere, as it would when you create the node manually in a shell and store it. In the greater context, it will still be clear that it was probably created through the PdosWorkChain, as that is explicitly represented in the graph by a workchain node and it will have a call link to the calculation node that then received the Dict node as input. On top of that, the workchain node records the version of the Python package that provides the PdosWorkChain, so it should be possible to retrace what source code was used (provided there were no custom local changes to a released version).

So the question is really whether you think an explicit calculation node in the graph to represent the creation of the Dict input node provides any additional value.

Okay, I think I understand. So the key difference in terms of provenance for @task and @task.graph_builder compared to tasks created from either calcfunctions, workfunctions, calcjobs or workchains is that there is no process node stored in the provenance graph and therefore there is no node that can be marked as the creator/returner if there are data nodes created/returned in such a task?

Yes, I see. For the particular example of the PdosWorkChain I doubt there is any added value for having a “creator” of this Dict input node. So I can imagine there are more cases where a user/developer can decide storing such an explicit record is not necessary.

How do the input ports differ in these two cases?

In case 1), **kwargs will declare the whole inputs of the calcfunction dynamic, thus, you can pass a dictionary of AiiDA data nodes, or any nested dictionary with AiiDA data nodes.
In case 2), the input ports are fixed. Each argument corresponds to a specific AiiDA data node. You can only pass one AiiDA data node per argument.
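
On the Python side, the two cases differ in the kind of parameter the function declares, which the standard inspect module can show. This is a plain-Python sketch with hypothetical function names; how aiida-workgraph turns these signatures into ports is as described in the replies here, not demonstrated by this code:

```python
import inspect

def case_1(**kwargs):
    """Variadic: accepts any set of keyword names."""
    return sorted(kwargs)

def case_2(structures, energies):
    """Fixed: exactly these two named arguments."""
    return structures, energies

def parameter_kinds(func):
    """Map each parameter name to the name of its kind."""
    return {name: p.kind.name for name, p in inspect.signature(func).parameters.items()}

print(parameter_kinds(case_1))
print(parameter_kinds(case_2))
# Only the variadic form accepts a keyword that was never declared.
print(case_1(structures={}, energies={}, anything_else=1))
```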

So the key difference in terms of provenance for @task and @task.graph_builder compared to tasks created from either calcfunctions…

For @task, the task does not generate a process, so there is no provenance recorded. Consequently, outputs for such tasks will always be None because nothing is stored in the database.
For a graph_builder task, the function creates and returns a WorkGraph, and we use the process of this WorkGraph as the process of the graph_builder task. Therefore, you can access the outputs of a graph_builder task.

@task.graph_builder(outputs=[{"name": "result", "from": "multiply_task.result"}])
def add_multiply(x, y, z):
    wg = WorkGraph()
    wg.add_task(add, x=x, y=y, name="add_task")
    wg.add_task(multiply, x=wg.tasks["add_task"].outputs["result"], y=z, name="multiply_task")
    return wg

wg = WorkGraph()
wg.add_task(add_multiply, x=1, y=2, z=3, name="add_multiply1")
wg.run()
print("result: ", wg.tasks["add_multiply1"].outputs["result"].value)

Thanks @Xing! This clears up any remaining confusion I had for now.