Skip to content

Job Database

Job database models and access layer.

Database Schema

schema

Attributes

JobDBBase = declarative_base() module-attribute

Classes

AccountedFlagEnum

Bases: TypeDecorator

Maps an AccountedFlagEnum() column to True/False (or the string "Failed") in Python.

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class AccountedFlagEnum(types.TypeDecorator):
    """Maps an ``AccountedFlagEnum()`` column to ``True``/``False``/``"Failed"`` in Python.

    The database stores the flag as a three-valued enum (``"True"``,
    ``"False"``, ``"Failed"``); on the Python side the first two become real
    booleans while the third state stays the string ``"Failed"``.
    """

    impl = types.Enum("True", "False", "Failed", name="accounted_flag_enum")
    cache_ok = True

    def process_bind_param(self, value, dialect) -> str | None:
        """Convert a Python-side value to its stored enum label.

        ``None`` is passed through unchanged so NULLs keep working:
        SQLAlchemy hands NULL values to this hook as well.
        """
        if value is None:
            return None
        # Identity checks deliberately reject 1/0 masquerading as booleans.
        if value is True:
            return "True"
        if value is False:
            return "False"
        if value == "Failed":
            return "Failed"
        raise NotImplementedError(value, dialect)

    def process_result_value(self, value, dialect) -> bool | str | None:
        """Convert a stored enum label back to its Python-side value."""
        if value is None:
            # A NULL in the database round-trips as None.
            return None
        if value == "True":
            return True
        if value == "False":
            return False
        if value == "Failed":
            return "Failed"
        raise NotImplementedError(f"Unknown {value=}")
Attributes
impl = types.Enum('True', 'False', 'Failed', name='accounted_flag_enum') class-attribute instance-attribute
cache_ok = True class-attribute instance-attribute
Functions
process_bind_param(value, dialect)
Source code in diracx-db/src/diracx/db/sql/job/schema.py
def process_bind_param(self, value, dialect) -> str:
    """Serialize the Python-side flag into its database enum label.

    ``True``/``False`` are matched by identity (so ``1``/``0`` are
    rejected); the literal string ``"Failed"`` passes through unchanged.
    Any other value is refused.
    """
    # Identity comparison deliberately excludes 1/0 masquerading as booleans.
    for py_value, db_label in ((True, "True"), (False, "False")):
        if value is py_value:
            return db_label
    if value == "Failed":
        return "Failed"
    raise NotImplementedError(value, dialect)
process_result_value(value, dialect)
Source code in diracx-db/src/diracx/db/sql/job/schema.py
def process_result_value(self, value, dialect) -> bool | str:
    if value == "True":
        return True
    elif value == "False":
        return False
    elif value == "Failed":
        return "Failed"
    else:
        raise NotImplementedError(f"Unknown {value=}")

Jobs

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class Jobs(JobDBBase):
    """ORM model for the main ``Jobs`` table: one row of attributes per job."""

    __tablename__ = "Jobs"

    # Primary key; the row is removed automatically when the owning JobJDLs
    # entry is deleted (ON DELETE CASCADE).
    job_id = Column(
        "JobID",
        Integer,
        ForeignKey("JobJDLs.JobID", ondelete="CASCADE"),
        primary_key=True,
        default=0,
    )
    job_type = Column("JobType", String(32), default="user")
    job_group = Column("JobGroup", String(32), default="00000000")
    site = Column("Site", String(100), default="ANY")
    job_name = Column("JobName", String(128), default="Unknown")
    owner = Column("Owner", String(64), default="Unknown")
    owner_group = Column("OwnerGroup", String(128), default="Unknown")
    vo = Column("VO", String(32))
    # Lifecycle timestamps; all nullable until the corresponding event occurs.
    submission_time = NullColumn(
        "SubmissionTime",
        SmarterDateTime(),
    )
    reschedule_time = NullColumn(
        "RescheduleTime",
        SmarterDateTime(),
    )
    last_update_time = NullColumn(
        "LastUpdateTime",
        SmarterDateTime(),
    )
    start_exec_time = NullColumn(
        "StartExecTime",
        SmarterDateTime(),
    )
    heart_beat_time = NullColumn(
        "HeartBeatTime",
        SmarterDateTime(),
    )
    end_exec_time = NullColumn(
        "EndExecTime",
        SmarterDateTime(),
    )
    status = Column("Status", String(32), default="Received")
    minor_status = Column("MinorStatus", String(128), default="Unknown")
    application_status = Column("ApplicationStatus", String(255), default="Unknown")
    user_priority = Column("UserPriority", Integer, default=0)
    reschedule_counter = Column("RescheduleCounter", Integer, default=0)
    # Stored as an enum in the database but exposed as a Python bool.
    verified_flag = Column("VerifiedFlag", EnumBackedBool(), default=False)
    # Three-valued flag: True, False or "Failed" (see AccountedFlagEnum).
    accounted_flag = Column("AccountedFlag", AccountedFlagEnum(), default=False)

    # Secondary indexes for the common job-query filter columns.
    __table_args__ = (
        Index("JobType", "JobType"),
        Index("JobGroup", "JobGroup"),
        Index("Site", "Site"),
        Index("Owner", "Owner"),
        Index("OwnerGroup", "OwnerGroup"),
        Index("Status", "Status"),
        Index("MinorStatus", "MinorStatus"),
        Index("ApplicationStatus", "ApplicationStatus"),
        Index("StatusSite", "Status", "Site"),
        Index("LastUpdateTime", "LastUpdateTime"),
    )
Attributes
job_id = Column('JobID', Integer, ForeignKey('JobJDLs.JobID', ondelete='CASCADE'), primary_key=True, default=0) class-attribute instance-attribute
job_type = Column('JobType', String(32), default='user') class-attribute instance-attribute
job_group = Column('JobGroup', String(32), default='00000000') class-attribute instance-attribute
site = Column('Site', String(100), default='ANY') class-attribute instance-attribute
job_name = Column('JobName', String(128), default='Unknown') class-attribute instance-attribute
owner = Column('Owner', String(64), default='Unknown') class-attribute instance-attribute
owner_group = Column('OwnerGroup', String(128), default='Unknown') class-attribute instance-attribute
vo = Column('VO', String(32)) class-attribute instance-attribute
submission_time = NullColumn('SubmissionTime', SmarterDateTime()) class-attribute instance-attribute
reschedule_time = NullColumn('RescheduleTime', SmarterDateTime()) class-attribute instance-attribute
last_update_time = NullColumn('LastUpdateTime', SmarterDateTime()) class-attribute instance-attribute
start_exec_time = NullColumn('StartExecTime', SmarterDateTime()) class-attribute instance-attribute
heart_beat_time = NullColumn('HeartBeatTime', SmarterDateTime()) class-attribute instance-attribute
end_exec_time = NullColumn('EndExecTime', SmarterDateTime()) class-attribute instance-attribute
status = Column('Status', String(32), default='Received') class-attribute instance-attribute
minor_status = Column('MinorStatus', String(128), default='Unknown') class-attribute instance-attribute
application_status = Column('ApplicationStatus', String(255), default='Unknown') class-attribute instance-attribute
user_priority = Column('UserPriority', Integer, default=0) class-attribute instance-attribute
reschedule_counter = Column('RescheduleCounter', Integer, default=0) class-attribute instance-attribute
verified_flag = Column('VerifiedFlag', EnumBackedBool(), default=False) class-attribute instance-attribute
accounted_flag = Column('AccountedFlag', AccountedFlagEnum(), default=False) class-attribute instance-attribute

JobJDLs

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class JobJDLs(JobDBBase):
    """JDL storage per job: processed, requirements and original JDL text.

    This table allocates the JobID (autoincrement primary key); the Jobs
    table references it with ON DELETE CASCADE.
    """

    __tablename__ = "JobJDLs"
    job_id = Column("JobID", Integer, autoincrement=True, primary_key=True)
    jdl = Column("JDL", Text)
    job_requirements = Column("JobRequirements", Text)
    original_jdl = Column("OriginalJDL", Text)
Attributes
job_id = Column('JobID', Integer, autoincrement=True, primary_key=True) class-attribute instance-attribute
jdl = Column('JDL', Text) class-attribute instance-attribute
job_requirements = Column('JobRequirements', Text) class-attribute instance-attribute
original_jdl = Column('OriginalJDL', Text) class-attribute instance-attribute

InputData

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class InputData(JobDBBase):
    """Input-data LFNs attached to a job; one row per (job, LFN) pair."""

    __tablename__ = "InputData"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    lfn = Column("LFN", String(255), default="", primary_key=True)
    status = Column("Status", String(32), default="AprioriGood")
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
lfn = Column('LFN', String(255), default='', primary_key=True) class-attribute instance-attribute
status = Column('Status', String(32), default='AprioriGood') class-attribute instance-attribute

JobParameters

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class JobParameters(JobDBBase):
    """Free-form (name, value) parameters attached to a job."""

    __tablename__ = "JobParameters"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    name = Column("Name", String(100), primary_key=True)
    value = Column("Value", Text)
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
name = Column('Name', String(100), primary_key=True) class-attribute instance-attribute
value = Column('Value', Text) class-attribute instance-attribute

OptimizerParameters

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class OptimizerParameters(JobDBBase):
    """(name, value) parameters produced by the optimizers for a job."""

    __tablename__ = "OptimizerParameters"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    name = Column("Name", String(100), primary_key=True)
    value = Column("Value", Text)
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
name = Column('Name', String(100), primary_key=True) class-attribute instance-attribute
value = Column('Value', Text) class-attribute instance-attribute

AtticJobParameters

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class AtticJobParameters(JobDBBase):
    """Archived job parameters kept across reschedules.

    RescheduleCycle records which reschedule iteration a value belongs to.
    """

    __tablename__ = "AtticJobParameters"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    name = Column("Name", String(100), primary_key=True)
    value = Column("Value", Text)
    reschedule_cycle = Column("RescheduleCycle", Integer)
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
name = Column('Name', String(100), primary_key=True) class-attribute instance-attribute
value = Column('Value', Text) class-attribute instance-attribute
reschedule_cycle = Column('RescheduleCycle', Integer) class-attribute instance-attribute

HeartBeatLoggingInfo

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class HeartBeatLoggingInfo(JobDBBase):
    """Time series of heartbeat measurements per job.

    Keyed by (JobID, Name, HeartBeatTime) so repeated measurements of the
    same quantity are kept as separate rows.
    """

    __tablename__ = "HeartBeatLoggingInfo"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    name = Column("Name", String(100), primary_key=True)
    value = Column("Value", Text)
    heart_beat_time = Column(
        "HeartBeatTime",
        SmarterDateTime(),
        primary_key=True,
    )
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
name = Column('Name', String(100), primary_key=True) class-attribute instance-attribute
value = Column('Value', Text) class-attribute instance-attribute
heart_beat_time = Column('HeartBeatTime', SmarterDateTime(), primary_key=True) class-attribute instance-attribute

JobCommands

Bases: JobDBBase

Source code in diracx-db/src/diracx/db/sql/job/schema.py
class JobCommands(JobDBBase):
    """Commands queued for delivery to running jobs via the heartbeat.

    Keyed by (JobID, ReceptionTime); Status tracks delivery ("Received"
    until fetched, then updated by the access layer).
    """

    __tablename__ = "JobCommands"
    # Rows are removed automatically when the owning job is deleted.
    job_id = Column(
        "JobID", Integer, ForeignKey("Jobs.JobID", ondelete="CASCADE"), primary_key=True
    )
    command = Column("Command", String(100))
    arguments = Column("Arguments", String(100))
    status = Column("Status", String(64), default="Received")
    reception_time = Column(
        "ReceptionTime",
        SmarterDateTime(),
        primary_key=True,
    )
    execution_time = NullColumn(
        "ExecutionTime",
        SmarterDateTime(),
    )
Attributes
job_id = Column('JobID', Integer, ForeignKey('Jobs.JobID', ondelete='CASCADE'), primary_key=True) class-attribute instance-attribute
command = Column('Command', String(100)) class-attribute instance-attribute
arguments = Column('Arguments', String(100)) class-attribute instance-attribute
status = Column('Status', String(64), default='Received') class-attribute instance-attribute
reception_time = Column('ReceptionTime', SmarterDateTime(), primary_key=True) class-attribute instance-attribute
execution_time = NullColumn('ExecutionTime', SmarterDateTime()) class-attribute instance-attribute

Database Access Layer

db

Attributes

Classes

JobDB

Bases: BaseSQLDB

Source code in diracx-db/src/diracx/db/sql/job/db.py
class JobDB(BaseSQLDB):
    """Asynchronous access layer for the job database tables.

    Wraps the ORM models defined in schema.py (Jobs, JobJDLs, InputData,
    JobCommands, HeartBeatLoggingInfo, ...) behind bulk-oriented async
    methods executed on ``self.conn``.
    """

    metadata = JobDBBase.metadata

    # Field names which should be stored in the HeartBeatLoggingInfo table
    heartbeat_fields = {
        "LoadAverage",
        "MemoryUsed",
        "Vsize",
        "AvailableDiskSpace",
        "CPUConsumed",
        "WallClockTime",
    }

    # TODO: this is copied from the DIRAC JobDB
    # but is overwritten in LHCbDIRAC, so we need
    # to find a way to make it dynamic
    jdl_2_db_parameters = ["JobName", "JobType", "JobGroup"]

    async def summary(
        self, group_by: list[str], search: list[SearchSpec]
    ) -> list[dict[str, str | int]]:
        """Get a summary of jobs aggregated by specified fields.

        Args:
            group_by: List of field names to group results by.
            search: List of search specifications to filter jobs.

        Returns:
            List of dictionaries containing grouped job statistics.

        """
        # Delegates to the generic aggregation helper, scoped to Jobs.
        return await self._summary(table=Jobs, group_by=group_by, search=search)

    async def search(
        self,
        parameters: list[str] | None,
        search: list[SearchSpec],
        sorts: list[SortSpec],
        *,
        distinct: bool = False,
        per_page: int = 100,
        page: int | None = None,
    ) -> tuple[int, list[dict[Any, Any]]]:
        """Search for jobs in the database matching specified criteria.

        Args:
            parameters: List of field names to return, or None for all fields.
            search: List of search specifications to filter jobs.
            sorts: List of sort specifications for result ordering.
            distinct: If True, return only distinct results.
            per_page: Number of results per page.
            page: Page number to return, or None for all results.

        Returns:
            Tuple of (total_count, list of job dictionaries).

        """
        # Delegates to the generic search helper, scoped to Jobs.
        return await self._search(
            table=Jobs,
            parameters=parameters,
            search=search,
            sorts=sorts,
            distinct=distinct,
            per_page=per_page,
            page=page,
        )

    async def create_job(self, compressed_original_jdl: str) -> int:
        """Create a new job with its original JDL.

        Args:
            compressed_original_jdl: The compressed original JDL string.

        Returns:
            The inserted job ID.

        """
        # The processed JDL and requirements start empty and are filled in
        # later (see update_job_jdls).
        result = await self.conn.execute(
            JobJDLs.__table__.insert().values(
                JDL="",
                JobRequirements="",
                OriginalJDL=compressed_original_jdl,
            )
        )
        # Single-row insert, so lastrowid is the newly allocated JobID.
        return result.lastrowid

    async def delete_jobs(self, job_ids: list[int]):
        """Delete jobs and their associated JDLs from the database.

        Args:
            job_ids: List of job IDs to delete.

        """
        # Deleting from JobJDLs cascades to Jobs (and from there to the
        # per-job tables) via the ON DELETE CASCADE foreign keys.
        stmt = delete(JobJDLs).where(JobJDLs.job_id.in_(job_ids))
        await self.conn.execute(stmt)

    async def insert_input_data(self, lfns: dict[int, list[str]]):
        """Insert input data LFNs for jobs.

        Args:
            lfns: Mapping of job IDs to lists of logical file names (LFNs).

        """
        # executemany-style insert: one row per (job, LFN) pair.
        # NOTE(review): an empty `lfns` mapping yields an empty parameter
        # list — presumably callers never pass one; confirm.
        await self.conn.execute(
            InputData.__table__.insert(),
            [
                {
                    "JobID": job_id,
                    "LFN": lfn,
                }
                for job_id, lfns_ in lfns.items()
                for lfn in lfns_
            ],
        )

    async def insert_job_attributes(self, jobs_to_update: dict[int, dict]):
        """Insert job attributes for newly created jobs.

        Args:
            jobs_to_update: Mapping of job IDs to their attribute dictionaries.

        """
        # executemany-style insert: one Jobs row per job ID.
        await self.conn.execute(
            Jobs.__table__.insert(),
            [
                {
                    "JobID": job_id,
                    **attrs,
                }
                for job_id, attrs in jobs_to_update.items()
            ],
        )

    async def update_job_jdls(self, jdls_to_update: dict[int, str]):
        """Update the JDL for existing jobs.

        Typically used just after inserting the original JDL or when rescheduling.

        Args:
            jdls_to_update: Mapping of job IDs to their compressed JDL strings.

        """
        # The bind parameter is prefixed with "b_" so it cannot clash with
        # the JobID column referenced in the same UPDATE statement.
        await self.conn.execute(
            JobJDLs.__table__.update().where(
                JobJDLs.__table__.c.JobID == bindparam("b_JobID")
            ),
            [
                {
                    "b_JobID": job_id,
                    "JDL": compressed_jdl,
                }
                for job_id, compressed_jdl in jdls_to_update.items()
            ],
        )

    async def set_job_attributes(self, job_data: dict[int, dict[str, Any]]) -> None:
        """Update the parameters of the given jobs.

        Automatically updates LastUpdateTime when Status is changed.

        Args:
            job_data: Mapping of job IDs to their attribute dictionaries.

        Raises:
            ValueError: If job_data is empty.

        """
        # TODO: add myDate and force parameters.

        if not job_data:
            # An empty update is almost certainly a caller bug; fail loudly.
            raise ValueError("job_data is empty")

        # Any status change also bumps LastUpdateTime (mutates the caller's
        # dicts in place).
        for job_id in job_data.keys():
            if "Status" in job_data[job_id]:
                job_data[job_id].update(
                    {"LastUpdateTime": datetime.now(tz=timezone.utc)}
                )
        # Union of all columns touched by any job in this batch.
        columns = set(key for attrs in job_data.values() for key in attrs.keys())
        # Build one CASE expression per column so the whole batch can be
        # applied in a single UPDATE statement; jobs that don't set a given
        # column fall through to the else_ branch and keep their value.
        case_expressions = {
            column: case(
                *[
                    (
                        Jobs.__table__.c.JobID == job_id,
                        # Since the setting of the new column value is obscured by the CASE statement,
                        # ensure that SQLAlchemy renders the new column value with the correct type
                        literal(attrs[column], type_=Jobs.__table__.c[column].type)
                        if not isinstance(attrs[column], expression.FunctionElement)
                        else attrs[column],
                    )
                    for job_id, attrs in job_data.items()
                    if column in attrs
                ],
                else_=getattr(Jobs.__table__.c, column),  # Retain original value
            )
            for column in columns
        }

        stmt = (
            Jobs.__table__.update()
            .values(**case_expressions)
            .where(Jobs.__table__.c.JobID.in_(job_data.keys()))
        )
        await self.conn.execute(stmt)

    async def get_job_jdls(
        self, job_ids: Iterable[int], original: bool = False
    ) -> dict[int, str]:
        """Get the JDLs for the given jobs.

        Args:
            job_ids: List of job IDs to retrieve JDLs for.
            original: If True, return the original JDL, otherwise return the processed JDL.

        Returns:
            Mapping of job IDs to their JDL strings.

        """
        if original:
            stmt = select(JobJDLs.job_id, JobJDLs.original_jdl).where(
                JobJDLs.job_id.in_(job_ids)
            )
        else:
            stmt = select(JobJDLs.job_id, JobJDLs.jdl).where(
                JobJDLs.job_id.in_(job_ids)
            )

        # Jobs whose JDL is empty or NULL are omitted from the result.
        return {jobid: jdl for jobid, jdl in (await self.conn.execute(stmt)) if jdl}

    async def set_job_commands(self, commands: list[tuple[int, str, str]]) -> None:
        """Store commands to be passed to jobs with the next heartbeat.

        Args:
            commands: List of tuples containing (job_id, command, arguments).

        """
        await self.conn.execute(
            JobCommands.__table__.insert(),
            [
                {
                    "JobID": job_id,
                    "Command": command,
                    "Arguments": arguments,
                    "ReceptionTime": datetime.now(tz=timezone.utc),
                }
                for job_id, command, arguments in commands
            ],
        )

    async def set_properties(
        self, properties: dict[int, dict[str, Any]], update_timestamp: bool = False
    ) -> int:
        """Update job properties in bulk.

        All jobs must update the same set of properties.

        Args:
            properties: Mapping of job IDs to property dictionaries.
                Example: {job_id: {prop1: val1, prop2: val2}}.
            update_timestamp: If True, update the LastUpdateTime to now.

        Returns:
            Number of rows updated.

        Raises:
            NotImplementedError: If jobs attempt to update different sets of properties.

        """
        # Check that all jobs update the same set of properties (required
        # for a single executemany UPDATE statement).
        required_parameters_set = {tuple(sorted(k.keys())) for k in properties.values()}

        if len(required_parameters_set) != 1:
            raise NotImplementedError(
                "All the jobs should update the same set of properties"
            )

        required_parameters = list(required_parameters_set)[0]
        update_parameters = [{"job_id": k, **v} for k, v in properties.items()]

        columns = _get_columns(Jobs.__table__, required_parameters)
        values: dict[str, BindParameter[Any] | datetime] = {
            c.name: bindparam(c.name) for c in columns
        }
        if update_timestamp:
            values["LastUpdateTime"] = datetime.now(tz=timezone.utc)

        stmt = update(Jobs).where(Jobs.job_id == bindparam("job_id")).values(**values)
        rows = await self.conn.execute(stmt, update_parameters)

        return rows.rowcount

    async def add_heartbeat_data(
        self, job_id: int, dynamic_data: dict[str, str]
    ) -> None:
        """Add the job's heartbeat data to the database.

        Note:
            This does not update the HeartBeatTime column in the Jobs table.
            This is instead handled by diracx.logic.jobs.status.set_job_statuses
            as it involves updating multiple databases.

        Args:
            job_id: The job ID.
            dynamic_data: Mapping of dynamic data to store.
                Example: {"AvailableDiskSpace": "123"}.

        Raises:
            InvalidQueryError: If dynamic_data contains fields not in heartbeat_fields.

        """
        # Whitelist check: only the known heartbeat fields may be stored.
        if extra_fields := set(dynamic_data) - self.heartbeat_fields:
            raise InvalidQueryError(
                f"Not allowed to store heartbeat data for: {extra_fields}. "
                f"Allowed keys are: {self.heartbeat_fields}"
            )
        # utcnow() presumably renders a server-side UTC timestamp — confirm.
        values = [
            {
                "JobID": job_id,
                "Name": key,
                "Value": value,
                "HeartBeatTime": utcnow(),
            }
            for key, value in dynamic_data.items()
        ]
        await self.conn.execute(HeartBeatLoggingInfo.__table__.insert().values(values))

    async def get_job_commands(self, job_ids: Iterable[int]) -> list[JobCommand]:
        """Get commands to be passed to jobs with the next heartbeat.

        Commands are marked as "Sent" after retrieval.

        Args:
            job_ids: The job IDs to get commands for.

        Returns:
            List of JobCommand objects containing job_id, command, and arguments.

        """
        # Get the commands
        stmt = (
            select(JobCommands.job_id, JobCommands.command, JobCommands.arguments)
            .where(JobCommands.job_id.in_(job_ids), JobCommands.status == "Received")
            .order_by(JobCommands.job_id)
        )
        commands = await self.conn.execute(stmt)
        # Update the status of the commands
        # NOTE(review): this marks every command for these jobs as "Sent",
        # not only the "Received" ones fetched above — confirm intentional.
        stmt = (
            update(JobCommands)
            .where(JobCommands.job_id.in_(job_ids))
            .values(Status="Sent")
        )
        await self.conn.execute(stmt)
        # Return the commands grouped by job id
        return [
            JobCommand(job_id=cmd.JobID, command=cmd.Command, arguments=cmd.Arguments)
            for cmd in commands
        ]
Attributes
metadata = JobDBBase.metadata class-attribute instance-attribute
heartbeat_fields = {'LoadAverage', 'MemoryUsed', 'Vsize', 'AvailableDiskSpace', 'CPUConsumed', 'WallClockTime'} class-attribute instance-attribute
jdl_2_db_parameters = ['JobName', 'JobType', 'JobGroup'] class-attribute instance-attribute
Functions
summary(group_by, search) async

Get a summary of jobs aggregated by specified fields.

Parameters:

Name Type Description Default
group_by list[str]

List of field names to group results by.

required
search list[SearchSpec]

List of search specifications to filter jobs.

required

Returns:

Type Description
list[dict[str, str | int]]

List of dictionaries containing grouped job statistics.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def summary(
    self, group_by: list[str], search: list[SearchSpec]
) -> list[dict[str, str | int]]:
    """Get a summary of jobs aggregated by specified fields.

    Args:
        group_by: List of field names to group results by.
        search: List of search specifications to filter jobs.

    Returns:
        List of dictionaries containing grouped job statistics.

    """
    # Delegates to the generic aggregation helper, scoped to the Jobs table.
    return await self._summary(table=Jobs, group_by=group_by, search=search)
search(parameters, search, sorts, *, distinct=False, per_page=100, page=None) async

Search for jobs in the database matching specified criteria.

Parameters:

Name Type Description Default
parameters list[str] | None

List of field names to return, or None for all fields.

required
search list[SearchSpec]

List of search specifications to filter jobs.

required
sorts list[SortSpec]

List of sort specifications for result ordering.

required
distinct bool

If True, return only distinct results.

False
per_page int

Number of results per page.

100
page int | None

Page number to return, or None for all results.

None

Returns:

Type Description
tuple[int, list[dict[Any, Any]]]

Tuple of (total_count, list of job dictionaries).

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def search(
    self,
    parameters: list[str] | None,
    search: list[SearchSpec],
    sorts: list[SortSpec],
    *,
    distinct: bool = False,
    per_page: int = 100,
    page: int | None = None,
) -> tuple[int, list[dict[Any, Any]]]:
    """Search for jobs in the database matching specified criteria.

    Args:
        parameters: List of field names to return, or None for all fields.
        search: List of search specifications to filter jobs.
        sorts: List of sort specifications for result ordering.
        distinct: If True, return only distinct results.
        per_page: Number of results per page.
        page: Page number to return, or None for all results.

    Returns:
        Tuple of (total_count, list of job dictionaries).

    """
    # Delegates to the generic search helper, scoped to the Jobs table.
    return await self._search(
        table=Jobs,
        parameters=parameters,
        search=search,
        sorts=sorts,
        distinct=distinct,
        per_page=per_page,
        page=page,
    )
create_job(compressed_original_jdl) async

Create a new job with its original JDL.

Parameters:

Name Type Description Default
compressed_original_jdl str

The compressed original JDL string.

required

Returns:

Type Description
int

The inserted job ID.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def create_job(self, compressed_original_jdl: str) -> int:
    """Create a new job with its original JDL.

    Args:
        compressed_original_jdl: The compressed original JDL string.

    Returns:
        The inserted job ID.

    """
    result = await self.conn.execute(
        JobJDLs.__table__.insert().values(
            JDL="",
            JobRequirements="",
            OriginalJDL=compressed_original_jdl,
        )
    )
    return result.lastrowid
delete_jobs(job_ids) async

Delete jobs and their associated JDLs from the database.

Parameters:

Name Type Description Default
job_ids list[int]

List of job IDs to delete.

required
Source code in diracx-db/src/diracx/db/sql/job/db.py
async def delete_jobs(self, job_ids: list[int]):
    """Delete jobs and their associated JDLs from the database.

    Args:
        job_ids: List of job IDs to delete.

    """
    stmt = delete(JobJDLs).where(JobJDLs.job_id.in_(job_ids))
    await self.conn.execute(stmt)
insert_input_data(lfns) async

Insert input data LFNs for jobs.

Parameters:

Name Type Description Default
lfns dict[int, list[str]]

Mapping of job IDs to lists of logical file names (LFNs).

required
Source code in diracx-db/src/diracx/db/sql/job/db.py
async def insert_input_data(self, lfns: dict[int, list[str]]):
    """Insert input data LFNs for jobs.

    Args:
        lfns: Mapping of job IDs to lists of logical file names (LFNs).

    """
    await self.conn.execute(
        InputData.__table__.insert(),
        [
            {
                "JobID": job_id,
                "LFN": lfn,
            }
            for job_id, lfns_ in lfns.items()
            for lfn in lfns_
        ],
    )
insert_job_attributes(jobs_to_update) async

Insert job attributes for newly created jobs.

Parameters:

Name Type Description Default
jobs_to_update dict[int, dict]

Mapping of job IDs to their attribute dictionaries.

required
Source code in diracx-db/src/diracx/db/sql/job/db.py
async def insert_job_attributes(self, jobs_to_update: dict[int, dict]):
    """Insert job attributes for newly created jobs.

    Args:
        jobs_to_update: Mapping of job IDs to their attribute dictionaries.

    """
    await self.conn.execute(
        Jobs.__table__.insert(),
        [
            {
                "JobID": job_id,
                **attrs,
            }
            for job_id, attrs in jobs_to_update.items()
        ],
    )
update_job_jdls(jdls_to_update) async

Update the JDL for existing jobs.

Typically used just after inserting the original JDL or when rescheduling.

Parameters:

Name Type Description Default
jdls_to_update dict[int, str]

Mapping of job IDs to their compressed JDL strings.

required
Source code in diracx-db/src/diracx/db/sql/job/db.py
async def update_job_jdls(self, jdls_to_update: dict[int, str]):
    """Update the JDL for existing jobs.

    Typically used just after inserting the original JDL or when rescheduling.

    Args:
        jdls_to_update: Mapping of job IDs to their compressed JDL strings.

    """
    await self.conn.execute(
        JobJDLs.__table__.update().where(
            JobJDLs.__table__.c.JobID == bindparam("b_JobID")
        ),
        [
            {
                "b_JobID": job_id,
                "JDL": compressed_jdl,
            }
            for job_id, compressed_jdl in jdls_to_update.items()
        ],
    )
set_job_attributes(job_data) async

Update the parameters of the given jobs.

Automatically updates LastUpdateTime when Status is changed.

Parameters:

Name Type Description Default
job_data dict[int, dict[str, Any]]

Mapping of job IDs to their attribute dictionaries.

required

Raises:

Type Description
ValueError

If job_data is empty.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def set_job_attributes(self, job_data: dict[int, dict[str, Any]]) -> None:
    """Update the parameters of the given jobs.

    Automatically updates LastUpdateTime when Status is changed.

    Args:
        job_data: Mapping of job IDs to their attribute dictionaries.

    Raises:
        ValueError: If job_data is empty.

    """
    # TODO: add myDate and force parameters.

    if not job_data:
        # nothing to do!
        raise ValueError("job_data is empty")

    # Inject LastUpdateTime alongside a Status change into a local copy
    # instead of mutating the caller's dictionaries in place (the previous
    # implementation modified job_data as a side effect).
    now = datetime.now(tz=timezone.utc)
    job_data = {
        job_id: {**attrs, "LastUpdateTime": now} if "Status" in attrs else attrs
        for job_id, attrs in job_data.items()
    }

    columns = {key for attrs in job_data.values() for key in attrs}
    case_expressions = {
        column: case(
            *[
                (
                    Jobs.__table__.c.JobID == job_id,
                    # Since the setting of the new column value is obscured by the CASE statement,
                    # ensure that SQLAlchemy renders the new column value with the correct type
                    literal(attrs[column], type_=Jobs.__table__.c[column].type)
                    if not isinstance(attrs[column], expression.FunctionElement)
                    else attrs[column],
                )
                for job_id, attrs in job_data.items()
                if column in attrs
            ],
            else_=getattr(Jobs.__table__.c, column),  # Retain original value
        )
        for column in columns
    }

    stmt = (
        Jobs.__table__.update()
        .values(**case_expressions)
        .where(Jobs.__table__.c.JobID.in_(job_data.keys()))
    )
    await self.conn.execute(stmt)
get_job_jdls(job_ids, original=False) async

Get the JDLs for the given jobs.

Parameters:

Name Type Description Default
job_ids Iterable[int]

List of job IDs to retrieve JDLs for.

required
original bool

If True, return the original JDL, otherwise return the processed JDL.

False

Returns:

Type Description
dict[int, str]

Mapping of job IDs to their JDL strings.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def get_job_jdls(
    self, job_ids: Iterable[int], original: bool = False
) -> dict[int, str]:
    """Get the JDLs for the given jobs.

    Args:
        job_ids: List of job IDs to retrieve JDLs for.
        original: If True, return the original JDL, otherwise return the processed JDL.

    Returns:
        Mapping of job IDs to their JDL strings.

    """
    # Pick the column up front so a single SELECT covers both variants.
    jdl_column = JobJDLs.original_jdl if original else JobJDLs.jdl
    stmt = select(JobJDLs.job_id, jdl_column).where(JobJDLs.job_id.in_(job_ids))
    rows = await self.conn.execute(stmt)
    # Empty/NULL JDLs are filtered out of the result.
    return {job_id: jdl for job_id, jdl in rows if jdl}
set_job_commands(commands) async

Store commands to be passed to jobs with the next heartbeat.

Parameters:

Name Type Description Default
commands list[tuple[int, str, str]]

List of tuples containing (job_id, command, arguments).

required
Source code in diracx-db/src/diracx/db/sql/job/db.py
async def set_job_commands(self, commands: list[tuple[int, str, str]]) -> None:
    """Store commands to be passed to jobs with the next heartbeat.

    Args:
        commands: List of tuples containing (job_id, command, arguments).

    """
    rows = [
        {
            "JobID": job_id,
            "Command": command,
            "Arguments": arguments,
            # Timestamp recorded per row, matching the insert-time semantics.
            "ReceptionTime": datetime.now(tz=timezone.utc),
        }
        for job_id, command, arguments in commands
    ]
    await self.conn.execute(JobCommands.__table__.insert(), rows)
set_properties(properties, update_timestamp=False) async

Update job properties in bulk.

All jobs must update the same set of properties.

Parameters:

Name Type Description Default
properties dict[int, dict[str, Any]]

Mapping of job IDs to property dictionaries. Example: {job_id: {prop1: val1, prop2: val2}}.

required
update_timestamp bool

If True, update the LastUpdateTime to now.

False

Returns:

Type Description
int

Number of rows updated.

Raises:

Type Description
NotImplementedError

If jobs attempt to update different sets of properties.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def set_properties(
    self, properties: dict[int, dict[str, Any]], update_timestamp: bool = False
) -> int:
    """Update job properties in bulk.

    All jobs must update the same set of properties.

    Args:
        properties: Mapping of job IDs to property dictionaries.
            Example: {job_id: {prop1: val1, prop2: val2}}.
        update_timestamp: If True, update the LastUpdateTime to now.

    Returns:
        Number of rows updated.

    Raises:
        NotImplementedError: If jobs attempt to update different sets of properties.

    """
    # Every job must touch an identical set of columns, otherwise a single
    # bulk UPDATE statement cannot be shared across all of them.
    key_sets = {tuple(sorted(attrs)) for attrs in properties.values()}
    if len(key_sets) != 1:
        raise NotImplementedError(
            "All the jobs should update the same set of properties"
        )

    # Exactly one element is guaranteed at this point.
    (required_parameters,) = key_sets
    rows = [{"job_id": job_id, **attrs} for job_id, attrs in properties.items()]

    columns = _get_columns(Jobs.__table__, required_parameters)
    values: dict[str, BindParameter[Any] | datetime] = {
        col.name: bindparam(col.name) for col in columns
    }
    if update_timestamp:
        values["LastUpdateTime"] = datetime.now(tz=timezone.utc)

    stmt = update(Jobs).where(Jobs.job_id == bindparam("job_id")).values(**values)
    result = await self.conn.execute(stmt, rows)

    return result.rowcount
add_heartbeat_data(job_id, dynamic_data) async

Add the job's heartbeat data to the database.

Note

This does not update the HeartBeatTime column in the Jobs table. This is instead handled by diracx.logic.jobs.status.set_job_statuses as it involves updating multiple databases.

Parameters:

Name Type Description Default
job_id int

The job ID.

required
dynamic_data dict[str, str]

Mapping of dynamic data to store. Example: {"AvailableDiskSpace": "123"}.

required

Raises:

Type Description
InvalidQueryError

If dynamic_data contains fields not in heartbeat_fields.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def add_heartbeat_data(
    self, job_id: int, dynamic_data: dict[str, str]
) -> None:
    """Add the job's heartbeat data to the database.

    Note:
        This does not update the HeartBeatTime column in the Jobs table.
        This is instead handled by diracx.logic.jobs.status.set_job_statuses
        as it involves updating multiple databases.

    Args:
        job_id: The job ID.
        dynamic_data: Mapping of dynamic data to store.
            Example: {"AvailableDiskSpace": "123"}.

    Raises:
        InvalidQueryError: If dynamic_data contains fields not in heartbeat_fields.

    """
    # Reject any keys outside the allowed heartbeat field set up front.
    unknown_fields = set(dynamic_data) - self.heartbeat_fields
    if unknown_fields:
        raise InvalidQueryError(
            f"Not allowed to store heartbeat data for: {unknown_fields}. "
            f"Allowed keys are: {self.heartbeat_fields}"
        )
    rows = []
    for name, value in dynamic_data.items():
        rows.append(
            {
                "JobID": job_id,
                "Name": name,
                "Value": value,
                "HeartBeatTime": utcnow(),
            }
        )
    await self.conn.execute(HeartBeatLoggingInfo.__table__.insert().values(rows))
get_job_commands(job_ids) async

Get commands to be passed to jobs with the next heartbeat.

Commands are marked as "Sent" after retrieval.

Parameters:

Name Type Description Default
job_ids Iterable[int]

The job IDs to get commands for.

required

Returns:

Type Description
list[JobCommand]

List of JobCommand objects containing job_id, command, and arguments.

Source code in diracx-db/src/diracx/db/sql/job/db.py
async def get_job_commands(self, job_ids: Iterable[int]) -> list[JobCommand]:
    """Get commands to be passed to jobs with the next heartbeat.

    Commands are marked as "Sent" after retrieval.

    Args:
        job_ids: The job IDs to get commands for.

    Returns:
        List of JobCommand objects containing job_id, command, and arguments.

    """
    # Materialize the iterable: it is used in two separate statements below,
    # so a generator argument would be exhausted after the first one.
    job_ids = list(job_ids)
    # Get the pending commands
    stmt = (
        select(JobCommands.job_id, JobCommands.command, JobCommands.arguments)
        .where(JobCommands.job_id.in_(job_ids), JobCommands.status == "Received")
        .order_by(JobCommands.job_id)
    )
    commands = await self.conn.execute(stmt)
    # Mark only "Received" commands as sent, matching the SELECT above;
    # rows in other states are left untouched.
    # NOTE(review): a command inserted between the two statements can still be
    # marked "Sent" without having been returned — confirm if this race matters.
    stmt = (
        update(JobCommands)
        .where(JobCommands.job_id.in_(job_ids), JobCommands.status == "Received")
        .values(Status="Sent")
    )
    await self.conn.execute(stmt)
    # Return the commands grouped by job id
    return [
        JobCommand(job_id=cmd.JobID, command=cmd.Command, arguments=cmd.Arguments)
        for cmd in commands
    ]