Skip to content

Configuration

Configuration schema and sources for DiracX.

Config Schema

schema

Attributes

T = TypeVar('T') module-attribute

SerializableSet = Annotated[set[T], PlainSerializer(sorted, return_type=(list[T]), when_used='json-unless-none')] module-attribute

Classes

BaseModel pydantic-model

Bases: BaseModel

Config:

  • extra: forbid
  • frozen: True

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class BaseModel(_BaseModel):
    # Common base of the schema models: unknown keys are rejected and
    # instances are immutable (see model_config).
    model_config = ConfigDict(extra="forbid", frozen=True)

    @model_validator(mode="before")
    @classmethod
    def legacy_adaptor(cls, v):
        """Adapt raw input that originates from a legacy DIRAC CFG file.

        Nothing is changed unless the ``DIRAC_COMPAT_ENABLE_CS_CONVERSION``
        environment variable is set to a non-empty value. When it is, the
        input mapping ``v`` is modified in place, for the fields annotated
        directly on this class:

        * comma separated strings become lists for list-like annotations;
        * the literal string ``"None"`` becomes ``None`` for optional fields.

        The annotations are inspected as plain strings rather than being
        parsed properly.

        :raises NotImplementedError: if a field is annotated with a bare
            ``set...`` hint instead of ``SerializableSet``.
        """
        if not os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION"):
            return v

        # Annotations (as written in the source) that may legitimately hold a
        # comma separated string in the legacy format.
        list_like_hints = {
            "list[str]",
            "SerializableSet[str]",
            "SerializableSet[SecurityProperty]",
        }
        for name, annotation in cls.__annotations__.items():
            # This check runs for every annotated field, present or not.
            if annotation.startswith("set"):
                raise NotImplementedError("Use SerializableSet instead!")

            if name not in v:
                continue

            raw = v[name]
            # Drop the optional part of the annotation, e.g. "X | None" -> "X"
            non_optional_part = annotation.split(" | ")[0].strip()

            if isinstance(raw, str) and non_optional_part in list_like_hints:
                # Split on commas, discarding empty items
                v[name] = [
                    item for piece in raw.split(",") if (item := piece.strip())
                ]
            elif "| None" in annotation and raw == "None":
                # Optional field spelled out as the string "None"
                v[name] = None
        return v
Attributes
model_config = ConfigDict(extra='forbid', frozen=True) class-attribute instance-attribute
Functions
legacy_adaptor(v) pydantic-validator

Apply transformations to interpret the legacy DIRAC CFG format.

Source code in diracx-core/src/diracx/core/config/schema.py
@model_validator(mode="before")
@classmethod
def legacy_adaptor(cls, v):
    """Adapt raw input that originates from a legacy DIRAC CFG file.

    Nothing is changed unless the ``DIRAC_COMPAT_ENABLE_CS_CONVERSION``
    environment variable is set to a non-empty value. When it is, the input
    mapping ``v`` is modified in place, for the fields annotated directly on
    this class:

    * comma separated strings become lists for list-like annotations;
    * the literal string ``"None"`` becomes ``None`` for optional fields.

    The annotations are inspected as plain strings rather than being parsed
    properly.

    :raises NotImplementedError: if a field is annotated with a bare
        ``set...`` hint instead of ``SerializableSet``.
    """
    if not os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION"):
        return v

    # Annotations (as written in the source) that may legitimately hold a
    # comma separated string in the legacy format.
    list_like_hints = {
        "list[str]",
        "SerializableSet[str]",
        "SerializableSet[SecurityProperty]",
    }
    for name, annotation in cls.__annotations__.items():
        # This check runs for every annotated field, present or not.
        if annotation.startswith("set"):
            raise NotImplementedError("Use SerializableSet instead!")

        if name not in v:
            continue

        raw = v[name]
        # Drop the optional part of the annotation, e.g. "X | None" -> "X"
        non_optional_part = annotation.split(" | ")[0].strip()

        if isinstance(raw, str) and non_optional_part in list_like_hints:
            # Split on commas, discarding empty items
            v[name] = [item for piece in raw.split(",") if (item := piece.strip())]
        elif "| None" in annotation and raw == "None":
            # Optional field spelled out as the string "None"
            v[name] = None
    return v

UserConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class UserConfig(BaseModel):
    # Schema of a single user entry (the values of RegistryConfig.Users).

    # NOTE: the spelling "PreferedUsername" (single "r") is the actual key
    # name; RegistryConfig.sub_from_preferred_username compares against it.
    PreferedUsername: str
    # presumably X.509 distinguished names of the user — confirm
    DNs: list[str] = []
    Email: EmailStr | None = None
    # NOTE(review): meaning of the entries is not visible here — confirm
    Suspended: list[str] = []
    Quota: int | None = None
    # TODO: These should be LHCbDIRAC specific
    CERNAccountType: str | None = None
    PrimaryCERNAccount: str | None = None
    CERNPersonId: int | None = None

    # Mapping from VO name to affiliation end date (YYYY-MM-DD)
    AffiliationEnds: dict[str, date] = Field(default_factory=dict)
Attributes
PreferedUsername pydantic-field
DNs = [] pydantic-field
Email = None pydantic-field
Suspended = [] pydantic-field
Quota = None pydantic-field
CERNAccountType = None pydantic-field
PrimaryCERNAccount = None pydantic-field
CERNPersonId = None pydantic-field
AffiliationEnds pydantic-field

GroupConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class GroupConfig(BaseModel):
    # Schema of a single group entry (the values of RegistryConfig.Groups).
    AutoAddVOMS: bool = False
    AutoUploadPilotProxy: bool = False
    AutoUploadProxy: bool = False
    JobShare: int = 1000
    # Required. "SerializableSet[SecurityProperty]" is one of the hints that
    # BaseModel.legacy_adaptor converts from a comma separated string.
    Properties: SerializableSet[SecurityProperty]
    Quota: int | None = None
    # Required. "SerializableSet[str]" is likewise converted by
    # BaseModel.legacy_adaptor; presumably these are user identifiers that
    # key RegistryConfig.Users — confirm.
    Users: SerializableSet[str]
    AllowBackgroundTQs: bool = False
    VOMSRole: str | None = None
    AutoSyncVOMS: bool = False
Attributes
AutoAddVOMS = False pydantic-field
AutoUploadPilotProxy = False pydantic-field
AutoUploadProxy = False pydantic-field
JobShare = 1000 pydantic-field
Properties pydantic-field
Quota = None pydantic-field
Users pydantic-field
AllowBackgroundTQs = False pydantic-field
VOMSRole = None pydantic-field
AutoSyncVOMS = False pydantic-field

IdpConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class IdpConfig(BaseModel):
    # Identity provider settings; used as the type of RegistryConfig.IdP.
    URL: str
    ClientID: str

    @property
    def server_metadata_url(self):
        """Return the URL of the IdP's OpenID Connect discovery document.

        Trailing slashes on ``URL`` are removed before the well-known path is
        appended, so an issuer configured as ``https://idp.example/`` does not
        produce ``https://idp.example//.well-known/openid-configuration``.
        """
        return f"{self.URL.rstrip('/')}/.well-known/openid-configuration"
Attributes
URL pydantic-field
ClientID pydantic-field
server_metadata_url property

SupportInfo pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class SupportInfo(BaseModel):
    # Support contact details; every field has a default, so RegistryConfig
    # can build an instance with no arguments (default_factory=SupportInfo).
    Email: str | None = None
    Webpage: str | None = None
    Message: str = "Please contact system administrator"
Attributes
Email = None pydantic-field
Webpage = None pydantic-field
Message = 'Please contact system administrator' pydantic-field

RegistryConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class RegistryConfig(BaseModel):
    # Registry section; Config.Registry maps a name (iterated as "vo" in
    # Config.ensure_operations_defaults) to one of these.
    IdP: IdpConfig
    Support: SupportInfo = Field(default_factory=SupportInfo)
    DefaultGroup: str
    DefaultStorageQuota: float = 0
    DefaultProxyLifeTime: int = 12 * 60 * 60
    VOMSName: str | None = None

    Users: MutableMapping[str, UserConfig]
    Groups: MutableMapping[str, GroupConfig]

    def sub_from_preferred_username(self, preferred_username: str) -> str:
        """Return the key of the first user whose PreferedUsername matches.

        ``Users`` is scanned in iteration order on every call.

        TODO: This could easily be cached or optimised

        :raises KeyError: if no user has the requested preferred username.
        """
        no_match = object()
        found = next(
            (
                subject
                for subject, entry in self.Users.items()
                if entry.PreferedUsername == preferred_username
            ),
            no_match,
        )
        if found is no_match:
            raise KeyError(f"User {preferred_username} not found in registry")
        return found
Attributes
IdP pydantic-field
Support pydantic-field
DefaultGroup pydantic-field
DefaultStorageQuota = 0 pydantic-field
DefaultProxyLifeTime = 12 * 60 * 60 pydantic-field
VOMSName = None pydantic-field
Users pydantic-field
Groups pydantic-field
Functions
sub_from_preferred_username(preferred_username)

Get the user sub from the preferred username.

TODO: This could easily be cached or optimised

Source code in diracx-core/src/diracx/core/config/schema.py
def sub_from_preferred_username(self, preferred_username: str) -> str:
    """Return the key of the first user whose PreferedUsername matches.

    ``Users`` is scanned in iteration order on every call.

    TODO: This could easily be cached or optimised

    :raises KeyError: if no user has the requested preferred username.
    """
    no_match = object()
    found = next(
        (
            subject
            for subject, entry in self.Users.items()
            if entry.PreferedUsername == preferred_username
        ),
        no_match,
    )
    if found is no_match:
        raise KeyError(f"User {preferred_username} not found in registry")
    return found

DIRACConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class DIRACConfig(BaseModel):
    # Type of the required Config.DIRAC section.
    NoSetup: bool = False
Attributes
NoSetup = False pydantic-field

JobMonitoringConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class JobMonitoringConfig(BaseModel):
    # Type of ServicesConfig.JobMonitoring.
    GlobalJobsInfo: bool = True
Attributes
GlobalJobsInfo = True pydantic-field

JobSchedulingConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class JobSchedulingConfig(BaseModel):
    # Type of ServicesConfig.JobScheduling.
    EnableSharesCorrection: bool = False
    MaxRescheduling: int = 3
Attributes
EnableSharesCorrection = False pydantic-field
MaxRescheduling = 3 pydantic-field

ServicesConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class ServicesConfig(BaseModel):
    # Type of OperationsConfig.Services.
    # Catalogs is not modelled yet: any mapping (or nothing) is accepted.
    Catalogs: MutableMapping[str, Any] | None = None
    # The defaults below are instances created once at class definition time;
    # the models are declared frozen in BaseModel.model_config.
    JobMonitoring: JobMonitoringConfig = JobMonitoringConfig()
    JobScheduling: JobSchedulingConfig = JobSchedulingConfig()
Attributes
Catalogs = None pydantic-field
JobMonitoring pydantic-field
JobScheduling pydantic-field

JobDescriptionConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class JobDescriptionConfig(BaseModel):
    # Type of OperationsConfig.JobDescription: defaults and bounds for job
    # descriptions.
    # NOTE(review): CPU time values are presumably seconds (86400 = 24 h) —
    # confirm against the consumers of this model.
    DefaultCPUTime: int = 86400
    DefaultPriority: int = 1
    MinCPUTime: int = 100
    MinPriority: int = 0
    MaxCPUTime: int = 500000
    MaxPriority: int = 10
    MaxInputData: int = 100
    AllowedJobTypes: list[str] = ["User", "Test", "Hospital"]
Attributes
DefaultCPUTime = 86400 pydantic-field
DefaultPriority = 1 pydantic-field
MinCPUTime = 100 pydantic-field
MinPriority = 0 pydantic-field
MaxCPUTime = 500000 pydantic-field
MaxPriority = 10 pydantic-field
MaxInputData = 100 pydantic-field
AllowedJobTypes = ['User', 'Test', 'Hospital'] pydantic-field

InputDataPolicyProtocolsConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class InputDataPolicyProtocolsConfig(BaseModel):
    # Type of InputDataPolicyConfig.Protocols; both lists default to empty.
    Remote: list[str] = []
    Local: list[str] = []
Attributes
Remote = [] pydantic-field
Local = [] pydantic-field

InputDataPolicyConfig pydantic-model

Bases: BaseModel

Config:

  • extra: ignore
  • frozen: True

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class InputDataPolicyConfig(BaseModel):
    # Type of OperationsConfig.InputDataPolicy. Unlike the BaseModel default
    # (extra="forbid"), unknown keys are silently ignored here.
    # TODO: Remove this once the model is extended to support everything
    model_config = ConfigDict(extra="ignore", frozen=True)

    # Bare dotted path, in the same form as Download and Protocol below: the
    # value must not carry a CFG-style "Default = " prefix.
    Default: str = "DIRAC.WorkloadManagementSystem.Client.InputDataByProtocol"
    Download: str = "DIRAC.WorkloadManagementSystem.Client.DownloadInputData"
    Protocol: str = "DIRAC.WorkloadManagementSystem.Client.InputDataByProtocol"
    AllReplicas: bool = True
    Protocols: InputDataPolicyProtocolsConfig = InputDataPolicyProtocolsConfig()
    InputDataModule: str = "DIRAC.Core.Utilities.InputDataResolution"
Attributes
model_config = ConfigDict(extra='ignore', frozen=True) class-attribute instance-attribute
Default = 'Default = DIRAC.WorkloadManagementSystem.Client.InputDataByProtocol' pydantic-field
Download = 'DIRAC.WorkloadManagementSystem.Client.DownloadInputData' pydantic-field
Protocol = 'DIRAC.WorkloadManagementSystem.Client.InputDataByProtocol' pydantic-field
AllReplicas = True pydantic-field
Protocols pydantic-field
InputDataModule = 'DIRAC.Core.Utilities.InputDataResolution' pydantic-field

OperationsConfig pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class OperationsConfig(BaseModel):
    # Operations section; Config.Operations maps a name to one of these.

    # Sections with a dedicated schema.
    EnableSecurityLogging: bool = False
    InputDataPolicy: InputDataPolicyConfig = InputDataPolicyConfig()
    JobDescription: JobDescriptionConfig = JobDescriptionConfig()
    Services: ServicesConfig = ServicesConfig()
    SoftwareDistModule: str = "LocalSoftwareDist"

    # Sections without a dedicated schema: any mapping (or nothing) is
    # accepted as-is. Note that JobScheduling here is such an untyped
    # mapping, distinct from the typed ServicesConfig.JobScheduling.
    Cloud: MutableMapping[str, Any] | None = None
    DataConsistency: MutableMapping[str, Any] | None = None
    DataManagement: MutableMapping[str, Any] | None = None
    EMail: MutableMapping[str, Any] | None = None
    GaudiExecution: MutableMapping[str, Any] | None = None
    Hospital: MutableMapping[str, Any] | None = None
    JobScheduling: MutableMapping[str, Any] | None = None
    JobTypeMapping: MutableMapping[str, Any] | None = None
    LogFiles: MutableMapping[str, Any] | None = None
    LogStorage: MutableMapping[str, Any] | None = None
    Logging: MutableMapping[str, Any] | None = None
    Matching: MutableMapping[str, Any] | None = None
    MonitoringBackends: MutableMapping[str, Any] | None = None
    NagiosConnector: MutableMapping[str, Any] | None = None
    Pilot: MutableMapping[str, Any] | None = None
    Productions: MutableMapping[str, Any] | None = None
    Shares: MutableMapping[str, Any] | None = None
    Shifter: MutableMapping[str, Any] | None = None
    SiteSEMappingByProtocol: MutableMapping[str, Any] | None = None
    TransformationPlugins: MutableMapping[str, Any] | None = None
    Transformations: MutableMapping[str, Any] | None = None
    ResourceStatus: MutableMapping[str, Any] | None = None
Attributes
EnableSecurityLogging = False pydantic-field
InputDataPolicy pydantic-field
JobDescription pydantic-field
Services pydantic-field
SoftwareDistModule = 'LocalSoftwareDist' pydantic-field
Cloud = None pydantic-field
DataConsistency = None pydantic-field
DataManagement = None pydantic-field
EMail = None pydantic-field
GaudiExecution = None pydantic-field
Hospital = None pydantic-field
JobScheduling = None pydantic-field
JobTypeMapping = None pydantic-field
LogFiles = None pydantic-field
LogStorage = None pydantic-field
Logging = None pydantic-field
Matching = None pydantic-field
MonitoringBackends = None pydantic-field
NagiosConnector = None pydantic-field
Pilot = None pydantic-field
Productions = None pydantic-field
Shares = None pydantic-field
Shifter = None pydantic-field
SiteSEMappingByProtocol = None pydantic-field
TransformationPlugins = None pydantic-field
Transformations = None pydantic-field
ResourceStatus = None pydantic-field

ResourcesComputingConfig pydantic-model

Bases: BaseModel

Config:

  • extra: ignore
  • frozen: True

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class ResourcesComputingConfig(BaseModel):
    # TODO: Remove this once the model is extended to support everything
    model_config = ConfigDict(extra="ignore", frozen=True)

    # TODO: Figure out how to remove this in LHCbDIRAC and then consider
    # constraining there to be at least one entry
    OSCompatibility: MutableMapping[str, set[str]] = {}

    @field_validator("OSCompatibility", mode="before")
    @classmethod
    def legacy_adaptor_os_compatibility(cls, v: Any) -> Any:
        """Turn legacy comma separated platform lists into sets.

        The input is returned untouched unless the
        ``DIRAC_COMPAT_ENABLE_CS_CONVERSION`` environment variable is set.
        Otherwise the mapping found under the ``"OSCompatibility"`` key of the
        input (empty if the key is absent) is converted in place: spaces are
        removed from each value, which is then split on commas into a set.
        """
        if os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION"):
            # NOTE(review): as a field validator this presumably already
            # receives the OSCompatibility value itself, in which case this
            # nested lookup would normally yield {} — confirm intended input.
            legacy_section = v.get("OSCompatibility", {})
            for platform, csv_platforms in legacy_section.items():
                legacy_section[platform] = set(
                    csv_platforms.replace(" ", "").split(",")
                )
            return legacy_section
        return v

    @field_validator("OSCompatibility")
    @classmethod
    def ensure_self_compatibility(cls, v: dict[str, set[str]]) -> dict[str, set[str]]:
        """Add every platform to its own set of compatible platforms.

        The sets are updated in place and the same mapping is returned.
        """
        for platform in v:
            v[platform].add(platform)
        return v
Attributes
model_config = ConfigDict(extra='ignore', frozen=True) class-attribute instance-attribute
OSCompatibility = {} pydantic-field
Functions
legacy_adaptor_os_compatibility(v) pydantic-validator

Apply transformations to interpret the legacy DIRAC CFG format.

Source code in diracx-core/src/diracx/core/config/schema.py
@field_validator("OSCompatibility", mode="before")
@classmethod
def legacy_adaptor_os_compatibility(cls, v: Any) -> Any:
    """Turn legacy comma separated platform lists into sets.

    The input is returned untouched unless the
    ``DIRAC_COMPAT_ENABLE_CS_CONVERSION`` environment variable is set.
    Otherwise the mapping found under the ``"OSCompatibility"`` key of the
    input (empty if the key is absent) is converted in place: spaces are
    removed from each value, which is then split on commas into a set.
    """
    if os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION"):
        # NOTE(review): as a field validator this presumably already receives
        # the OSCompatibility value itself, in which case this nested lookup
        # would normally yield {} — confirm intended input.
        legacy_section = v.get("OSCompatibility", {})
        for platform, csv_platforms in legacy_section.items():
            legacy_section[platform] = set(csv_platforms.replace(" ", "").split(","))
        return legacy_section
    return v
ensure_self_compatibility(v) pydantic-validator

Ensure platforms are compatible with themselves.

Source code in diracx-core/src/diracx/core/config/schema.py
@field_validator("OSCompatibility")
@classmethod
def ensure_self_compatibility(cls, v: dict[str, set[str]]) -> dict[str, set[str]]:
    """Add every platform to its own set of compatible platforms.

    The sets are updated in place and the same mapping is returned.
    """
    for platform in v:
        v[platform].add(platform)
    return v

ResourcesConfig pydantic-model

Bases: BaseModel

Config:

  • extra: ignore
  • frozen: True

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class ResourcesConfig(BaseModel):
    # Type of Config.Resources. Unknown keys are ignored rather than rejected
    # (overrides the extra="forbid" set on BaseModel).
    # TODO: Remove this once the model is extended to support everything
    model_config = ConfigDict(extra="ignore", frozen=True)

    Computing: ResourcesComputingConfig = ResourcesComputingConfig()
Attributes
model_config = ConfigDict(extra='ignore', frozen=True) class-attribute instance-attribute
Computing pydantic-field

Config pydantic-model

Bases: BaseModel

Fields:

Validators:

Source code in diracx-core/src/diracx/core/config/schema.py
class Config(BaseModel):
    # Top-level configuration model.
    DIRAC: DIRACConfig
    Operations: MutableMapping[str, OperationsConfig]
    Registry: MutableMapping[str, RegistryConfig]
    Resources: ResourcesConfig = ResourcesConfig()

    # Sections accepted without any schema
    LocalSite: Any = None
    LogLevel: Any = None
    MCTestingDestination: Any = None
    Systems: Any | None = None
    WebApp: Any = None

    @model_validator(mode="before")
    @classmethod
    def ensure_operations_defaults(cls, v: dict[str, Any]):
        """Give every VO of the Registry an entry under Operations.

        The raw input ``v`` is modified in place and returned. An
        ``Operations`` mapping is created if missing. Unless
        ``DIRAC_COMPAT_ENABLE_CS_CONVERSION`` is set, its ``Defaults`` entry is
        removed and combined into each VO entry with ``recursive_merge``.
        """
        operations = v.setdefault("Operations", {})
        compat_mode = os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION")
        # In compatibility mode the Defaults entry is left where it is, so the
        # "human readable" config still contains it, and nothing is merged.
        # Otherwise it is taken out of Operations and used as the merge base.
        defaults = {} if compat_mode else operations.pop("Defaults", {})
        # One Operations entry per VO, created from the defaults if absent
        for vo in v.get("Registry", {}):
            vo_specific = operations.get(vo, {})
            operations[vo] = recursive_merge(defaults, vo_specific)
        return v

    # These 2 parameters are used for client side caching
    # see the "/config/" route for details

    # hash for a unique representation of the config version
    _hexsha: str = PrivateAttr()
    # modification date
    _modified: datetime = PrivateAttr()
Attributes
DIRAC pydantic-field
Operations pydantic-field
Registry pydantic-field
Resources pydantic-field
LocalSite = None pydantic-field
LogLevel = None pydantic-field
MCTestingDestination = None pydantic-field
Systems = None pydantic-field
WebApp = None pydantic-field
Functions
ensure_operations_defaults(v) pydantic-validator

Merge the Defaults entry into the VO-specific config under Operations.

Source code in diracx-core/src/diracx/core/config/schema.py
@model_validator(mode="before")
@classmethod
def ensure_operations_defaults(cls, v: dict[str, Any]):
    """Give every VO of the Registry an entry under Operations.

    The raw input ``v`` is modified in place and returned. An ``Operations``
    mapping is created if missing. Unless ``DIRAC_COMPAT_ENABLE_CS_CONVERSION``
    is set, its ``Defaults`` entry is removed and combined into each VO entry
    with ``recursive_merge``.
    """
    operations = v.setdefault("Operations", {})
    compat_mode = os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION")
    # In compatibility mode the Defaults entry is left where it is, so the
    # "human readable" config still contains it, and nothing is merged.
    # Otherwise it is taken out of Operations and used as the merge base.
    defaults = {} if compat_mode else operations.pop("Defaults", {})
    # One Operations entry per VO, created from the defaults if absent
    for vo in v.get("Registry", {}):
        vo_specific = operations.get(vo, {})
        operations[vo] = recursive_merge(defaults, vo_specific)
    return v

Functions

Config Sources

sources

This module implements the logic of the configuration server side.

This is where all the backend abstraction and the caching logic takes place.

Attributes

DEFAULT_CONFIG_FILE = 'default.yml' module-attribute

DEFAULT_GIT_BRANCH = 'master' module-attribute

DEFAULT_CS_REV_CACHE_SOFT_TTL = 5 module-attribute

DEFAULT_CS_REV_CACHE_HARD_TTL = 60 * 60 module-attribute

DEFAULT_CS_CONTENT_HARD_TTL = 15 module-attribute

logger = logging.getLogger(__name__) module-attribute

ConfigSourceUrl = Annotated[AnyUrlWithoutHost, BeforeValidator(_apply_default_scheme)] module-attribute

Classes

AnyUrlWithoutHost

Bases: AnyUrl

Source code in diracx-core/src/diracx/core/config/sources.py
class AnyUrlWithoutHost(AnyUrl):
    # AnyUrl variant whose constraints do not require a host component.
    _constraints = UrlConstraints(host_required=False)

ConfigSource

Abstract class for the configuration source.

This class takes care of the expected caching and locking logic. Subclasses are responsible for implementing the actual logic to find revisions and reading the configuration.

Source code in diracx-core/src/diracx/core/config/sources.py
class ConfigSource(metaclass=ABCMeta):
    """Abstract class for the configuration source.

    This class takes care of the expected caching and locking logic. Subclasses
    are responsible for implementing the actual logic to find revisions and
    reading the configuration.

    Every subclass must define a unique ``scheme``. It is recorded when the
    subclass is created and used by :meth:`create_from_url` to select the
    implementation matching the scheme of a backend URL.
    """

    # Keep a mapping between the scheme and the class
    __registry: dict[str, type["ConfigSource"]] = {}
    scheme: str

    def __init__(self, *, backend_url: ConfigSourceUrl) -> None:
        """Set up the revision and content caches.

        ``backend_url`` is not used here; it is part of the constructor
        signature that :meth:`create_from_url` calls.
        """
        # Revision cache is used to store the latest revision and its
        # modification date. This cache has two TTLs, one which triggers the
        # background refresh and the other which results in a hard failure.
        # This allows us to avoid blocking while the refresh is done, while
        # maintaining strong guarantees on the data freshness.
        self._revision_cache = TwoLevelCache(
            soft_ttl=DEFAULT_CS_REV_CACHE_SOFT_TTL,
            hard_ttl=DEFAULT_CS_REV_CACHE_HARD_TTL,
            max_workers=1,
            max_items=1,
        )
        # The content of a given revision can be stored in a simple LRU cache
        # We keep the last two versions in memory to avoid any potential to flip
        # flop between two versions when it changes.
        self._content_cache: Cache = LRUCache(maxsize=2)

    @abstractmethod
    def latest_revision(self) -> tuple[str, datetime]:
        """Return the identifier and date of the most recent version.

        Must return a tuple of:
        * a unique hash as a string, representing the last version
        * a datetime object giving the date of that version.
        """

    @abstractmethod
    def read_raw(self, hexsha: str, modified: datetime) -> Config:
        """Return the Config object that corresponds to the specific hash.

        The `modified` parameter is just added as an attribute to the config.
        """

    def __init_subclass__(cls) -> None:
        """Keep a record of <scheme: class>.

        :raises TypeError: if the scheme is already taken by another subclass
        """
        # Chain up so that cooperative base classes also see the new subclass
        super().__init_subclass__()
        if cls.scheme in cls.__registry:
            raise TypeError(f"{cls.scheme=} is already defined")
        cls.__registry[cls.scheme] = cls

    @classmethod
    def create(cls):
        """Create a source from the ``DIRACX_CONFIG_BACKEND_URL`` environment variable.

        :raises KeyError: if the environment variable is not set
        """
        return cls.create_from_url(backend_url=os.environ["DIRACX_CONFIG_BACKEND_URL"])

    @classmethod
    def create_from_url(
        cls, *, backend_url: ConfigSourceUrl | Path | str
    ) -> "ConfigSource":
        """Factory method to produce a concrete instance depending on
        the backend URL scheme.

        The value is converted to a string and validated as a
        ``ConfigSourceUrl`` before the scheme is looked up.

        :raises KeyError: if no subclass is registered for the URL scheme
        """
        url = TypeAdapter(ConfigSourceUrl).validate_python(str(backend_url))
        return cls.__registry[url.scheme](backend_url=url)

    def read_config(self) -> Config:
        """Load the configuration from the backend with appropriate caching.

        :raises: diracx.core.exceptions.NotReadyError if the config is being loaded still
        :raises: git.exc.BadName if version does not exist
        """
        hexsha = self._revision_cache.get(
            "latest_revision", self._read_config_work, blocking=True
        )
        return self._content_cache[hexsha]

    async def read_config_non_blocking(self) -> Config:
        """Load the configuration from the backend with appropriate caching.

        Same as :meth:`read_config` except that the revision cache is queried
        with ``blocking=False``.

        :raises: diracx.core.exceptions.NotReadyError if the config is being loaded still
        :raises: git.exc.BadName if version does not exist
        """
        hexsha = self._revision_cache.get(
            "latest_revision", self._read_config_work, blocking=False
        )
        return self._content_cache[hexsha]

    def _read_config_work(self) -> str:
        """Work function for the thread pool of `self._revision_cache`.

        This function ensures that the latest revision is loaded into the
        content cache before it is admitted into the revision cache.
        """
        hexsha, modified = self.latest_revision()
        if hexsha not in self._content_cache:
            self._content_cache[hexsha] = self.read_raw(hexsha, modified)
        return hexsha

    def clear_caches(self):
        """Clear the caches."""
        self._revision_cache.clear()
        self._content_cache.clear()
Attributes
scheme instance-attribute
Functions
latest_revision() abstractmethod

Must return a tuple of: (1) a unique hash as a string, representing the last version; (2) a datetime object giving the date of that version.

Source code in diracx-core/src/diracx/core/config/sources.py
@abstractmethod
def latest_revision(self) -> tuple[str, datetime]:
    """Return the identifier and date of the most recent version.

    Must return a tuple of:
    * a unique hash as a string, representing the last version
    * a datetime object giving the date of that version.
    """
read_raw(hexsha, modified) abstractmethod

Return the Config object that corresponds to the specific hash. The `modified` parameter is just added as an attribute to the config.

Source code in diracx-core/src/diracx/core/config/sources.py
@abstractmethod
def read_raw(self, hexsha: str, modified: datetime) -> Config:
    """Return the Config object that corresponds to the specific hash.

    The `modified` parameter is just added as an attribute to the config.
    """
create() classmethod
Source code in diracx-core/src/diracx/core/config/sources.py
@classmethod
def create(cls):
    """Create a source from the ``DIRACX_CONFIG_BACKEND_URL`` environment variable.

    :raises KeyError: if the environment variable is not set
    """
    url_from_env = os.environ["DIRACX_CONFIG_BACKEND_URL"]
    return cls.create_from_url(backend_url=url_from_env)
create_from_url(*, backend_url) classmethod

Factory method to produce a concrete instance depending on the backend URL scheme.

Source code in diracx-core/src/diracx/core/config/sources.py
@classmethod
def create_from_url(
    cls, *, backend_url: ConfigSourceUrl | Path | str
) -> "ConfigSource":
    """Build the concrete source registered for the scheme of ``backend_url``.

    The value is converted to a string and validated as a ``ConfigSourceUrl``;
    the class registered under the resulting scheme is then instantiated with
    the validated URL.
    """
    adapter = TypeAdapter(ConfigSourceUrl)
    parsed = adapter.validate_python(str(backend_url))
    source_cls = cls.__registry[parsed.scheme]
    return source_cls(backend_url=parsed)
read_config()

Load the configuration from the backend with appropriate caching.

Raises diracx.core.exceptions.NotReadyError if the config is still being loaded, and git.exc.BadName if the version does not exist.

Source code in diracx-core/src/diracx/core/config/sources.py
def read_config(self) -> Config:
    """Return the configuration of the latest revision, using the caches.

    The revision cache is queried in blocking mode for the latest revision
    (refreshed through ``_read_config_work``), and the matching entry of the
    content cache is returned.

    :raises: diracx.core.exceptions.NotReadyError if the config is being loaded still
    :raises: git.exc.BadName if version does not exist
    """
    refresh = self._read_config_work
    latest = self._revision_cache.get("latest_revision", refresh, blocking=True)
    return self._content_cache[latest]
read_config_non_blocking() async

Load the configuration from the backend with appropriate caching.

Raises diracx.core.exceptions.NotReadyError if the config is still being loaded, and git.exc.BadName if the version does not exist.

Source code in diracx-core/src/diracx/core/config/sources.py
async def read_config_non_blocking(self) -> Config:
    """Return the configuration of the latest revision, using the caches.

    The revision cache is queried in non-blocking mode for the latest revision
    (refreshed through ``_read_config_work``), and the matching entry of the
    content cache is returned.

    :raises: diracx.core.exceptions.NotReadyError if the config is being loaded still
    :raises: git.exc.BadName if version does not exist
    """
    refresh = self._read_config_work
    latest = self._revision_cache.get("latest_revision", refresh, blocking=False)
    return self._content_cache[latest]
clear_caches()

Clear the caches.

Source code in diracx-core/src/diracx/core/config/sources.py
def clear_caches(self):
    """Empty the revision cache, then the content cache."""
    for cache in (self._revision_cache, self._content_cache):
        cache.clear()

BaseGitConfigSource

Bases: ConfigSource

Base class for the git based config source.

Source code in diracx-core/src/diracx/core/config/sources.py
class BaseGitConfigSource(ConfigSource):
    """Base class for the git based config source.

    Revisions are resolved and read by running ``git`` inside
    ``repo_location``, which is only declared here and is not assigned by
    this class.
    """

    repo_location: Path

    # Needed because of the ConfigSource.__init_subclass__
    scheme = "basegit"

    def __init__(self, *, backend_url: ConfigSourceUrl) -> None:
        """Record the remote URL and the branch encoded in ``backend_url``."""
        super().__init__(backend_url=backend_url)
        self.remote_url = self.extract_remote_url(backend_url)
        self.git_branch = self.get_git_branch_from_url(backend_url)

    def latest_revision(self) -> tuple[str, datetime]:
        """Return the commit hash at the tip of the branch and its commit date.

        The hash comes from ``git rev-parse <branch>``; the date from
        ``git show -s --format=%ct`` interpreted as a UTC timestamp.

        :raises BadConfigurationVersionError: if a git command fails
        """
        try:
            rev = sh.git(
                "rev-parse",
                self.git_branch,
                _cwd=self.repo_location,
                _tty_out=False,
                _async=is_running_in_async_context(),
            ).strip()
            commit_info = sh.git.show(
                "-s",
                "--format=%ct",
                rev,
                _cwd=self.repo_location,
                _tty_out=False,
                _async=is_running_in_async_context(),
            ).strip()
            modified = datetime.fromtimestamp(int(commit_info), tz=timezone.utc)
        except sh.ErrorReturnCode as e:
            raise BadConfigurationVersionError(
                f"Error parsing latest revision: {e}"
            ) from e
        logger.debug("Latest revision for %s is %s with mtime %s", self, rev, modified)
        return rev, modified

    def read_raw(self, hexsha: str, modified: datetime) -> Config:
        """Read and validate the configuration file at a given commit.

        ``DEFAULT_CONFIG_FILE`` is read from the commit with ``git show``,
        parsed as YAML and validated with the class loaded from the first
        entry returned by ``select_from_extension`` for the "diracx"/"config"
        extension. ``hexsha`` and ``modified`` are stored on the result.

        :param hexsha: commit hash
        :param modified: modification date to attach to the config
        :raises BadConfigurationVersionError: if the git command fails
        """
        logger.debug("Reading %s for %s with mtime %s", self, hexsha, modified)
        try:
            blob = sh.git.show(
                f"{hexsha}:{DEFAULT_CONFIG_FILE}",
                _cwd=self.repo_location,
                _tty_out=False,
                _async=False,
            )
            raw_obj = yaml.safe_load(blob)
        except sh.ErrorReturnCode as e:
            raise BadConfigurationVersionError(
                f"Error reading configuration: {e}"
            ) from e

        # The loaded object is a Config (sub)class, not an instance
        config_class: type[Config] = select_from_extension(
            group="diracx", name="config"
        )[0].load()
        config = config_class.model_validate(raw_obj)
        config._hexsha = hexsha
        config._modified = modified
        return config

    def extract_remote_url(self, backend_url: ConfigSourceUrl) -> str:
        """Extract the base URL without the 'git+' prefix and query parameters."""
        # Only strip a leading "git+": a "git+" occurring later in the URL
        # (for instance in the path) belongs to the remote location.
        parsed_url = urlparse(str(backend_url).removeprefix("git+"))
        remote_url = urlunparse(parsed_url._replace(query=""))
        return remote_url

    def get_git_branch_from_url(self, backend_url: ConfigSourceUrl) -> str:
        """Extract the branch from the query parameters.

        Falls back to ``DEFAULT_GIT_BRANCH`` when no ``branch`` parameter is
        present.
        """
        return dict(backend_url.query_params()).get("branch", DEFAULT_GIT_BRANCH)
Attributes
repo_location instance-attribute
scheme = 'basegit' class-attribute instance-attribute
remote_url = self.extract_remote_url(backend_url) instance-attribute
git_branch = self.get_git_branch_from_url(backend_url) instance-attribute
Functions
latest_revision()
Source code in diracx-core/src/diracx/core/config/sources.py
def latest_revision(self) -> tuple[str, datetime]:
    """Return the commit hash of the configured branch and its commit time.

    :return: ``(hexsha, modified)`` where ``modified`` is the commit
        timestamp converted to a timezone-aware UTC datetime
    :raises BadConfigurationVersionError: if one of the git commands fails
    """
    try:
        # Resolve the branch name to a commit hash
        hexsha = sh.git(
            "rev-parse",
            self.git_branch,
            _cwd=self.repo_location,
            _tty_out=False,
            _async=is_running_in_async_context(),
        ).strip()
        # "%ct" makes git print the commit time as a UNIX timestamp
        epoch_seconds = sh.git.show(
            "-s",
            "--format=%ct",
            hexsha,
            _cwd=self.repo_location,
            _tty_out=False,
            _async=is_running_in_async_context(),
        ).strip()
        commit_time = datetime.fromtimestamp(int(epoch_seconds), tz=timezone.utc)
    except sh.ErrorReturnCode as err:
        raise BadConfigurationVersionError(
            f"Error parsing latest revision: {err}"
        ) from err
    logger.debug(
        "Latest revision for %s is %s with mtime %s", self, hexsha, commit_time
    )
    return hexsha, commit_time
read_raw(hexsha, modified)

:param hexsha: commit hash

Source code in diracx-core/src/diracx/core/config/sources.py
def read_raw(self, hexsha: str, modified: datetime) -> Config:
    """Load and validate the configuration stored at a given commit.

    :param hexsha: commit hash to read the configuration file from
    :param modified: modification time to record on the returned config
    :return: the validated config with ``_hexsha`` and ``_modified`` set
    :raises BadConfigurationVersionError: if the ``git show`` command fails
    """
    logger.debug("Reading %s for %s with mtime %s", self, hexsha, modified)
    try:
        # `git show <rev>:<path>` prints the file content as of that commit
        blob = sh.git.show(
            f"{hexsha}:{DEFAULT_CONFIG_FILE}",
            _cwd=self.repo_location,
            _tty_out=False,
            _async=False,
        )
        raw_obj = yaml.safe_load(blob)
    except sh.ErrorReturnCode as e:
        raise BadConfigurationVersionError(
            f"Error reading configuration: {e}"
        ) from e

    # The first entry returned by select_from_extension is loaded to obtain
    # the config *class* (not an instance), hence the type[Config] hint.
    config_class: type[Config] = select_from_extension(
        group="diracx", name="config"
    )[0].load()
    config = config_class.model_validate(raw_obj)
    # Tag the config with the revision it was read from
    config._hexsha = hexsha
    config._modified = modified
    return config
extract_remote_url(backend_url)

Extract the base URL without the 'git+' prefix and query parameters.

Source code in diracx-core/src/diracx/core/config/sources.py
def extract_remote_url(self, backend_url: ConfigSourceUrl) -> str:
    """Extract the base URL without the 'git+' prefix and query parameters.

    :param backend_url: config source URL, e.g.
        ``git+https://host/org/repo.git?branch=main``
    :return: the URL with the leading ``git+`` removed and the query
        string dropped
    """
    # Strip only a leading "git+": str.replace would also remove any later
    # occurrence of "git+" (e.g. in the repository path) and corrupt the URL.
    plain_url = str(backend_url).removeprefix("git+")
    return urlunparse(urlparse(plain_url)._replace(query=""))
get_git_branch_from_url(backend_url)

Extract the branch from the query parameters.

Source code in diracx-core/src/diracx/core/config/sources.py
def get_git_branch_from_url(self, backend_url: ConfigSourceUrl) -> str:
    """Return the ``branch`` query parameter of the URL.

    Falls back to ``DEFAULT_GIT_BRANCH`` when the URL has no ``branch``
    parameter.
    """
    query = dict(backend_url.query_params())
    return query.get("branch", DEFAULT_GIT_BRANCH)

LocalGitConfigSource

Bases: BaseGitConfigSource

The configuration is stored on a local git repository. When running on multiple servers, the filesystem must be shared.

Source code in diracx-core/src/diracx/core/config/sources.py
class LocalGitConfigSource(BaseGitConfigSource):
    """Config source backed by a git repository on the local filesystem.

    When running on multiple servers, the filesystem must be shared.
    """

    scheme = "git+file"

    def __init__(self, *, backend_url: ConfigSourceUrl) -> None:
        """Validate the repository at the URL's path and check out the branch.

        :param backend_url: URL whose path points at the local repository
        :raises ValueError: if the path is empty or is not a git repository
        """
        super().__init__(backend_url=backend_url)
        repo_path = backend_url.path
        if not repo_path:
            raise ValueError("Empty path for LocalGitConfigSource")

        self.repo_location = Path(repo_path)
        # Probe the directory with `git rev-parse --git-dir`; a non-zero exit
        # is reported as an invalid repository.
        probe_options = {
            "_cwd": self.repo_location,
            "_tty_out": False,
            "_async": False,
        }
        try:
            sh.git("rev-parse", "--git-dir", **probe_options)
        except sh.ErrorReturnCode as err:
            raise ValueError(
                f"{self.repo_location} is not a valid git repository"
            ) from err
        # Switch the working tree to the branch selected by the base class
        sh.git.checkout(self.git_branch, _cwd=self.repo_location, _async=False)

    def __hash__(self):
        # Identity of the source is its repository path
        return hash(self.repo_location)
Attributes
scheme = 'git+file' class-attribute instance-attribute
repo_location = Path(backend_url.path) instance-attribute

RemoteGitConfigSource

Bases: BaseGitConfigSource

Use a remote git repository as a config source.

Source code in diracx-core/src/diracx/core/config/sources.py
class RemoteGitConfigSource(BaseGitConfigSource):
    """Use a remote git repository as a config source.

    The remote is cloned into a temporary directory when the source is
    created.
    """

    scheme = "git+https"

    def __init__(self, *, backend_url: ConfigSourceUrl) -> None:
        """Clone the configured branch of the remote into a temporary directory.

        :param backend_url: URL of the remote repository
        :raises ValueError: if ``backend_url`` is falsy
        """
        super().__init__(backend_url=backend_url)
        if not backend_url:
            raise ValueError("No remote url for RemoteGitConfigSource")

        # The TemporaryDirectory object is kept on the instance; the clone
        # lives inside it.
        self._temp_dir = TemporaryDirectory()
        self.repo_location = Path(self._temp_dir.name)
        sh.git.clone(
            self.remote_url,
            self.repo_location,
            branch=self.git_branch,
            _async=False,
        )

    def __hash__(self):
        # Identity of the source is the path of its local clone
        return hash(self.repo_location)

    def latest_revision(self) -> tuple[str, datetime]:
        """Pull from the remote, then return the latest local revision.

        A failing ``git pull`` is logged and otherwise ignored, so the
        revision of the existing clone is still returned.
        """
        logger.debug("Pulling latest version from %s", self)
        try:
            sh.git.pull(_cwd=self.repo_location, _async=False)
        except sh.ErrorReturnCode as pull_error:
            logger.exception(pull_error)

        return super().latest_revision()
Attributes
scheme = 'git+https' class-attribute instance-attribute
repo_location = Path(self._temp_dir.name) instance-attribute
Functions
latest_revision()
Source code in diracx-core/src/diracx/core/config/sources.py
def latest_revision(self) -> tuple[str, datetime]:
    """Pull from the remote, then return the latest local revision.

    A failing ``git pull`` is logged and otherwise ignored, so the revision
    of the existing clone is still returned.
    """
    logger.debug("Pulling latest version from %s", self)
    try:
        sh.git.pull(_cwd=self.repo_location, _async=False)
    except sh.ErrorReturnCode as pull_error:
        # Best effort: keep serving whatever the clone already contains
        logger.exception(pull_error)

    return super().latest_revision()

Functions

is_running_in_async_context()

Source code in diracx-core/src/diracx/core/config/sources.py
def is_running_in_async_context():
    """Return True if an asyncio event loop is currently running, else False."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running
        return False
    else:
        return True