Implementing the Flux scheduler from LLNL

Hello everyone,

I am looking to possibly start implementing a scheduler plugin for Flux, the scheduler that has been developed here at LLNL. This will be very useful, as the next generation of supercomputers at the lab will only be running Flux.

There are some unique capabilities within Flux that I would like to take advantage of as I develop this, but I’m not sure how well it fits into the AiiDA framework/thought process. Flux will work like a normal scheduler, let’s say Slurm, where you can submit individual jobs to the queue. On top of this, you can request a Flux instance which will let you set up your personal queue(s) as needed for your tasks. Imagine being able to keep a Slurm allocation open and submit the next step of your workflow without having to wait for a new job to get through the queue. There are some workflows at the lab that already take advantage of this, and the throughput is very impressive. One example of this is Merlin.

Has something similar to this been done? Are there any pitfalls you can think of, or is this not feasible at all? I’d appreciate any and all feedback on this. Thanks.

Nathan Keilbart

Hi Nathan,

Thanks a lot for getting in touch. It seems quite feasible to me.

Having a look at the abstract class for scheduler plugins, I think an LLNL plugin can be developed by implementing only four abstract methods:

  • get_submit_script_header: basically defines the generic structure of the submission script
  • submit_job: the command to submit a job; this function should receive and return a job ID → from what I read here, Launching and tracking Flux jobs | LLNL HPC Tutorials, apparently Flux also returns something similar.
  • get_jobs: the command to get the status of one (or a list of) job(s). It should update the status of the AiiDA process as queued, running, finished, etc.
  • kill_job: the command to kill a job. The input value is the job ID; it returns True or False.

So implementing the LLNL interface should be rather straightforward (assuming that you guys use SSH; if that’s not the case, then one also has to develop a transport plugin, and that’s more work…).
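To make the mapping concrete, here is a rough sketch of what such a plugin could look like. Note that in aiida-core one typically implements the protected helper methods (as the Slurm plugin does) rather than the public methods directly, and the flux command lines and output format below are my assumptions, to be checked against the Flux documentation:

from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource


class FluxScheduler(Scheduler):
    """Sketch of a scheduler plugin targeting Flux (not a complete implementation)."""

    # Same resource model as Slurm-like schedulers (nodes x mpiprocs per node).
    _job_resource_class = NodeNumberJobResource

    def _get_submit_script_header(self, job_tmpl):
        """Return the header lines of the submission script (resources, walltime, ...)."""
        lines = ['# Flux submission script generated by AiiDA']
        if job_tmpl.job_name:
            lines.append(f'# job name: {job_tmpl.job_name}')
        return '\n'.join(lines)

    def _get_submit_command(self, submit_script):
        # Assumption: `flux batch` submits the script and prints the job ID on stdout.
        return f'flux batch {submit_script}'

    def _parse_submit_output(self, retval, stdout, stderr):
        if retval != 0:
            raise SchedulerError(f'Error during submission: {stderr}')
        # Strip the trailing newline so the stored job ID matches what the job list reports.
        return stdout.strip()

    def _get_joblist_command(self, jobs=None, user=None):
        # Assumption: an invocation of `flux jobs` that prints one job per line.
        return 'flux jobs'

    def _parse_joblist_output(self, retval, stdout, stderr):
        job_infos = []
        for line in stdout.splitlines()[1:]:  # skip the header line (assumption)
            fields = line.split()
            if not fields:
                continue
            job = JobInfo()
            job.job_id = fields[0]
            # Mapping of the Flux state string to an AiiDA JobState (assumption).
            job.job_state = JobState.RUNNING if fields[1] == 'R' else JobState.QUEUED
            job_infos.append(job)
        return job_infos

    def _get_kill_command(self, jobid):
        return f'flux cancel {jobid}'

    def _parse_kill_output(self, retval, stdout, stderr):
        return retval == 0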

And about your main question regarding creating a new scheduler queue per workflow and submitting to it, I guess one should develop two extra methods for the scheduler plugin:

  • e.g. make_my_queue: a command to create a queue on the HPC machine
  • e.g. delete_my_queue: a command to remove a queue from the HPC machine

This way, you’ll need to do some extra steps in your AiiDA workflows, for example:

from aiida import orm
from aiida.engine import submit
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation

computer = orm.load_computer('<COMPUTER_NAME>')
scheduler = computer.get_scheduler()
# here you create the queue on the remote machine
scheduler.make_my_queue('<Q_NAME>')

# and then in your workflow, specify <Q_NAME>,
# e.g. the equivalent of #SBATCH --partition=<Q_NAME> in Slurm
builder = ArithmeticAddCalculation.get_builder()
builder.code = orm.load_code('<CODE_NAME>')
builder.x = orm.Int(1)
builder.y = orm.Int(2)
builder.metadata.options.custom_scheduler_commands = '<LLNL_Q_COMMAND>'
node = submit(builder)

# here, you remove the queue, if you want.
scheduler.delete_my_queue('<Q_NAME>')

Please don’t hesitate to ask more questions if unclear or if you need support developing it!


Thanks for the quick response. I had to make sure I’d have the time to work on this and it looks like that is the case. I’ll reach back out as I have questions.


Hi @nkeilbart, when I was trying to bring a local scheduler to AiiDAlab, I looked at both Flux and HyperQueue.

I think those two fit the same goal of having a lightweight scheduler on your local machine. I finally decided to use HyperQueue because it can request a more fine-grained CPU allocation, which is a requirement for the AiiDAlab container that may run with a non-integer number of cores allocated.

You can try aiida-hyperqueue to see if it fits your goal.
Here is how I set up an AiiDA computer (localhost) for the HyperQueue scheduler: aiidalab-qe/before-notebook.d/42_setup-hq-computer.sh at main · aiidalab/aiidalab-qe · GitHub

Here is how I start the HyperQueue service to submit the jobs: aiidalab-qe/before-notebook.d/43_start-hq.sh at main · aiidalab/aiidalab-qe · GitHub (basically, start the server and then start a worker with a given number of cores).

But for sure, feel free to try to implement Flux scheduler plugin.

@ali-khosravi @sphuber

I’m finally coming back to implementing this scheduler and I’m seeing some strange behavior that I’ve been trying to track down. I don’t currently have a public-facing git repository, but I will work on setting that up soon so I can point you to the source code if you need to reference it. Currently, I’ve been modeling the Flux scheduler to work much like the Slurm scheduler, and I am working with AiiDA v2.6.2. I am able to get AiiDA to submit the job correctly to the cluster, and it is then able to request the job information. What I am trying to track down is that somewhere along the way the job status changes from queued to finished (or something else), because AiiDA continues forward and attempts to pull the stdout and stderr files from the server. That then causes an error saying the files could not be found. Are there any thoughts or suggestions on how best to debug this issue? I’ve been trying to follow all the functions back, but it’s obviously a lot and I haven’t quite found the right spot yet. Let me know what other information I can provide. Thanks.

Hi @nkeilbart

Good to hear that you are implementing this.
How are you testing your plugin? If it’s via a dummy job (perhaps one written manually?), then the behavior might be expected, as it will not necessarily produce the output files.
And how are you observing the error, is it from tailing verdi daemon logshow?
And what’s the final status of the tasks in AiiDA? If you do verdi process list -a, is it Excepted, or is it Finished [0]?

It would be much easier if you could point us to the code directly, so we could review it precisely on the spot.

Thanks for the reply, Ali. As per usual, you leave for the weekend and when you come back everything is working fine. I’m not sure if it was an issue on the cluster or on my end. For a brief summary, I am using a basic submission script for a silicon diamond structure with the QE pw binary. I had been using run instead of submit so I could watch any print statements during the runtime, which was helping to debug a little bit. I’m still not sure what was occurring, as it would say that it couldn’t parse the job even though the job was still in the queue waiting to run. I’m not sure if I’m able to reproduce it right now, since my job starts right away in the debug queue; it’s possible I need to run a larger structure to make sure it’s working. I’ll come back and report if that’s the case.

Otherwise, it seems to be working as a normal scheduler, and I’ll work on getting the more advanced features I’m hoping to include in this plugin working as well.

Okay, I was able to run a larger system and it is now doing the same thing. The previous structure was so small, and the debug queue started it so quickly, that it basically had no wait time. Now I am getting the following output with the run command.

Warning: key 'symmetries' is not present in raw output dictionary
Error: ERROR_OUTPUT_STDOUT_INCOMPLETE
Error: Both the stdout and XML output files could not be read or parsed.
Warning: output parser returned exit code<305>: Both the stdout and XML output files could not be read or parsed.

Hi @nkeilbart

Guessing from your explanation, I would first check your implementation of get_jobs.
After submitting a job, AiiDA regularly checks the status of the jobs via get_jobs. Once the status of any particular job ID is marked as completed, it proceeds with retrieving the files and parsing them.
The errors you are seeing come from the parser; they are probably raised either because those files do not exist, or because they are only partially written, meaning the job is not actually finished.

It could be that your jobs are falsely perceived as finished.
In the case of long jobs you notice it, because obviously no output files have been produced yet, or they are not completely written.
While in the case of short jobs you don’t notice, probably because by the time the job is perceived as finished it actually is finished, so you don’t get an error.

That’s all I can think of without seeing your code; it could be a wrong theory, in which case I would really need to see your implementation to understand what’s happening :slight_smile:
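One way to test this theory is to call get_jobs yourself from a verdi shell and compare what the plugin reports with the actual state of the job on the cluster. A minimal sketch (the computer label is a placeholder):

from aiida import orm

computer = orm.load_computer('<COMPUTER_NAME>')
scheduler = computer.get_scheduler()

with computer.get_transport() as transport:
    scheduler.set_transport(transport)
    # as_dict=True returns a {job_id: JobInfo} mapping, which makes a missing or
    # mangled job ID easy to spot
    jobs = scheduler.get_jobs(as_dict=True)
    for job_id, info in jobs.items():
        print(repr(job_id), info.job_state)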

@ali-khosravi I appreciate the feedback. I’m working on getting approval to post this in an external git repository, which shouldn’t take too long. I followed the output by printing out what I think are the key variables. This is what some of that looks like:

job=['f4Ccob2aKLnb', 'R', '{"user": {"uri": "ssh://tioga21/var/tmp/keilbart/flux-jogBe3/local-0"}}', 'keilbart', '1', '64', 'tioga21', 'pdebug', '1800.0', '11.969725608825684', '1744739449.578227', 'aiida-2492', '1744739449.5373938']

thisjob_dict={'job_id': 'f4Ccob2aKLnb', 'state_raw': 'R', 'annotation': '{"user": {"uri": "ssh://tioga21/var/tmp/keilbart/flux-jogBe3/local-0"}}', 'username': 'keilbart', 'number_nodes': '1', 'number_cpus': '64', 'allocated_machines': 'tioga21', 'partition': 'pdebug', 'time_limit': '1800.0', 'time_used': '11.969725608825684', 'dispatch_time': '1744739449.578227', 'job_name': 'aiida-2492', 'submission_time': '1744739449.5373938'}

job_state_string=<JobState.RUNNING: 'running'>

job_list=[JobInfo({'job_id': 'f4Ccob2aKLnb', 'annotation': '{"user": {"uri": "ssh://tioga21/var/tmp/keilbart/flux-jogBe3/local-0"}}', 'job_state': <JobState.RUNNING: 'running'>, 'job_owner': 'keilbart', 'num_machines': 1, 'num_mpiprocs': 64, 'allocated_machines_raw': 'tioga21', 'queue_name': 'pdebug', 'requested_wallclock_time_seconds': '1800.0', 'wallclock_time_seconds': '11.969725608825684', 'dispatch_time': '1744739449.578227', 'submission_time': '1744739449.5373938'})]

Warning: key 'symmetries' is not present in raw output dictionary
Error: ERROR_OUTPUT_STDOUT_INCOMPLETE
Error: Both the stdout and XML output files could not be read or parsed.
Warning: output parser returned exit code<305>: Both the stdout and XML output files could not be read or parsed.

From what I understand, the last time the get_jobs() command is run in scheduler.py it finds my submitted job in a running state. From what I can tell it is only called once, which suggests to me that the job_state is getting changed somewhere else? I was trying to make my way through the code, but it was taking a while and I thought there might be some other ideas on it.

If I run this on another cluster that is running Slurm, I don’t have any issues. I’ve set the AiiDA logging level to debug and I see that it checks the job and then schedules another request to update the CalcJob. This isn’t happening with the Flux server I am using; it seems to go straight to submitting a retrieve task right after checking the job. It does receive information from the get_jobs() function and has information stored in the jobs_cache as well.

Debug: Transport request closing transport for AuthInfo for keilbart1@llnl.gov on tioga
Info: updating CalcJob<2555> successful
Debug: Adding projection of node_1: ['*']
Debug: projections have become: [{'*': {}}]
Debug: edge_tag chosen: main--node_1
Debug: Adding projection of main--node_1: ['type', 'label']
Debug: projections have become: [{'type': {}}, {'label': {}}]
Debug: projections data: {'main': [], 'node_1': [{'*': {}}], 'main--node_1': [{'type': {}}, {'label': {}}]}
Debug: projection for main: []
Debug: projection for node_1: [{'*': {}}]
Debug: Checking projections for edges: This is edge main--node_1 from node_1, with_outgoing of main
Debug: projection for main--node_1: [{'type': {}}, {'label': {}}]
Info: Process<2555>: Broadcasting state change: state_changed.waiting.waiting

This is a snippet of the debug output I am getting. Between updating the CalcJob and changing the state there is nothing that I can see that would cause the job state to be updated. On the other cluster, when it sees there are no jobs left it then updates, which makes sense to me.

Any other thoughts would be appreciated. Otherwise, I’ll check back in when I have this on an external git repository.

It turns out to be a simple bug. I had thought I was stripping the \n from the job ID string, but I must have forgotten that day. That was causing issues later when AiiDA attempted to check the job: the lookup would return None, causing AiiDA to think the job had finished, which then prompts it to retrieve and parse the job. I now have a working scheduler similar to what the Slurm one can do.

I have been thinking about how to implement the queue portion. It seems you suggested getting the scheduler and creating a queue before submitting the builder. I’m wondering if this works, as you can only create a personal queue once the Flux allocation has been initiated. Unless the thought would be to put this Python script inside a Flux submission script; if that’s the case it could work, but I was hoping to make it so that it would submit for a Flux allocation, and once that started it would get the information needed to connect to the allocation and then create the queue or run the individual jobs. What are the thoughts on that?

Super! Then the theory worked :slight_smile:

The idea I had for making and deleting the queue was just a suggestion; I’d say do it whichever way is more convenient for you.

It all depends on how exactly you want it to behave. The example I gave was based on a scenario that I imagined: get an allocation, submit several things into it, and then cancel the allocation.

If you want to keep the allocation alive for a while, then you may consider the same logic that @jusong.yu has implemented in GitHub - aiidateam/aiida-hyperqueue: AiiDA plugin for the HyperQueue metascheduler, and see if you can adapt that for Flux.
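To illustrate that kind of flow (purely a sketch: the helper names are hypothetical, and the flux batch / flux proxy / flux cancel command lines are assumptions to verify against the Flux documentation), the extra pieces could look roughly like this:

# Sketch of helper functions for a long-lived Flux allocation; in practice these could
# become extra methods on the scheduler plugin. The flux command lines are assumptions.
from aiida.schedulers import SchedulerError


def start_allocation(transport, nodes, minutes, batch_script):
    """Submit a long-running Flux instance and return its job ID."""
    # Assumption: `flux batch` starts a nested Flux instance from a batch script
    # and prints the job ID of the allocation on stdout.
    retval, stdout, stderr = transport.exec_command_wait(
        f'flux batch -N {nodes} -t {minutes}m {batch_script}'
    )
    if retval != 0:
        raise SchedulerError(f'could not start allocation: {stderr}')
    return stdout.strip()


def submit_into_allocation(transport, allocation_jobid, command):
    """Run a command inside the running allocation (assumes `flux proxy JOBID CMD`)."""
    return transport.exec_command_wait(f'flux proxy {allocation_jobid} {command}')


def stop_allocation(transport, allocation_jobid):
    """Cancel the allocation once the workflow is done."""
    retval, _, _ = transport.exec_command_wait(f'flux cancel {allocation_jobid}')
    return retval == 0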