Skip to content

Pilot Agents Database

Pilot agent tracking and management database.

Database Schema

schema

Attributes

PilotAgentsDBBase = declarative_base() module-attribute

Classes

PilotAgents

Bases: PilotAgentsDBBase

Source code in diracx-db/src/diracx/db/sql/pilot_agents/schema.py
class PilotAgents(PilotAgentsDBBase):
    __tablename__ = "PilotAgents"

    pilot_id = Column("PilotID", Integer, autoincrement=True, primary_key=True)
    initial_job_id = Column("InitialJobID", Integer, default=0)
    current_job_id = Column("CurrentJobID", Integer, default=0)
    pilot_job_reference = Column("PilotJobReference", String(255), default="Unknown")
    pilot_stamp = Column("PilotStamp", String(32), default="")
    destination_site = Column("DestinationSite", String(128), default="NotAssigned")
    queue = Column("Queue", String(128), default="Unknown")
    grid_site = Column("GridSite", String(128), default="Unknown")
    vo = Column("VO", String(128))
    grid_type = Column("GridType", String(32), default="LCG")
    benchmark = Column("BenchMark", Double, default=0.0)
    submission_time = NullColumn("SubmissionTime", SmarterDateTime)
    last_update_time = NullColumn("LastUpdateTime", SmarterDateTime)
    status = Column("Status", String(32), default="Unknown")
    status_reason = Column("StatusReason", String(255), default="Unknown")
    accounting_sent = Column("AccountingSent", EnumBackedBool(), default=False)

    __table_args__ = (
        Index("PilotJobReference", "PilotJobReference"),
        Index("Status", "Status"),
        Index("Statuskey", "GridSite", "DestinationSite", "Status"),
    )
Attributes
pilot_id = Column('PilotID', Integer, autoincrement=True, primary_key=True) class-attribute instance-attribute
initial_job_id = Column('InitialJobID', Integer, default=0) class-attribute instance-attribute
current_job_id = Column('CurrentJobID', Integer, default=0) class-attribute instance-attribute
pilot_job_reference = Column('PilotJobReference', String(255), default='Unknown') class-attribute instance-attribute
pilot_stamp = Column('PilotStamp', String(32), default='') class-attribute instance-attribute
destination_site = Column('DestinationSite', String(128), default='NotAssigned') class-attribute instance-attribute
queue = Column('Queue', String(128), default='Unknown') class-attribute instance-attribute
grid_site = Column('GridSite', String(128), default='Unknown') class-attribute instance-attribute
vo = Column('VO', String(128)) class-attribute instance-attribute
grid_type = Column('GridType', String(32), default='LCG') class-attribute instance-attribute
benchmark = Column('BenchMark', Double, default=0.0) class-attribute instance-attribute
submission_time = NullColumn('SubmissionTime', SmarterDateTime) class-attribute instance-attribute
last_update_time = NullColumn('LastUpdateTime', SmarterDateTime) class-attribute instance-attribute
status = Column('Status', String(32), default='Unknown') class-attribute instance-attribute
status_reason = Column('StatusReason', String(255), default='Unknown') class-attribute instance-attribute
accounting_sent = Column('AccountingSent', EnumBackedBool(), default=False) class-attribute instance-attribute

JobToPilotMapping

Bases: PilotAgentsDBBase

Source code in diracx-db/src/diracx/db/sql/pilot_agents/schema.py
class JobToPilotMapping(PilotAgentsDBBase):
    __tablename__ = "JobToPilotMapping"

    pilot_id = Column("PilotID", Integer, primary_key=True)
    job_id = Column("JobID", Integer, primary_key=True)
    start_time = Column("StartTime", SmarterDateTime)

    __table_args__ = (Index("JobID", "JobID"), Index("PilotID", "PilotID"))
Attributes
pilot_id = Column('PilotID', Integer, primary_key=True) class-attribute instance-attribute
job_id = Column('JobID', Integer, primary_key=True) class-attribute instance-attribute
start_time = Column('StartTime', SmarterDateTime) class-attribute instance-attribute

PilotOutput

Bases: PilotAgentsDBBase

Source code in diracx-db/src/diracx/db/sql/pilot_agents/schema.py
class PilotOutput(PilotAgentsDBBase):
    __tablename__ = "PilotOutput"

    pilot_id = Column("PilotID", Integer, primary_key=True)
    std_output = Column("StdOutput", Text)
    std_error = Column("StdError", Text)
Attributes
pilot_id = Column('PilotID', Integer, primary_key=True) class-attribute instance-attribute
std_output = Column('StdOutput', Text) class-attribute instance-attribute
std_error = Column('StdError', Text) class-attribute instance-attribute

Database Access Layer

db

Attributes

Classes

PilotAgentsDB

Bases: BaseSQLDB

PilotAgentsDB class is a front-end to the PilotAgents Database.

Source code in diracx-db/src/diracx/db/sql/pilot_agents/db.py
class PilotAgentsDB(BaseSQLDB):
    """PilotAgentsDB class is a front-end to the PilotAgents Database."""

    metadata = PilotAgentsDBBase.metadata

    async def add_pilot_references(
        self,
        pilot_ref: list[str],
        vo: str,
        grid_type: str = "DIRAC",
        pilot_stamps: dict | None = None,
    ) -> None:
        if pilot_stamps is None:
            pilot_stamps = {}

        now = datetime.now(tz=timezone.utc)

        # Prepare the list of dictionaries for bulk insertion
        values = [
            {
                "PilotJobReference": ref,
                "VO": vo,
                "GridType": grid_type,
                "SubmissionTime": now,
                "LastUpdateTime": now,
                "Status": "Submitted",
                "PilotStamp": pilot_stamps.get(ref, ""),
            }
            for ref in pilot_ref
        ]

        # Insert multiple rows in a single execute call
        stmt = insert(PilotAgents).values(values)
        await self.conn.execute(stmt)
        return
Attributes
metadata = PilotAgentsDBBase.metadata class-attribute instance-attribute
Functions
add_pilot_references(pilot_ref, vo, grid_type='DIRAC', pilot_stamps=None) async
Source code in diracx-db/src/diracx/db/sql/pilot_agents/db.py
async def add_pilot_references(
    self,
    pilot_ref: list[str],
    vo: str,
    grid_type: str = "DIRAC",
    pilot_stamps: dict | None = None,
) -> None:
    if pilot_stamps is None:
        pilot_stamps = {}

    now = datetime.now(tz=timezone.utc)

    # Prepare the list of dictionaries for bulk insertion
    values = [
        {
            "PilotJobReference": ref,
            "VO": vo,
            "GridType": grid_type,
            "SubmissionTime": now,
            "LastUpdateTime": now,
            "Status": "Submitted",
            "PilotStamp": pilot_stamps.get(ref, ""),
        }
        for ref in pilot_ref
    ]

    # Insert multiple rows in a single execute call
    stmt = insert(PilotAgents).values(values)
    await self.conn.execute(stmt)
    return