Skip to content

Router Utilities

Helper functions and utilities for building DiracX routers.

These utilities provide common functionality needed across multiple routers, such as user management, response formatting, and request validation.

User Utilities

users

Attributes

oidc_scheme = OpenIdConnect(openIdConnectUrl='/.well-known/openid-configuration', auto_error=False) module-attribute

Classes

UUID

Bases: UUID

Subclass of uuid_utils.UUID to add pydantic support.

Source code in diracx-routers/src/diracx/routers/utils/users.py
class UUID(_UUID):
    """Subclass of uuid_utils.UUID to add pydantic support."""

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Use the stdlib uuid.UUID schema for validation and serialization."""
        std_schema = handler(std_uuid.UUID)

        def to_uuid_utils(u: std_uuid.UUID) -> UUID:
            return cls(str(u))

        return core_schema.no_info_after_validator_function(to_uuid_utils, std_schema)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, core_schema: CoreSchema, handler: GetJsonSchemaHandler
    ) -> dict[str, Any]:
        """Return the stdlib uuid.UUID schema for JSON serialization."""
        return handler(core_schema)
Functions
__get_pydantic_core_schema__(source_type, handler) classmethod

Use the stdlib uuid.UUID schema for validation and serialization.

Source code in diracx-routers/src/diracx/routers/utils/users.py
@classmethod
def __get_pydantic_core_schema__(
    cls, source_type: Any, handler: GetCoreSchemaHandler
) -> CoreSchema:
    """Use the stdlib uuid.UUID schema for validation and serialization."""
    std_schema = handler(std_uuid.UUID)

    def to_uuid_utils(u: std_uuid.UUID) -> UUID:
        return cls(str(u))

    return core_schema.no_info_after_validator_function(to_uuid_utils, std_schema)
__get_pydantic_json_schema__(core_schema, handler) classmethod

Return the stdlib uuid.UUID schema for JSON serialization.

Source code in diracx-routers/src/diracx/routers/utils/users.py
@classmethod
def __get_pydantic_json_schema__(
    cls, core_schema: CoreSchema, handler: GetJsonSchemaHandler
) -> dict[str, Any]:
    """Return the stdlib uuid.UUID schema for JSON serialization."""
    return handler(core_schema)

AuthInfo pydantic-model

Bases: BaseModel

Fields:

Source code in diracx-routers/src/diracx/routers/utils/users.py
class AuthInfo(BaseModel):
    # raw token for propagation
    bearer_token: str

    # token ID in the DB for Component
    # unique jwt identifier for user
    token_id: UUID

    # list of DIRAC properties
    properties: list[SecurityProperty]

    policies: dict[str, Any] = {}
Attributes
bearer_token pydantic-field
token_id pydantic-field
properties pydantic-field
policies = {} pydantic-field

AuthorizedUserInfo pydantic-model

Bases: AuthInfo, UserInfo

Fields:

Source code in diracx-routers/src/diracx/routers/utils/users.py
class AuthorizedUserInfo(AuthInfo, UserInfo):
    pass

Functions

verify_dirac_access_token(authorization, settings) async

Verify dirac user token and return a UserInfo class Used for each API endpoint.

Source code in diracx-routers/src/diracx/routers/utils/users.py
async def verify_dirac_access_token(
    authorization: Annotated[str, Depends(oidc_scheme)],
    settings: AuthSettings,
) -> AuthorizedUserInfo:
    """Verify dirac user token and return a UserInfo class
    Used for each API endpoint.
    """
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header is missing",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if match := re.fullmatch(r"Bearer (.+)", authorization):
        raw_token = match.group(1)
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid authorization header",
        )

    try:
        claims = read_token(
            raw_token,
            settings.token_keystore.jwks,
            settings.token_allowed_algorithms,
            claims_requests=JWTClaimsRegistry(
                iss={"essential": True, "value": settings.token_issuer},
            ),
        )
    except JoseError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid JWT",
        ) from e

    return AuthorizedUserInfo(
        bearer_token=raw_token,
        token_id=claims["jti"],
        properties=claims["dirac_properties"],
        sub=claims["sub"],
        preferred_username=claims["preferred_username"],
        dirac_group=claims["dirac_group"],
        vo=claims["vo"],
        policies=claims.get("dirac_policies", {}),
    )