Skip to content

Job Logging Database

Job logging and history database.

Database Schema

schema

Attributes

JobLoggingDBBase = declarative_base() module-attribute

Classes

MagicEpochDateTime

Bases: TypeDecorator

A SQLAlchemy type that stores a datetime as a numeric value representing the seconds elapsed since MAGIC_EPOC_NUMBER. The underlying column is defined as Numeric(12,3) which provides a fixed-precision representation.

Source code in diracx-db/src/diracx/db/sql/job_logging/schema.py
class MagicEpochDateTime(TypeDecorator):
    """A SQLAlchemy type that stores a datetime as a numeric value representing the
    seconds elapsed since MAGIC_EPOC_NUMBER. The underlying column is defined as
    Numeric(12,3) which provides a fixed-precision representation.
    """

    # Numeric(12, 3) keeps three decimal places, i.e. millisecond resolution.
    impl = Numeric(12, 3)
    # The type is deterministic, so SQLAlchemy may cache statements that use it.
    cache_ok = True

    # Offset subtracted from the Unix timestamp before storage;
    # 1270000000 corresponds to 2010-03-31T01:46:40Z.
    MAGIC_EPOC_NUMBER = 1270000000

    def process_bind_param(self, value, dialect):
        """Convert a Python datetime to a numeric value: (timestamp - MAGIC_EPOC_NUMBER).
        The result is rounded to three decimal places.

        ``None`` passes through unchanged; any other non-datetime raises ValueError.
        """
        if value is None:
            return None
        if isinstance(value, datetime):
            # Convert datetime to seconds since the Unix epoch, subtract our magic epoch,
            # and round to three decimal places.
            # NOTE(review): replace(tzinfo=UTC) treats naive datetimes as UTC but
            # *reinterprets* aware non-UTC datetimes instead of converting them —
            # presumably callers always pass UTC values; confirm.
            epoch_seconds = (
                value.replace(tzinfo=UTC).timestamp() - self.MAGIC_EPOC_NUMBER
            )
            return round(epoch_seconds, 3)
        raise ValueError(
            "Expected a datetime object for MagicEpochDateTime bind parameter."
        )

    def process_result_value(self, value, dialect):
        """Convert the numeric database value back into a Python datetime by reversing the
        stored difference (adding MAGIC_EPOC_NUMBER).
        """
        if value is None:
            return None
        # Carefully convert from Decimal to datetime to avoid losing precision
        value += self.MAGIC_EPOC_NUMBER
        # Split into whole seconds (handled by fromtimestamp) and the fractional
        # part (re-attached as microseconds) so the Decimal fraction stays exact.
        value_int = int(value)
        result = datetime.fromtimestamp(value_int, tz=UTC)
        return result.replace(microsecond=int((value - value_int) * 1_000_000))
Attributes
impl = Numeric(12, 3) class-attribute instance-attribute
cache_ok = True class-attribute instance-attribute
MAGIC_EPOC_NUMBER = 1270000000 class-attribute instance-attribute
Functions
process_bind_param(value, dialect)

Convert a Python datetime to a numeric value: (timestamp - MAGIC_EPOC_NUMBER). The result is rounded to three decimal places.

Source code in diracx-db/src/diracx/db/sql/job_logging/schema.py
def process_bind_param(self, value, dialect):
    """Convert a Python datetime to a numeric value: (timestamp - MAGIC_EPOC_NUMBER).
    The result is rounded to three decimal places.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        # Convert datetime to seconds since the Unix epoch, subtract our magic epoch,
        # and round to three decimal places.
        epoch_seconds = (
            value.replace(tzinfo=UTC).timestamp() - self.MAGIC_EPOC_NUMBER
        )
        return round(epoch_seconds, 3)
    raise ValueError(
        "Expected a datetime object for MagicEpochDateTime bind parameter."
    )
process_result_value(value, dialect)

Convert the numeric database value back into a Python datetime by reversing the stored difference (adding MAGIC_EPOC_NUMBER).

Source code in diracx-db/src/diracx/db/sql/job_logging/schema.py
def process_result_value(self, value, dialect):
    """Convert the numeric database value back into a Python datetime by reversing the
    stored difference (adding MAGIC_EPOC_NUMBER).
    """
    if value is None:
        return None
    # Carefully convert from Decimal to datetime to avoid losing precision
    value += self.MAGIC_EPOC_NUMBER
    value_int = int(value)
    result = datetime.fromtimestamp(value_int, tz=UTC)
    return result.replace(microsecond=int((value - value_int) * 1_000_000))

LoggingInfo

Bases: JobLoggingDBBase

Source code in diracx-db/src/diracx/db/sql/job_logging/schema.py
class LoggingInfo(JobLoggingDBBase):
    """A single job status-transition record, keyed by (JobID, SeqNum)."""

    __tablename__ = "LoggingInfo"
    # Job this record belongs to (part of the composite primary key).
    job_id = Column("JobID", Integer)
    # Per-job sequence number (part of the composite primary key).
    seq_num = Column("SeqNum", Integer)
    # Major / minor / application status strings for this transition.
    status = Column("Status", String(32), default="")
    minor_status = Column("MinorStatus", String(128), default="")
    application_status = Column("ApplicationStatus", String(255), default="")
    # Wall-clock time of the transition (column default: "now").
    status_time = DateNowColumn("StatusTime")
    # Same instant stored as seconds past the magic epoch, used as the primary
    # ordering key for histories.  NOTE(review): default=0 is not a datetime,
    # so the default would be rejected by MagicEpochDateTime.process_bind_param
    # if it were ever used — presumably every insert supplies a value; confirm.
    status_time_order = Column("StatusTimeOrder", MagicEpochDateTime, default=0)
    # Component that reported the status change.
    source = Column("StatusSource", String(32), default="Unknown")
    __table_args__ = (PrimaryKeyConstraint("JobID", "SeqNum"),)
Attributes
job_id = Column('JobID', Integer) class-attribute instance-attribute
seq_num = Column('SeqNum', Integer) class-attribute instance-attribute
status = Column('Status', String(32), default='') class-attribute instance-attribute
minor_status = Column('MinorStatus', String(128), default='') class-attribute instance-attribute
application_status = Column('ApplicationStatus', String(255), default='') class-attribute instance-attribute
status_time = DateNowColumn('StatusTime') class-attribute instance-attribute
status_time_order = Column('StatusTimeOrder', MagicEpochDateTime, default=0) class-attribute instance-attribute
source = Column('StatusSource', String(32), default='Unknown') class-attribute instance-attribute

Database Access Layer

db

Attributes

Classes

JobLoggingDB

Bases: BaseSQLDB

Frontend for the JobLoggingDB. Provides the ability to store changes with timestamps.

Source code in diracx-db/src/diracx/db/sql/job_logging/db.py
class JobLoggingDB(BaseSQLDB):
    """Frontend for the JobLoggingDB. Provides the ability to store changes with timestamps."""

    metadata = JobLoggingDBBase.metadata

    async def insert_records(
        self,
        records: list[JobLoggingRecord],
    ):
        """Bulk insert entries to the JobLoggingDB table."""
        # First, fetch the maximum SeqNums for the given job_ids
        # (COALESCE(MAX(SeqNum) + 1, 1) yields the next free sequence number).
        seqnum_stmt = (
            select(
                LoggingInfo.job_id, func.coalesce(func.max(LoggingInfo.seq_num) + 1, 1)
            )
            .where(LoggingInfo.job_id.in_([record.job_id for record in records]))
            .group_by(LoggingInfo.job_id)
        )

        seqnums = {
            jid: seqnum for jid, seqnum in (await self.conn.execute(seqnum_stmt))
        }
        # IF a seqnum is not found, then assume it does not exist and the first sequence number is 1.
        # https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-bulk-insert-statements
        # NOTE(review): sequence numbers are assigned client-side, so two
        # concurrent bulk inserts for the same job could collide on the
        # (JobID, SeqNum) primary key — presumably serialized upstream; confirm.
        values = []
        for record in records:
            if record.job_id not in seqnums:
                seqnums[record.job_id] = 1

            values.append(
                {
                    "JobID": record.job_id,
                    "SeqNum": seqnums[record.job_id],
                    "Status": record.status,
                    "MinorStatus": record.minor_status,
                    # Truncated to the schema's String(255)/String(32) widths.
                    "ApplicationStatus": record.application_status[:255],
                    "StatusTime": record.date,
                    "StatusTimeOrder": record.date,
                    "StatusSource": record.source[:32],
                }
            )
            # Consecutive records for the same job get consecutive numbers.
            seqnums[record.job_id] = seqnums[record.job_id] + 1

        await self.conn.execute(
            LoggingInfo.__table__.insert(),
            values,
        )

    async def get_records(self, job_ids: list[int]) -> dict[int, list[JobStatusReturn]]:
        """Returns a Status,MinorStatus,ApplicationStatus,StatusTime,Source tuple
        for each record found for job specified by its jobID in historical order.
        """
        # We could potentially use a group_by here, but we need to post-process the
        # results later.
        stmt = (
            select(
                LoggingInfo.job_id,
                LoggingInfo.status,
                LoggingInfo.minor_status,
                LoggingInfo.application_status,
                LoggingInfo.status_time,
                LoggingInfo.source,
            )
            .where(LoggingInfo.job_id.in_(job_ids))
            .order_by(LoggingInfo.status_time_order, LoggingInfo.status_time)
        )
        rows = await self.conn.execute(stmt)

        values = defaultdict(list)
        for (
            job_id,
            status,
            minor_status,
            application_status,
            status_time,
            status_source,
        ) in rows:
            values[job_id].append(
                [
                    status,
                    minor_status,
                    application_status,
                    # Timestamps come back naive; tag them as UTC on the way out.
                    status_time.replace(tzinfo=timezone.utc),
                    status_source,
                ]
            )

        # If no value has been set for the application status in the first place,
        # We put this status to unknown
        res: dict = defaultdict(list)
        for job_id, history in values.items():
            if history[0][2] == "idem":
                history[0][2] = "Unknown"

            # We replace "idem" values by the value previously stated
            for i in range(1, len(history)):
                for j in range(3):
                    if history[i][j] == "idem":
                        history[i][j] = history[i - 1][j]

            # And we replace arrays with tuples
            for (
                status,
                minor_status,
                application_status,
                status_time,
                status_source,
            ) in history:
                res[job_id].append(
                    JobStatusReturn(
                        Status=status,
                        MinorStatus=minor_status,
                        ApplicationStatus=application_status,
                        StatusTime=status_time,
                        Source=status_source,
                    )
                )

        return res

    async def delete_records(self, job_ids: list[int]):
        """Delete logging records for given jobs."""
        stmt = delete(LoggingInfo).where(LoggingInfo.job_id.in_(job_ids))
        await self.conn.execute(stmt)

    async def get_wms_time_stamps(
        self, job_ids: Iterable[int]
    ) -> dict[int, dict[str, datetime]]:
        """Get TimeStamps for job MajorState transitions for multiple jobs at once
        return a {JobID: {State:timestamp}} dictionary.
        """
        result: defaultdict[int, dict[str, datetime]] = defaultdict(dict)
        stmt = select(
            LoggingInfo.job_id, LoggingInfo.status, LoggingInfo.status_time_order
        ).where(LoggingInfo.job_id.in_(job_ids))
        # StatusTimeOrder decodes to a datetime via MagicEpochDateTime.
        # NOTE(review): no ORDER BY here, so if a job re-enters a state the
        # kept timestamp is whichever row the database returns last — confirm
        # this is intended.
        for job_id, event, etime in await self.conn.execute(stmt):
            result[job_id][event] = etime
        return dict(result)
Attributes
metadata = JobLoggingDBBase.metadata class-attribute instance-attribute
Functions
insert_records(records) async

Bulk insert entries to the JobLoggingDB table.

Source code in diracx-db/src/diracx/db/sql/job_logging/db.py
async def insert_records(
    self,
    records: list[JobLoggingRecord],
):
    """Bulk insert entries to the JobLoggingDB table.

    Sequence numbers continue from each job's current MAX(SeqNum); jobs with
    no prior records start at 1.  An empty record list is a no-op.
    """
    # Nothing to do — this also avoids issuing an INSERT with an empty
    # executemany parameter list, which some drivers reject.
    if not records:
        return

    # First, fetch the next free SeqNum for every referenced job:
    # COALESCE(MAX(SeqNum) + 1, 1) per job_id.
    seqnum_stmt = (
        select(
            LoggingInfo.job_id, func.coalesce(func.max(LoggingInfo.seq_num) + 1, 1)
        )
        .where(LoggingInfo.job_id.in_([record.job_id for record in records]))
        .group_by(LoggingInfo.job_id)
    )

    seqnums = {
        jid: seqnum for jid, seqnum in (await self.conn.execute(seqnum_stmt))
    }
    # If a seqnum is not found, then assume it does not exist and the first sequence number is 1.
    # https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-bulk-insert-statements
    # NOTE(review): sequence numbers are assigned client-side, so concurrent
    # bulk inserts for the same job could collide on the (JobID, SeqNum)
    # primary key — presumably serialized upstream; confirm.
    values = []
    for record in records:
        if record.job_id not in seqnums:
            seqnums[record.job_id] = 1

        values.append(
            {
                "JobID": record.job_id,
                "SeqNum": seqnums[record.job_id],
                "Status": record.status,
                # Truncate to the schema's column widths; MinorStatus was
                # previously the only bounded column left unguarded.
                "MinorStatus": record.minor_status[:128],
                "ApplicationStatus": record.application_status[:255],
                "StatusTime": record.date,
                "StatusTimeOrder": record.date,
                "StatusSource": record.source[:32],
            }
        )
        # Consecutive records for the same job get consecutive numbers.
        seqnums[record.job_id] += 1

    await self.conn.execute(
        LoggingInfo.__table__.insert(),
        values,
    )
get_records(job_ids) async

Returns a (Status, MinorStatus, ApplicationStatus, StatusTime, Source) tuple for each record found for the job specified by its job ID, in historical order.

Source code in diracx-db/src/diracx/db/sql/job_logging/db.py
async def get_records(self, job_ids: list[int]) -> dict[int, list[JobStatusReturn]]:
    """Returns a Status,MinorStatus,ApplicationStatus,StatusTime,Source tuple
    for each record found for job specified by its jobID in historical order.

    Each job ID maps to the *list* of its records — hence the corrected
    ``dict[int, list[JobStatusReturn]]`` return annotation.
    """
    # We could potentially use a group_by here, but we need to post-process the
    # results later.
    stmt = (
        select(
            LoggingInfo.job_id,
            LoggingInfo.status,
            LoggingInfo.minor_status,
            LoggingInfo.application_status,
            LoggingInfo.status_time,
            LoggingInfo.source,
        )
        .where(LoggingInfo.job_id.in_(job_ids))
        .order_by(LoggingInfo.status_time_order, LoggingInfo.status_time)
    )
    rows = await self.conn.execute(stmt)

    values = defaultdict(list)
    for (
        job_id,
        status,
        minor_status,
        application_status,
        status_time,
        status_source,
    ) in rows:
        values[job_id].append(
            [
                status,
                minor_status,
                application_status,
                # Timestamps come back naive; tag them as UTC on the way out.
                status_time.replace(tzinfo=timezone.utc),
                status_source,
            ]
        )

    # If no value has been set for the application status in the first place,
    # We put this status to unknown
    res: dict[int, list[JobStatusReturn]] = defaultdict(list)
    for job_id, history in values.items():
        if history[0][2] == "idem":
            history[0][2] = "Unknown"

        # We replace "idem" values by the value previously stated
        for i in range(1, len(history)):
            for j in range(3):
                if history[i][j] == "idem":
                    history[i][j] = history[i - 1][j]

        # And we replace arrays with tuples
        for (
            status,
            minor_status,
            application_status,
            status_time,
            status_source,
        ) in history:
            res[job_id].append(
                JobStatusReturn(
                    Status=status,
                    MinorStatus=minor_status,
                    ApplicationStatus=application_status,
                    StatusTime=status_time,
                    Source=status_source,
                )
            )

    return res
delete_records(job_ids) async

Delete logging records for given jobs.

Source code in diracx-db/src/diracx/db/sql/job_logging/db.py
async def delete_records(self, job_ids: list[int]):
    """Remove every logging record belonging to the given job IDs."""
    await self.conn.execute(
        delete(LoggingInfo).where(LoggingInfo.job_id.in_(job_ids))
    )
get_wms_time_stamps(job_ids) async

Get timestamps for job MajorState transitions for multiple jobs at once, returning a {JobID: {State: timestamp}} dictionary.

Source code in diracx-db/src/diracx/db/sql/job_logging/db.py
async def get_wms_time_stamps(
    self, job_ids: Iterable[int]
) -> dict[int, dict[str, datetime]]:
    """Fetch MajorState transition timestamps for several jobs in one query.

    Returns a plain ``{JobID: {State: timestamp}}`` dictionary; jobs with no
    logging records are simply absent from the result.
    """
    query = select(
        LoggingInfo.job_id, LoggingInfo.status, LoggingInfo.status_time_order
    ).where(LoggingInfo.job_id.in_(job_ids))
    stamps: dict[int, dict[str, datetime]] = {}
    for jid, state, when in await self.conn.execute(query):
        stamps.setdefault(jid, {})[state] = when
    return stamps