Skip to content

Jobs Router

Job management API endpoints including submission, querying, and status updates.

The Jobs router is composed of multiple sub-routers:

  • Submission: Job submission endpoints
  • Query: Job search and filtering
  • Status: Job status management
  • Sandboxes: Sandbox upload/download

Router

jobs

Attributes

Classes

Sub-Routers

Submission

Attributes

MAX_PARAMETRIC_JOBS = 20 module-attribute

Classes

JobID pydantic-model

Bases: BaseModel

Fields:

Source code in diracx-routers/src/diracx/routers/jobs/submission.py
class JobID(BaseModel):
    """Pydantic model holding a single integer job identifier."""

    job_id: int

Attributes

job_id pydantic-field

Functions

submit_jdl_jobs(job_definitions, job_db, job_logging_db, user_info, check_permissions, config) async

Submit a list of jobs in JDL format.

Source code in diracx-routers/src/diracx/routers/jobs/submission.py
@router.post("/jdl")
async def submit_jdl_jobs(
    job_definitions: Annotated[list[str], Body(openapi_examples=EXAMPLE_JDLS)],
    job_db: JobDB,
    job_logging_db: JobLoggingDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    check_permissions: CheckWMSPolicyCallable,
    config: Config,
) -> list[InsertedJob]:
    """Submit a list of jobs in JDL format.

    The request is first checked against the CREATE action of the WMS access
    policy. A ``ValueError`` raised by the submission logic is reported to the
    client as a 400 (Bad Request) whose detail is the error message.
    """
    await check_permissions(action=ActionType.CREATE, job_db=job_db)

    try:
        return await submit_jdl_jobs_bl(
            job_definitions, job_db, job_logging_db, user_info, config
        )
    except ValueError as exc:
        # Surface the underlying error message to the client
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=str(exc),
        ) from exc

Query

Attributes

MAX_PER_PAGE = 10000 module-attribute

Classes

Functions

search(config, job_db, job_parameters_db, job_logging_db, user_info, check_permissions, response, page=1, per_page=100, body=None) async

Creates a search query to the job database. This search can be based on different parameters, such as jobID, status, owner, etc.

Possibilities - Use search to filter jobs based on various parameters (optional). - Use parameters to specify which job parameters to return (optional). - Use sort to order the results based on specific parameters (optional).

By default, the search will return all jobs the user has access to, and all the fields of the job will be returned.

Source code in diracx-routers/src/diracx/routers/jobs/query.py
@router.post("/search", responses=EXAMPLE_SEARCH_RESPONSES)
async def search(
    config: Config,
    job_db: JobDB,
    job_parameters_db: JobParametersDB,
    job_logging_db: JobLoggingDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    check_permissions: CheckWMSPolicyCallable,
    response: Response,
    page: int = 1,
    per_page: int = 100,
    body: Annotated[
        SearchParams | None, Body(openapi_examples=EXAMPLE_SEARCHES)
    ] = None,
) -> list[dict[str, Any]]:
    """Search the job database.

    The query can be based on different parameters, such as jobID, status,
    owner, etc.

    **Possibilities**
    - Use `search` to filter jobs based on various parameters (optional).
    - Use `parameters` to specify which job parameters to return (optional).
    - Use `sort` to order the results based on specific parameters (optional).

    By default, the search returns all the jobs the user has access to, with
    all of their fields.

    Results are paginated with `page` and `per_page`. When only a part of the
    matching jobs is returned, a `Content-Range` header is set and the status
    code is 206 (Partial Content). When the requested page is empty although
    matching jobs exist, the status code is 416 (Range Not Satisfiable).
    """
    await check_permissions(action=ActionType.QUERY, job_db=job_db)

    # Job administrators are searched for without any username restriction
    preferred_username: str | None = (
        None
        if JOB_ADMINISTRATOR in user_info.properties
        else user_info.preferred_username
    )

    total, jobs = await search_bl(
        config=config,
        job_db=job_db,
        job_parameters_db=job_parameters_db,
        job_logging_db=job_logging_db,
        preferred_username=preferred_username,
        vo=user_info.vo,
        page=page,
        per_page=per_page,
        body=body,
    )

    # Set the Content-Range header if needed
    # https://datatracker.ietf.org/doc/html/rfc7233#section-4
    returned = len(jobs)

    if returned == 0 and total > 0:
        # The requested page is empty but the search itself matches jobs
        # https://datatracker.ietf.org/doc/html/rfc7233#section-4.4
        response.headers["Content-Range"] = f"jobs */{total}"
        response.status_code = HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE
    elif returned < total:
        # Only a slice of the matching jobs is being returned
        # https://datatracker.ietf.org/doc/html/rfc7233#section-4.2
        first_idx = per_page * (page - 1)
        if total > 0:
            last_idx = min(first_idx + returned, total) - 1
        else:
            last_idx = 0
        response.headers["Content-Range"] = f"jobs {first_idx}-{last_idx}/{total}"
        response.status_code = HTTPStatus.PARTIAL_CONTENT

    return jobs

summary(config, job_db, user_info, body, check_permissions) async

Group jobs by a specific list of parameters. Returns an array of n-uplets, where each n-uplet contains the values of the grouping parameters and the number of jobs that match those values.

Body parameters: - grouping: List of parameters to group the jobs by. - search: List of search parameters to filter the jobs by (optional).

Source code in diracx-routers/src/diracx/routers/jobs/query.py
@router.post("/summary", responses=EXAMPLE_SUMMARY_RESPONSES)
async def summary(
    config: Config,
    job_db: JobDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    body: SummaryParams,
    check_permissions: CheckWMSPolicyCallable,
):
    """Group jobs by a specific list of parameters.

    Returns an array of n-uplets, where each n-uplet contains the values of
    the grouping parameters and the number of jobs that match those values.

    Body parameters:
    - `grouping`: List of parameters to group the jobs by.
    - `search`: List of search parameters to filter the jobs by (optional).

    """
    await check_permissions(action=ActionType.QUERY, job_db=job_db)

    # Job administrators are summarised without any username restriction
    preferred_username: str | None = (
        None
        if JOB_ADMINISTRATOR in user_info.properties
        else user_info.preferred_username
    )

    grouped = await summary_bl(
        config=config,
        job_db=job_db,
        preferred_username=preferred_username,
        vo=user_info.vo,
        body=body,
    )
    return grouped

Status

Attributes

EXAMPLE_STATUS_UPDATES = {'Default': {'value': {1: {str(datetime.now(timezone.utc)): {'Status': 'Killed', 'MinorStatus': 'Marked as killed', 'ApplicationStatus': 'Job was killed by user', 'Source': 'User'}}, 2: {str(datetime.now(timezone.utc)): {'Status': 'Failed', 'MinorStatus': 'Timeout'}}}}, 'Structure of the request body': {'value': {'<job_id>': {'<timestamp>': {'Status': '<status>', 'MinorStatus': '<minor_status>', 'ApplicationStatus': '<application_status>', 'Source': '<source>'}}}}} module-attribute

EXAMPLE_HEARTBEAT = {'Default': {'value': {1: {'LoadAverage': 2.5, 'MemoryUsed': 1024.0, 'Vsize': 2048.0, 'AvailableDiskSpace': 500.0, 'CPUConsumed': 75.0, 'WallClockTime': 3600.0, 'StandardOutput': 'Job is running smoothly.'}, 2: {'LoadAverage': 1.0, 'MemoryUsed': 512.0, 'Vsize': 1024.0, 'AvailableDiskSpace': 250.0, 'CPUConsumed': 50.0, 'WallClockTime': 1800.0, 'StandardOutput': 'Job is waiting for resources.'}}}, 'Structure of the request body': {'value': {'<job_id>': {'LoadAverage': 2.5, 'MemoryUsed': 1024.0, 'Vsize': 2048.0, 'AvailableDiskSpace': 500.0, 'CPUConsumed': 75.0, 'WallClockTime': 3600.0, 'StandardOutput': 'Job is running smoothly.'}}}} module-attribute

EXAMPLE_RESCHEDULE = {'Default': {'value': {'job_ids': [1, 2, 3]}}, 'One job': {'value': {'job_ids': [1]}}} module-attribute

EXAMPLE_METADATA = {'Default': {'value': {1: {'UserPriority': 2, 'HeartBeatTime': str(datetime.now(timezone.utc)), 'Status': 'Done', 'Site': 'Meyrin'}, 2: {'UserPriority': 1, 'HeartBeatTime': str(datetime.now(timezone.utc)), 'JobType': 'AnotherType'}}}} module-attribute

Classes

Functions

set_job_statuses(job_update, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions, force=False) async

Set the status of a job or a list of jobs.

Body parameters: - Status: The new status of the job. - MinorStatus: The minor status of the job. - ApplicationStatus: The application-specific status of the job. - Source: The source of the status update (default is "Unknown").

Source code in diracx-routers/src/diracx/routers/jobs/status.py
@router.patch("/status")
async def set_job_statuses(
    job_update: Annotated[
        dict[int, dict[datetime, JobStatusUpdate]],
        Body(openapi_examples=EXAMPLE_STATUS_UPDATES),
    ],
    config: Config,
    job_db: JobDB,
    job_logging_db: JobLoggingDB,
    task_queue_db: TaskQueueDB,
    job_parameters_db: JobParametersDB,
    check_permissions: CheckWMSPolicyCallable,
    force: bool = False,
) -> SetJobStatusReturn:
    """Set the status of a job or a list of jobs.

    The request body maps each job ID to a mapping of timestamps to status
    updates.

    Body parameters:
    - `Status`: The new status of the job.
    - `MinorStatus`: The minor status of the job.
    - `ApplicationStatus`: The application-specific status of the job.
    - `Source`: The source of the status update (default is "Unknown").

    A `ValueError` raised while applying the updates is reported as a 400
    (Bad Request); a result that is not flagged as successful is reported as
    a 404 (Not Found) carrying the dumped result as detail.
    """
    # The keys of the body are the IDs of the jobs being modified
    await check_permissions(
        action=ActionType.MANAGE, job_db=job_db, job_ids=[*job_update]
    )

    try:
        outcome = await set_job_statuses_bl(
            status_changes=job_update,
            config=config,
            job_db=job_db,
            job_logging_db=job_logging_db,
            task_queue_db=task_queue_db,
            job_parameters_db=job_parameters_db,
            force=force,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=str(exc),
        ) from exc

    if outcome.success:
        return outcome

    raise HTTPException(
        status_code=HTTPStatus.NOT_FOUND,
        detail=outcome.model_dump(),
    )

add_heartbeat(data, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions) async

Register a heartbeat from the job.

This endpoint is used by the JobAgent to send heartbeats to the WMS and to receive job commands from the WMS. It also results in stalled jobs being restored to the RUNNING status.

The data parameter and return value are mappings keyed by job ID.

Source code in diracx-routers/src/diracx/routers/jobs/status.py
@router.patch("/heartbeat")
async def add_heartbeat(
    data: Annotated[dict[int, HeartbeatData], Body(openapi_examples=EXAMPLE_HEARTBEAT)],
    config: Config,
    job_db: JobDB,
    job_logging_db: JobLoggingDB,
    task_queue_db: TaskQueueDB,
    job_parameters_db: JobParametersDB,
    check_permissions: CheckWMSPolicyCallable,
) -> list[JobCommand]:
    """Register a heartbeat from the job.

    This endpoint is used by the JobAgent to send heartbeats to the WMS and to
    receive job commands from the WMS. It also results in stalled jobs being
    restored to the RUNNING status.

    The `data` parameter and return value are mappings keyed by job ID.
    """
    # The keys of the body are the IDs of the jobs sending a heartbeat
    await check_permissions(action=ActionType.PILOT, job_db=job_db, job_ids=[*data])

    # Record the heartbeats first, then collect the commands for those jobs
    await add_heartbeat_bl(
        data, config, job_db, job_logging_db, task_queue_db, job_parameters_db
    )
    commands = await get_job_commands_bl(data, job_db)
    return commands

reschedule_jobs(job_ids, config, job_db, job_logging_db, task_queue_db, job_parameters_db, check_permissions, reset_jobs=False) async

Reschedule a list of killed or failed jobs.

Parameters: - job_ids (request body): List of job IDs to reschedule. - reset_jobs (query parameter): If True, reset the count of reschedules for the jobs.

Source code in diracx-routers/src/diracx/routers/jobs/status.py
@router.post(
    "/reschedule",
    openapi_extra={
        "requestBody": {
            "required": True,
            "content": {"application/json": {"examples": EXAMPLE_RESCHEDULE}},
        }
    },
)
async def reschedule_jobs(
    job_ids: Annotated[list[int], Body(embed=True)],
    config: Config,
    job_db: JobDB,
    job_logging_db: JobLoggingDB,
    task_queue_db: TaskQueueDB,
    job_parameters_db: JobParametersDB,
    check_permissions: CheckWMSPolicyCallable,
    reset_jobs: Annotated[bool, Query()] = False,
) -> dict[str, Any]:
    """Reschedule a list of killed or failed jobs.

    Parameters:
    - `job_ids` (request body): List of job IDs to reschedule.
    - `reset_jobs` (query parameter): If True, reset the count of reschedules
      for the jobs.

    A 400 (Bad Request) carrying the full result is returned when the result
    has no (or an empty) `success` entry.
    """
    await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=job_ids)

    outcome = await reschedule_jobs_bl(
        job_ids,
        config,
        job_db,
        job_logging_db,
        task_queue_db,
        job_parameters_db,
        reset_jobs=reset_jobs,
    )

    rescheduled = outcome.get("success", [])
    if not rescheduled:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=outcome,
        )

    # TODO: send jobs to OptimizationMind
    #  self.__sendJobsToOptimizationMind(validJobList)

    return outcome

patch_metadata(updates, job_db, job_parameters_db, check_permissions) async

Update job metadata such as UserPriority, HeartBeatTime, JobType, etc. The arguments are all the attributes/parameters of a job (except the ID).

Source code in diracx-routers/src/diracx/routers/jobs/status.py
@router.patch("/metadata", status_code=HTTPStatus.NO_CONTENT)
async def patch_metadata(
    updates: Annotated[dict[int, JobMetaData], Body(openapi_examples=EXAMPLE_METADATA)],
    job_db: JobDB,
    job_parameters_db: JobParametersDB,
    check_permissions: CheckWMSPolicyCallable,
):
    """Update job metadata such as UserPriority, HeartBeatTime, JobType, etc.

    The request body maps each job ID to the attributes/parameters of that
    job to update (everything except the ID itself).

    A `ValueError` raised while applying the updates is reported as a 400
    (Bad Request) whose detail is the error message. On success the response
    has no content (204).
    """
    # The access policy expects a list of job IDs (it forwards it as the
    # values of an IN filter), so pass the keys of the mapping rather than
    # the mapping itself, as the other endpoints of this router do.
    await check_permissions(
        action=ActionType.MANAGE, job_db=job_db, job_ids=list(updates)
    )
    try:
        await set_job_parameters_or_attributes_bl(updates, job_db, job_parameters_db)
    except ValueError as e:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=str(e),
        ) from e

Sandboxes

Attributes

MAX_SANDBOX_SIZE_BYTES = 100 * 1024 * 1024 module-attribute

Classes

Functions

initiate_sandbox_upload(user_info, sandbox_info, sandbox_metadata_db, settings, check_permissions) async

Get the PFN for the given sandbox, initiate an upload as required.

If the sandbox already exists in the database then the PFN is returned and there is no "url" field in the response.

If the sandbox does not exist in the database then the "url" and "fields" should be used to upload the sandbox to the storage backend.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.post("/sandbox")
async def initiate_sandbox_upload(
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    sandbox_info: SandboxInfo,
    sandbox_metadata_db: SandboxMetadataDB,
    settings: SandboxStoreSettings,
    check_permissions: CheckSandboxPolicyCallable,
) -> SandboxUploadResponse:
    """Get the PFN for the given sandbox, initiate an upload as required.

    If the sandbox already exists in the database then the PFN is returned
    and there is no "url" field in the response.

    If the sandbox does not exist in the database then the "url" and "fields"
    should be used to upload the sandbox to the storage backend.

    A `ValueError` raised while initiating the upload is reported as a 400
    (Bad Request) whose detail is the error message.
    """
    await check_permissions(
        action=ActionType.CREATE, sandbox_metadata_db=sandbox_metadata_db
    )

    try:
        return await initiate_sandbox_upload_bl(
            user_info, sandbox_info, sandbox_metadata_db, settings
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=str(exc),
        ) from exc

get_sandbox_file(pfn, settings, sandbox_metadata_db, user_info, check_permissions) async

Get a presigned URL to download a sandbox file.

This route cannot use a redirect response because most clients will also send the authorization header when following a redirect. This is not desirable as it would leak the authorization token to the storage backend. Additionally, most storage backends return an error when they receive an authorization header for a presigned URL.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.get("/sandbox")
async def get_sandbox_file(
    pfn: Annotated[str, Query(max_length=256, pattern=SANDBOX_PFN_REGEX)],
    settings: SandboxStoreSettings,
    sandbox_metadata_db: SandboxMetadataDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    check_permissions: CheckSandboxPolicyCallable,
) -> SandboxDownloadResponse:
    """Get a presigned URL to download a sandbox file.

    This route cannot use a redirect response because most clients will also
    send the authorization header when following a redirect. This is not
    desirable as it would leak the authorization token to the storage backend.
    Additionally, most storage backends return an error when they receive an
    authorization header for a presigned URL.
    """
    # Keep only what follows the first "|" (the whole PFN if there is none)
    _, separator, remainder = pfn.partition("|")
    short_pfn = remainder if separator else pfn

    # Prefix under which the sandboxes of the requesting user are expected
    required_prefix = (
        f"/S3/{settings.bucket_name}/{user_info.vo}"
        f"/{user_info.dirac_group}/{user_info.preferred_username}/"
    )

    await check_permissions(
        action=ActionType.READ,
        sandbox_metadata_db=sandbox_metadata_db,
        pfns=[short_pfn],
        required_prefix=required_prefix,
        se_name=settings.se_name,
    )

    download_info = await get_sandbox_file_bl(pfn, sandbox_metadata_db, settings)
    return download_info

get_job_sandboxes(job_id, sandbox_metadata_db, job_db, check_permissions) async

Get input and output sandboxes of given job.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.get("/{job_id}/sandbox")
async def get_job_sandboxes(
    job_id: int,
    sandbox_metadata_db: SandboxMetadataDB,
    job_db: JobDB,
    check_permissions: CheckWMSPolicyCallable,
) -> dict[str, list[Any]]:
    """Get input and output sandboxes of given job.

    The caller must pass the READ check of the WMS access policy for the job.
    """
    await check_permissions(action=ActionType.READ, job_db=job_db, job_ids=[job_id])

    sandboxes = await get_job_sandboxes_bl(job_id, sandbox_metadata_db)
    return sandboxes

get_job_sandbox(job_id, sandbox_metadata_db, job_db, sandbox_type, check_permissions) async

Get input or output sandbox of given job.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.get("/{job_id}/sandbox/{sandbox_type}")
async def get_job_sandbox(
    job_id: int,
    sandbox_metadata_db: SandboxMetadataDB,
    job_db: JobDB,
    sandbox_type: Literal["input", "output"],
    check_permissions: CheckWMSPolicyCallable,
) -> list[Any]:
    """Get input or output sandbox of given job.

    The caller must pass the READ check of the WMS access policy for the job.
    """
    await check_permissions(action=ActionType.READ, job_db=job_db, job_ids=[job_id])

    sandbox = await get_job_sandbox_bl(job_id, sandbox_metadata_db, sandbox_type)
    return sandbox

assign_sandbox_to_job(job_id, pfn, sandbox_metadata_db, job_db, settings, check_permissions) async

Map the pfn as output sandbox to job.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.patch("/{job_id}/sandbox/output")
async def assign_sandbox_to_job(
    job_id: int,
    pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)],
    sandbox_metadata_db: SandboxMetadataDB,
    job_db: JobDB,
    settings: SandboxStoreSettings,
    check_permissions: CheckWMSPolicyCallable,
):
    """Map the pfn as output sandbox to job.

    Both an unknown sandbox and a sandbox that is already assigned are
    reported as a 400 (Bad Request), with a detail naming the cause.
    """
    await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])

    try:
        await assign_sandbox_to_job_bl(job_id, pfn, sandbox_metadata_db, settings)
    except SandboxNotFoundError as not_found:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
        ) from not_found
    except (SandboxAlreadyAssignedError, AssertionError) as already_assigned:
        # An AssertionError is reported the same way as an already assigned sandbox
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox already assigned"
        ) from already_assigned

unassign_job_sandboxes(job_id, sandbox_metadata_db, job_db, check_permissions) async

Delete single job sandbox mapping.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.delete("/{job_id}/sandbox")
async def unassign_job_sandboxes(
    job_id: int,
    sandbox_metadata_db: SandboxMetadataDB,
    job_db: JobDB,
    check_permissions: CheckWMSPolicyCallable,
):
    """Delete single job sandbox mapping.

    The caller must pass the MANAGE check of the WMS access policy for the job.
    """
    # The unassign logic works on a list of jobs: wrap the single ID once
    target_jobs = [job_id]
    await check_permissions(
        action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]
    )
    await unassign_jobs_sandboxes_bl(target_jobs, sandbox_metadata_db)

unassign_bulk_jobs_sandboxes(job_ids, sandbox_metadata_db, job_db, check_permissions) async

Delete bulk jobs sandbox mapping.

Source code in diracx-routers/src/diracx/routers/jobs/sandboxes.py
@router.post(
    "/sandbox/unassign",
    status_code=HTTPStatus.NO_CONTENT,
    openapi_extra={
        "requestBody": {
            "required": True,
            "content": {"application/json": {"examples": EXAMPLE_UNASSIGN}},
        }
    },
)
async def unassign_bulk_jobs_sandboxes(
    job_ids: Annotated[list[int], Body(embed=True)],
    sandbox_metadata_db: SandboxMetadataDB,
    job_db: JobDB,
    check_permissions: CheckWMSPolicyCallable,
):
    """Delete bulk jobs sandbox mapping.

    The caller must pass the MANAGE check of the WMS access policy for all
    the given jobs. On success the response has no content (204).
    """
    await check_permissions(
        action=ActionType.MANAGE,
        job_db=job_db,
        job_ids=job_ids,
    )
    await unassign_jobs_sandboxes_bl(job_ids, sandbox_metadata_db)

Access Policies

Attributes

CheckWMSPolicyCallable = Annotated[Callable, Depends(WMSAccessPolicy.check)] module-attribute

CheckSandboxPolicyCallable = Annotated[Callable, Depends(SandboxAccessPolicy.check)] module-attribute

Classes

ActionType

Bases: StrEnum

Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
class ActionType(StrEnum):
    """Kinds of action that the job and sandbox access policies are asked to authorise."""

    # Create a job or a sandbox
    CREATE = auto()
    # Check job status, download a sandbox
    READ = auto()
    # Delete, kill, remove, set status, etc of a job
    # Delete or assign a sandbox
    MANAGE = auto()
    # Search
    QUERY = auto()
    # Actions from a pilot (e.g. heartbeat)
    PILOT = auto()

Attributes

CREATE = auto() class-attribute instance-attribute
READ = auto() class-attribute instance-attribute
MANAGE = auto() class-attribute instance-attribute
QUERY = auto() class-attribute instance-attribute
PILOT = auto() class-attribute instance-attribute

WMSAccessPolicy

Bases: BaseAccessPolicy

Rules: * You need either NORMAL_USER or JOB_ADMINISTRATOR in your properties * An admin cannot create any resource but can read everything and modify everything * A NORMAL_USER can create * a NORMAL_USER can query and read only his own jobs.

Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
class WMSAccessPolicy(BaseAccessPolicy):
    """Rules:
    * You need either NORMAL_USER or JOB_ADMINISTRATOR in your properties
    * An admin cannot create any resource but can read everything and modify everything
    * A NORMAL_USER can create
    * a NORMAL_USER can query and read only his own jobs.
    """

    @staticmethod
    async def policy(
        policy_name: str,
        user_info: AuthorizedUserInfo,
        /,
        *,
        action: ActionType | None = None,
        job_db: JobDB | None = None,
        job_ids: list[int] | None = None,
    ):
        """Authorise ``action`` for the user, returning ``None`` when allowed.

        ``action`` and ``job_db`` are mandatory. ``job_ids`` must be left to
        ``None`` for CREATE and QUERY, and must be given for the other actions
        when the user is neither a pilot doing a MANAGE nor an administrator.

        Raises an ``HTTPException`` with a 403 status when the user lacks the
        required property, or when the given jobs are not all found as
        belonging to the user (same owner and VO). Raises
        ``NotImplementedError`` when ``job_ids`` is inconsistent with
        ``action``.
        """
        assert action, "action is a mandatory parameter"
        assert job_db, "job_db is a mandatory parameter"

        if action == ActionType.PILOT:
            # TODO: For now we map this to MANAGE but it should be changed once
            # we have pilot credentials
            action = ActionType.MANAGE

        if action == ActionType.CREATE:
            if job_ids is not None:
                raise NotImplementedError(
                    "job_ids is not None with ActionType.CREATE. This shouldn't happen"
                )
            # Only a NORMAL_USER can create, even an administrator cannot
            if NORMAL_USER not in user_info.properties:
                raise HTTPException(status.HTTP_403_FORBIDDEN)
            return

        if GENERIC_PILOT in user_info.properties and action == ActionType.MANAGE:
            # Authorize pilots
            return

        if JOB_ADMINISTRATOR in user_info.properties:
            return

        if NORMAL_USER not in user_info.properties:
            raise HTTPException(status.HTTP_403_FORBIDDEN)

        if action == ActionType.QUERY:
            if job_ids is not None:
                raise NotImplementedError(
                    "job_ids is not None with ActionType.QUERY. This shouldn't happen"
                )
            return

        if job_ids is None:
            raise NotImplementedError("job_ids is None. This shouldn't happen")

        # TODO: check the CS global job monitoring flag

        # Now we know we are either in READ/MANAGE for a NORMAL_USER
        # so just make sure that whatever job_id was given belongs
        # to the current user
        job_owners = await job_db.summary(
            ["Owner", "VO"],
            [
                {
                    "parameter": "JobID",
                    "operator": VectorSearchOperator.IN,
                    "values": job_ids,
                }
            ],
        )

        # A single (Owner, VO) group counting every distinct requested job
        expected_owner = {
            "Owner": user_info.preferred_username,
            "VO": user_info.vo,
            "count": len(set(job_ids)),
        }
        # All the jobs belong to the user doing the query
        # and all of them are present
        if job_owners == [expected_owner]:
            return

        raise HTTPException(status.HTTP_403_FORBIDDEN)

Functions

policy(policy_name, user_info, /, *, action=None, job_db=None, job_ids=None) async staticmethod
Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
@staticmethod
async def policy(
    policy_name: str,
    user_info: AuthorizedUserInfo,
    /,
    *,
    action: ActionType | None = None,
    job_db: JobDB | None = None,
    job_ids: list[int] | None = None,
):
    """Authorise ``action`` for the user, returning ``None`` when allowed.

    ``action`` and ``job_db`` are mandatory. ``job_ids`` must be left to
    ``None`` for CREATE and QUERY, and must be given for the other actions
    when the user is neither a pilot doing a MANAGE nor an administrator.

    Raises an ``HTTPException`` with a 403 status when the user lacks the
    required property, or when the given jobs are not all found as belonging
    to the user (same owner and VO). Raises ``NotImplementedError`` when
    ``job_ids`` is inconsistent with ``action``.
    """
    assert action, "action is a mandatory parameter"
    assert job_db, "job_db is a mandatory parameter"

    if action == ActionType.PILOT:
        # TODO: For now we map this to MANAGE but it should be changed once
        # we have pilot credentials
        action = ActionType.MANAGE

    if action == ActionType.CREATE:
        if job_ids is not None:
            raise NotImplementedError(
                "job_ids is not None with ActionType.CREATE. This shouldn't happen"
            )
        # Only a NORMAL_USER can create, even an administrator cannot
        if NORMAL_USER not in user_info.properties:
            raise HTTPException(status.HTTP_403_FORBIDDEN)
        return

    if GENERIC_PILOT in user_info.properties and action == ActionType.MANAGE:
        # Authorize pilots
        return

    if JOB_ADMINISTRATOR in user_info.properties:
        return

    if NORMAL_USER not in user_info.properties:
        raise HTTPException(status.HTTP_403_FORBIDDEN)

    if action == ActionType.QUERY:
        if job_ids is not None:
            raise NotImplementedError(
                "job_ids is not None with ActionType.QUERY. This shouldn't happen"
            )
        return

    if job_ids is None:
        raise NotImplementedError("job_ids is None. This shouldn't happen")

    # TODO: check the CS global job monitoring flag

    # Now we know we are either in READ/MANAGE for a NORMAL_USER
    # so just make sure that whatever job_id was given belongs
    # to the current user
    job_owners = await job_db.summary(
        ["Owner", "VO"],
        [
            {
                "parameter": "JobID",
                "operator": VectorSearchOperator.IN,
                "values": job_ids,
            }
        ],
    )

    # A single (Owner, VO) group counting every distinct requested job
    expected_owner = {
        "Owner": user_info.preferred_username,
        "VO": user_info.vo,
        "count": len(set(job_ids)),
    }
    # All the jobs belong to the user doing the query
    # and all of them are present
    if job_owners == [expected_owner]:
        return

    raise HTTPException(status.HTTP_403_FORBIDDEN)

SandboxAccessPolicy

Bases: BaseAccessPolicy

Policy for the sandbox. They are similar to the WMS access policies.

Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
class SandboxAccessPolicy(BaseAccessPolicy):
    """Policy for the sandbox.
    They are similar to the WMS access policies.
    """

    @staticmethod
    async def policy(
        policy_name: str,
        user_info: AuthorizedUserInfo,
        /,
        *,
        action: ActionType | None = None,
        sandbox_metadata_db: SandboxMetadataDB | None = None,
        pfns: list[str] | None = None,
        required_prefix: str | None = None,
        se_name: str | None = None,
    ):
        """Authorise ``action`` for the user, returning ``None`` when allowed.

        ``action`` and ``sandbox_metadata_db`` are mandatory. When ``pfns`` is
        given (and the user is not an administrator), ``required_prefix`` and
        ``se_name`` are mandatory too: every PFN must start with
        ``required_prefix`` and be owned by the user.

        Raises an ``HTTPException`` with a 403 status when the user lacks the
        required property, when a PFN does not start with the prefix, or when
        the user does not own a sandbox. Raises ``NotImplementedError`` when
        ``required_prefix`` or ``se_name`` is missing.
        """
        assert action, "action is a mandatory parameter"
        assert sandbox_metadata_db, "sandbox_metadata_db is a mandatory parameter"

        if action == ActionType.CREATE:
            # Only a NORMAL_USER can create, even an administrator cannot
            if NORMAL_USER not in user_info.properties:
                raise HTTPException(status.HTTP_403_FORBIDDEN)
            return

        if JOB_ADMINISTRATOR in user_info.properties:
            return

        if NORMAL_USER not in user_info.properties:
            raise HTTPException(status.HTTP_403_FORBIDDEN)

        # Getting a sandbox or modifying it
        # NOTE(review): when no PFN is given, a NORMAL_USER is authorised
        # without any ownership check -- confirm this is intended
        if pfns:
            if required_prefix is None:
                raise NotImplementedError(
                    "required_prefix is None. This shouldn't happen"
                )
            if se_name is None:
                raise NotImplementedError("se_name is None. This shouldn't happen")

            # The owner ID only depends on the user, so it is looked up once
            # (after the first prefix check passed) instead of once per PFN.
            # A falsy ID always raises right below, so it is never re-fetched.
            owner_id = None
            for pfn in pfns:
                if not pfn.startswith(required_prefix):
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail=f"Invalid PFN. PFN must start with {required_prefix}",
                    )
                # Checking if the user owns the sandbox
                if owner_id is None:
                    owner_id = await sandbox_metadata_db.get_owner_id(user_info)
                sandbox_owner_id = await sandbox_metadata_db.get_sandbox_owner_id(
                    pfn, se_name
                )
                if not owner_id or owner_id != sandbox_owner_id:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail=f"{user_info.preferred_username} is not the owner of the sandbox",
                    )

Functions

policy(policy_name, user_info, /, *, action=None, sandbox_metadata_db=None, pfns=None, required_prefix=None, se_name=None) async staticmethod
Source code in diracx-routers/src/diracx/routers/jobs/access_policies.py
@staticmethod
async def policy(
    policy_name: str,
    user_info: AuthorizedUserInfo,
    /,
    *,
    action: ActionType | None = None,
    sandbox_metadata_db: SandboxMetadataDB | None = None,
    pfns: list[str] | None = None,
    required_prefix: str | None = None,
    se_name: str | None = None,
):
    """Authorise ``action`` for the user, returning ``None`` when allowed.

    ``action`` and ``sandbox_metadata_db`` are mandatory. When ``pfns`` is
    given (and the user is not an administrator), ``required_prefix`` and
    ``se_name`` are mandatory too: every PFN must start with
    ``required_prefix`` and be owned by the user.

    Raises an ``HTTPException`` with a 403 status when the user lacks the
    required property, when a PFN does not start with the prefix, or when the
    user does not own a sandbox. Raises ``NotImplementedError`` when
    ``required_prefix`` or ``se_name`` is missing.
    """
    assert action, "action is a mandatory parameter"
    assert sandbox_metadata_db, "sandbox_metadata_db is a mandatory parameter"

    if action == ActionType.CREATE:
        # Only a NORMAL_USER can create, even an administrator cannot
        if NORMAL_USER not in user_info.properties:
            raise HTTPException(status.HTTP_403_FORBIDDEN)
        return

    if JOB_ADMINISTRATOR in user_info.properties:
        return

    if NORMAL_USER not in user_info.properties:
        raise HTTPException(status.HTTP_403_FORBIDDEN)

    # Getting a sandbox or modifying it
    # NOTE(review): when no PFN is given, a NORMAL_USER is authorised
    # without any ownership check -- confirm this is intended
    if pfns:
        if required_prefix is None:
            raise NotImplementedError(
                "required_prefix is None. This shouldn't happen"
            )
        if se_name is None:
            raise NotImplementedError("se_name is None. This shouldn't happen")

        # The owner ID only depends on the user, so it is looked up once
        # (after the first prefix check passed) instead of once per PFN.
        # A falsy ID always raises right below, so it is never re-fetched.
        owner_id = None
        for pfn in pfns:
            if not pfn.startswith(required_prefix):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Invalid PFN. PFN must start with {required_prefix}",
                )
            # Checking if the user owns the sandbox
            if owner_id is None:
                owner_id = await sandbox_metadata_db.get_owner_id(user_info)
            sandbox_owner_id = await sandbox_metadata_db.get_sandbox_owner_id(
                pfn, se_name
            )
            if not owner_id or owner_id != sandbox_owner_id:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"{user_info.preferred_username} is not the owner of the sandbox",
                )

Legacy