Jobs Router
Job management API endpoints including submission, querying, and status updates.
The Jobs router is composed of multiple sub-routers:
- Submission: Job submission endpoints
- Query: Job search and filtering
- Status: Job status management
- Sandboxes: Sandbox upload/download
Router
jobs
Attributes
Classes
Sub-Routers
Submission
Attributes
MAX_PARAMETRIC_JOBS = 20
module-attribute
Classes
JobID
pydantic-model
Functions
submit_jdl_jobs(job_definitions, job_db, job_logging_db, user_info, check_permissions, config)
async
Submit a list of jobs in JDL format.
Source code in diracx-routers/src/diracx/routers/jobs/submission.py
Query
Attributes
MAX_PER_PAGE = 10000
module-attribute
Classes
Functions
search(config, job_db, job_parameters_db, job_logging_db, user_info, check_permissions, response, page=1, per_page=100, body=None)
async
Search the job database. The search can filter on job attributes such as jobID, status, or owner.
Possibilities
- Use search to filter jobs based on various parameters (optional).
- Use parameters to specify which job parameters to return (optional).
- Use sort to order the results based on specific parameters (optional).
By default, the search will return all jobs the user has access to, and all the fields of the job will be returned.
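A request body combining the three options might look like the sketch below. The key names inside each entry (`parameter`, `operator`, `value`, `direction`) and the `build_search_body` helper are assumptions made for illustration; this page does not confirm them, so check the generated API schema before relying on them.

```python
import json


def build_search_body(status, columns, sort_by):
    """Assemble a hypothetical body for the search endpoint."""
    return {
        # Which job fields to return; omit the key to receive every field.
        "parameters": list(columns),
        # Filters to apply; the layout of each entry is an assumption.
        "search": [{"parameter": "Status", "operator": "eq", "value": status}],
        # Result ordering; the layout of each entry is an assumption.
        "sort": [{"parameter": sort_by, "direction": "asc"}],
    }


body = build_search_body("Running", ["JobID", "Status"], "JobID")
print(json.dumps(body, indent=2))
```

All three keys are optional, so an empty body falls back to the default behaviour described above.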
Source code in diracx-routers/src/diracx/routers/jobs/query.py
summary(config, job_db, user_info, body, check_permissions)
async
Group jobs by a given list of parameters. Returns an array of tuples, where each tuple contains the values of the grouping parameters and the number of jobs that match those values.
Body parameters:
- grouping: List of parameters to group the jobs by.
- search: List of search parameters to filter the jobs by (optional).
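The grouping semantics can be illustrated with a small in-memory sketch. This is not the router's implementation (which queries the job database); the `summarize` helper and the sample job records are hypothetical and only show what "values of the grouping parameters plus a count" means.

```python
from collections import Counter


def summarize(jobs, grouping):
    """Count jobs per unique combination of the grouping parameters."""
    counts = Counter(tuple(job[name] for name in grouping) for job in jobs)
    # Each result row holds the grouping values followed by the job count.
    return [(*key, n) for key, n in counts.items()]


jobs = [
    {"Status": "Running", "Site": "A"},
    {"Status": "Running", "Site": "A"},
    {"Status": "Done", "Site": "B"},
]
rows = summarize(jobs, ["Status", "Site"])
print(rows)
```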
Source code in diracx-routers/src/diracx/routers/jobs/query.py
Status
Attributes
EXAMPLE_STATUS_UPDATES = {'Default': {'value': {1: {str(datetime.now(timezone.utc)): {'Status': 'Killed', 'MinorStatus': 'Marked as killed', 'ApplicationStatus': 'Job was killed by user', 'Source': 'User'}}, 2: {str(datetime.now(timezone.utc)): {'Status': 'Failed', 'MinorStatus': 'Timeout'}}}}, 'Structure of the request body': {'value': {'<job_id>': {'<timestamp>': {'Status': '<status>', 'MinorStatus': '<minor_status>', 'ApplicationStatus': '<application_status>', 'Source': '<source>'}}}}}
module-attribute
EXAMPLE_HEARTBEAT = {'Default': {'value': {1: {'LoadAverage': 2.5, 'MemoryUsed': 1024.0, 'Vsize': 2048.0, 'AvailableDiskSpace': 500.0, 'CPUConsumed': 75.0, 'WallClockTime': 3600.0, 'StandardOutput': 'Job is running smoothly.'}, 2: {'LoadAverage': 1.0, 'MemoryUsed': 512.0, 'Vsize': 1024.0, 'AvailableDiskSpace': 250.0, 'CPUConsumed': 50.0, 'WallClockTime': 1800.0, 'StandardOutput': 'Job is waiting for resources.'}}}, 'Structure of the request body': {'value': {'<job_id>': {'LoadAverage': 2.5, 'MemoryUsed': 1024.0, 'Vsize': 2048.0, 'AvailableDiskSpace': 500.0, 'CPUConsumed': 75.0, 'WallClockTime': 3600.0, 'StandardOutput': 'Job is running smoothly.'}}}}
module-attribute
EXAMPLE_RESCHEDULE = {'Default': {'value': {'job_ids': [1, 2, 3]}}, 'One job': {'value': {'job_ids': [1]}}}
module-attribute
EXAMPLE_METADATA = {'Default': {'value': {1: {'UserPriority': 2, 'HeartBeatTime': str(datetime.now(timezone.utc)), 'Status': 'Done', 'Site': 'Meyrin'}, 2: {'UserPriority': 1, 'HeartBeatTime': str(datetime.now(timezone.utc)), 'JobType': 'AnotherType'}}}}
module-attribute
Classes
Functions
set_job_statuses(job_update, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions, force=False)
async
Set the status of a job or a list of jobs.
Body parameters:
- Status: The new status of the job.
- MinorStatus: The minor status of the job.
- ApplicationStatus: The application-specific status of the job.
- Source: The source of the status update (default is "Unknown").
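Following the layout shown in `EXAMPLE_STATUS_UPDATES` (job ID, then timestamp, then the status fields), a request body could be assembled as in this sketch. The `status_update` helper is hypothetical, and leaving out `ApplicationStatus` assumes that field is optional, as the second entry of the example suggests.

```python
from datetime import datetime, timezone


def status_update(job_id, status, minor_status, source="Unknown"):
    """Build a one-job status update keyed by job ID and UTC timestamp."""
    timestamp = str(datetime.now(timezone.utc))
    return {
        job_id: {
            timestamp: {
                "Status": status,
                "MinorStatus": minor_status,
                "Source": source,
            }
        }
    }


body = status_update(1, "Killed", "Marked as killed", source="User")
print(body)
```

Updates for several jobs can be merged into a single mapping, since the body is keyed by job ID.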
Source code in diracx-routers/src/diracx/routers/jobs/status.py
add_heartbeat(data, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions)
async
Register a heartbeat from the job.
This endpoint is used by the JobAgent to send heartbeats to the WMS and to receive job commands from the WMS. It also results in stalled jobs being restored to the RUNNING status.
The data parameter and return value are mappings keyed by job ID.
Source code in diracx-routers/src/diracx/routers/jobs/status.py
reschedule_jobs(job_ids, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions, reset_jobs=False)
async
Reschedule a list of killed or failed jobs.
Body parameters:
- job_ids: List of job IDs to reschedule.
- reset_jobs: If True, reset the count of reschedules for the jobs.
Source code in diracx-routers/src/diracx/routers/jobs/status.py
patch_metadata(updates, job_db, job_parameters_db, check_permissions)
async
Update job metadata such as UserPriority, HeartBeatTime, JobType, etc. The arguments are all the attributes/parameters of a job (except the ID).
Source code in diracx-routers/src/diracx/routers/jobs/status.py
Sandboxes
Attributes
MAX_SANDBOX_SIZE_BYTES = 100 * 1024 * 1024
module-attribute
Classes
Functions
initiate_sandbox_upload(user_info, sandbox_info, sandbox_metadata_db, settings, check_permissions)
async
Get the PFN for the given sandbox and initiate an upload if required.
If the sandbox already exists in the database then the PFN is returned and there is no "url" field in the response.
If the sandbox does not exist in the database then the "url" and "fields" should be used to upload the sandbox to the storage backend.
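A client can branch on the presence of the "url" field to decide whether an upload is needed. The sketch below assumes the response is a plain mapping and that the PFN is returned under a `pfn` key; that key name and the `plan_upload` helper are assumptions, while "url" and "fields" come from the description above.

```python
def plan_upload(response):
    """Decide what to do with a response from the upload-initiation endpoint."""
    if "url" not in response:
        # The sandbox is already known: reuse the PFN, nothing to upload.
        return {"action": "reuse", "pfn": response["pfn"]}
    # The sandbox is new: the file must be sent to the storage backend
    # using the returned URL together with the returned form fields.
    return {
        "action": "upload",
        "pfn": response["pfn"],
        "url": response["url"],
        "form_fields": response["fields"],
    }


existing = plan_upload({"pfn": "<pfn>"})
new = plan_upload({"pfn": "<pfn>", "url": "<url>", "fields": {"key": "<key>"}})
print(existing["action"], new["action"])
```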
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
get_sandbox_file(pfn, settings, sandbox_metadata_db, user_info, check_permissions)
async
Get a presigned URL to download a sandbox file.
This route cannot use a redirect response because most clients would also send the authorization header when following a redirect. This is not desirable as it would leak the authorization token to the storage backend. Additionally, most storage backends return an error when they receive an authorization header for a presigned URL.
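In practice this means the client makes a second, separate request to the presigned URL and sends no credentials with it. A minimal sketch using the standard library is shown below; the URL is a placeholder and `build_download_request` is a hypothetical helper, and the request is only constructed here, not sent.

```python
from urllib.request import Request


def build_download_request(presigned_url):
    """Create a GET request for a presigned URL without any credentials."""
    # Deliberately no Authorization header: the signature embedded in the
    # URL is what grants access, and the token must not reach the storage.
    return Request(presigned_url, method="GET")


req = build_download_request("https://storage.example.invalid/sandbox?signature=abc")
print(req.get_method(), req.has_header("Authorization"))
```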
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
get_job_sandboxes(job_id, sandbox_metadata_db, job_db, check_permissions)
async
Get the input and output sandboxes of the given job.
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
get_job_sandbox(job_id, sandbox_metadata_db, job_db, sandbox_type, check_permissions)
async
Get the input or output sandbox of the given job.
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
assign_sandbox_to_job(job_id, pfn, sandbox_metadata_db, job_db, settings, check_permissions)
async
Map the PFN as the output sandbox of the job.
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
unassign_job_sandboxes(job_id, sandbox_metadata_db, job_db, check_permissions)
async
Delete the sandbox mapping of a single job.
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
unassign_bulk_jobs_sandboxes(job_ids, sandbox_metadata_db, job_db, check_permissions)
async
Delete the sandbox mappings of multiple jobs.
Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
Access Policies
Attributes
CheckWMSPolicyCallable = Annotated[Callable, Depends(WMSAccessPolicy.check)]
module-attribute
CheckSandboxPolicyCallable = Annotated[Callable, Depends(SandboxAccessPolicy.check)]
module-attribute
Classes
ActionType
Bases: StrEnum
Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
Attributes
CREATE = auto()
class-attribute
instance-attribute
READ = auto()
class-attribute
instance-attribute
MANAGE = auto()
class-attribute
instance-attribute
QUERY = auto()
class-attribute
instance-attribute
PILOT = auto()
class-attribute
instance-attribute
WMSAccessPolicy
Bases: BaseAccessPolicy
Rules:
- You need either NORMAL_USER or JOB_ADMINISTRATOR in your properties.
- An admin cannot create any resource but can read and modify everything.
- A NORMAL_USER can create.
- A NORMAL_USER can query and read only their own jobs.
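The rules can be read as a small decision function. The sketch below is an illustration only, not the code in access_policies.py: the `is_allowed` helper, the lowercase action strings, and the `owns_all_jobs` flag are hypothetical. It also assumes that a NORMAL_USER may manage only their own jobs and that query results are restricted to the user's own jobs elsewhere, neither of which the rules state explicitly.

```python
def is_allowed(properties, action, owns_all_jobs=True):
    """Apply the documented WMS rules to a set of user properties."""
    is_admin = "JOB_ADMINISTRATOR" in properties
    is_user = "NORMAL_USER" in properties
    if not (is_admin or is_user):
        return False  # Neither required property is present.
    if action == "create":
        return is_user  # An admin alone cannot create resources.
    if is_admin:
        return True  # An admin can read and modify everything.
    if action == "query":
        return True  # Assumption: results are filtered to own jobs later.
    # Normal user reading or managing: only their own jobs.
    return owns_all_jobs


print(is_allowed({"NORMAL_USER"}, "create"))
print(is_allowed({"JOB_ADMINISTRATOR"}, "create"))
```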
Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
Functions
policy(policy_name, user_info, /, *, action=None, job_db=None, job_ids=None)
async
staticmethod
Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
SandboxAccessPolicy
Bases: BaseAccessPolicy
Access policy for sandboxes. The rules are similar to those of the WMS access policy.