Skip to content

Auth Router

Authentication and authorization endpoints.

auth

Attributes

router = DiracxRouter(require_auth=False) module-attribute

__all__ = ['has_properties', 'verify_dirac_access_token'] module-attribute

Classes

Functions

verify_dirac_access_token(authorization, settings) async

Verify dirac user token and return a UserInfo class Used for each API endpoint.

Source code in diracx-routers/src/diracx/routers/utils/users.py
async def verify_dirac_access_token(
    authorization: Annotated[str, Depends(oidc_scheme)],
    settings: AuthSettings,
) -> AuthorizedUserInfo:
    """Verify dirac user token and return a UserInfo class
    Used for each API endpoint.
    """
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header is missing",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if match := re.fullmatch(r"Bearer (.+)", authorization):
        raw_token = match.group(1)
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid authorization header",
        )

    try:
        claims = read_token(
            raw_token,
            settings.token_keystore.jwks,
            settings.token_allowed_algorithms,
            claims_requests=JWTClaimsRegistry(
                iss={"essential": True, "value": settings.token_issuer},
            ),
        )
    except JoseError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid JWT",
        ) from e

    return AuthorizedUserInfo(
        bearer_token=raw_token,
        token_id=claims["jti"],
        properties=claims["dirac_properties"],
        sub=claims["sub"],
        preferred_username=claims["preferred_username"],
        dirac_group=claims["dirac_group"],
        vo=claims["vo"],
        policies=claims.get("dirac_policies", {}),
    )

has_properties(expression)

Check if the user has the given properties.

Source code in diracx-routers/src/diracx/routers/auth/utils.py
def has_properties(expression: UnevaluatedProperty | SecurityProperty):
    """Check if the user has the given properties."""
    evaluator = (
        expression
        if isinstance(expression, UnevaluatedProperty)
        else UnevaluatedProperty(expression)
    )

    async def require_property(
        user: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    ):
        if not evaluator(user.properties):
            raise HTTPException(status.HTTP_403_FORBIDDEN)

    return Depends(require_property)

Token Management

token

Token endpoint.

Attributes

router = DiracxRouter(require_auth=False) module-attribute

logger = logging.getLogger(__name__) module-attribute

BASE_64_URL_SAFE_PATTERN = '(?:[A-Za-z0-9\\-_]{4})*(?:[A-Za-z0-9\\-_]{2}==|[A-Za-z0-9\\-_]{3}=)?' module-attribute

LEGACY_EXCHANGE_PATTERN = f'Bearer diracx:legacy:({BASE_64_URL_SAFE_PATTERN})' module-attribute

Classes

Functions

mint_token(access_payload, refresh_payload, existing_refresh_token, all_access_policies, settings) async

Enrich the token with policy specific content and mint it.

Source code in diracx-routers/src/diracx/routers/auth/token.py
async def mint_token(
    access_payload: AccessTokenPayload,
    refresh_payload: RefreshTokenPayload | None,
    existing_refresh_token: str | None,
    all_access_policies: dict[str, BaseAccessPolicy],
    settings: AuthSettings,
) -> TokenResponse:
    """Enrich the token with policy specific content and mint it."""
    if not refresh_payload and not existing_refresh_token:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Refresh token is not set and no refresh token was provided",
        )

    # Enrich the token with policy specific content
    dirac_access_policies = {}
    dirac_refresh_policies = {}
    for policy_name, policy in all_access_policies.items():
        access_extra, refresh_extra = policy.enrich_tokens(
            access_payload, refresh_payload
        )
        if access_extra:
            dirac_access_policies[policy_name] = access_extra
        if refresh_extra:
            dirac_refresh_policies[policy_name] = refresh_extra

    # Create the access token
    access_payload["dirac_policies"] = dirac_access_policies
    access_token = create_token(access_payload, settings)

    # Create the refresh token
    if refresh_payload:
        refresh_payload["dirac_policies"] = dirac_refresh_policies
        refresh_token = create_token(refresh_payload, settings)
    elif existing_refresh_token:
        refresh_token = existing_refresh_token

    return TokenResponse(
        access_token=access_token,
        expires_in=settings.access_token_expire_minutes * 60,
        refresh_token=refresh_token,
    )

get_oidc_token(grant_type, client_id, auth_db, config, settings, available_properties, all_access_policies, device_code=None, code=None, redirect_uri=None, code_verifier=None, refresh_token=None) async

Token endpoint to retrieve the token at the end of a flow. This is the endpoint being pulled by dirac-login when doing the device flow.

Source code in diracx-routers/src/diracx/routers/auth/token.py
@router.post("/token")
async def get_oidc_token(
    # Autorest does not support the GrantType annotation
    # We need to specify each option with Literal[]
    grant_type: Annotated[
        Literal[GrantType.authorization_code]
        | Literal[GrantType.device_code]
        | Literal[GrantType.refresh_token],
        Form(description="OAuth2 Grant type"),
    ],
    client_id: Annotated[str, Form(description="OAuth2 client id")],
    auth_db: AuthDB,
    config: Config,
    settings: AuthSettings,
    available_properties: AvailableSecurityProperties,
    all_access_policies: Annotated[
        dict[str, BaseAccessPolicy], Depends(BaseAccessPolicy.all_used_access_policies)
    ],
    device_code: Annotated[
        str | None, Form(description="device code for OAuth2 device flow")
    ] = None,
    code: Annotated[
        str | None, Form(description="Code for OAuth2 authorization code flow")
    ] = None,
    redirect_uri: Annotated[
        str | None,
        Form(description="redirect_uri used with OAuth2 authorization code flow"),
    ] = None,
    code_verifier: Annotated[
        str | None,
        Form(
            description="Verifier for the code challenge for the OAuth2 authorization flow with PKCE"
        ),
    ] = None,
    refresh_token: Annotated[
        str | None,
        Form(description="Refresh token used with OAuth2 refresh token flow"),
    ] = None,
) -> TokenResponse:
    """Token endpoint to retrieve the token at the end of a flow.
    This is the endpoint being pulled by dirac-login when doing the device flow.
    """
    try:
        access_payload, refresh_payload = await get_oidc_token_bl(
            grant_type,
            client_id,
            auth_db,
            config,
            settings,
            available_properties,
            device_code=device_code,
            code=code,
            redirect_uri=redirect_uri,
            code_verifier=code_verifier,
            refresh_token=refresh_token,
        )
    except PendingAuthorizationError as e:
        raise DiracHttpResponseError(
            status_code=status.HTTP_400_BAD_REQUEST,
            data={"error": "authorization_pending"},
        ) from e
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except (
        InvalidCredentialsError,
        JoseError,
    ) as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except PermissionError as e:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e),
        ) from e
    return await mint_token(
        access_payload, refresh_payload, refresh_token, all_access_policies, settings
    )

perform_legacy_exchange(preferred_username, scope, authorization, auth_db, available_properties, settings, config, all_access_policies, expires_minutes=None) async

Endpoint used by legacy DIRAC to mint tokens for proxy -> token exchange.

This route is disabled if DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY is not set in the environment.

Source code in diracx-routers/src/diracx/routers/auth/token.py
@router.get("/legacy-exchange", include_in_schema=False)
async def perform_legacy_exchange(
    preferred_username: str,
    scope: str,
    authorization: Annotated[str, Header()],
    auth_db: AuthDB,
    available_properties: AvailableSecurityProperties,
    settings: AuthSettings,
    config: Config,
    all_access_policies: Annotated[
        dict[str, BaseAccessPolicy], Depends(BaseAccessPolicy.all_used_access_policies)
    ],
    expires_minutes: int | None = None,
) -> TokenResponse:
    """Endpoint used by legacy DIRAC to mint tokens for proxy -> token exchange.

    This route is disabled if DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY is not set
    in the environment.
    """
    if not (
        expected_api_key := os.environ.get("DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY")
    ):
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Legacy exchange is not enabled",
        )

    try:
        access_payload, refresh_payload = await perform_legacy_exchange_bl(
            expected_api_key=expected_api_key,
            preferred_username=preferred_username,
            scope=scope,
            authorization=authorization,
            auth_db=auth_db,
            available_properties=available_properties,
            settings=settings,
            config=config,
            expires_minutes=expires_minutes,
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except InvalidCredentialsError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except PermissionError as e:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e),
        ) from e
    return await mint_token(
        access_payload, refresh_payload, None, all_access_policies, settings
    )

Authorization Code Flow

authorize_code_flow

Authorization code flow.

See docs/admin/explanations/authentication.md

Attributes

router = DiracxRouter(require_auth=False) module-attribute

Classes

Functions

initiate_authorization_flow(request, response_type, code_challenge, code_challenge_method, client_id, redirect_uri, scope, state, auth_db, config, available_properties, settings) async

Initiate the authorization flow. It will redirect to the actual OpenID server (IAM, CheckIn) to perform a authorization code flow.

Scope details: - If only VO is provided: Uses the default group and its properties for the VO.

  • If VO and group are provided: Uses the specified group and its properties for the VO.

  • If VO and properties are provided: Uses the default group and combines its properties with the provided properties.

  • If VO, group, and properties are provided: Uses the specified group and combines its properties with the provided properties.

We set the user details obtained from the user authorize flow in a cookie to be able to map the authorization flow with the corresponding user authorize flow.

Source code in diracx-routers/src/diracx/routers/auth/authorize_code_flow.py
@router.get("/authorize")
async def initiate_authorization_flow(
    request: Request,
    response_type: Literal["code"],
    code_challenge: str,
    code_challenge_method: Literal["S256"],
    client_id: str,
    redirect_uri: str,
    scope: str,
    state: str,
    auth_db: AuthDB,
    config: Config,
    available_properties: AvailableSecurityProperties,
    settings: AuthSettings,
) -> responses.RedirectResponse:
    """Initiate the authorization flow.
    It will redirect to the actual OpenID server (IAM, CheckIn) to
    perform a authorization code flow.

    Scope details:
    - If only VO is provided: Uses the default group and its properties for the VO.

    - If VO and group are provided: Uses the specified group and its properties for the VO.

    - If VO and properties are provided: Uses the default group and combines its properties with the
      provided properties.

    - If VO, group, and properties are provided: Uses the specified group and combines its properties with the
      provided properties.

    We set the user details obtained from the user authorize flow in a cookie
    to be able to map the authorization flow with the corresponding
    user authorize flow.
    """
    try:
        redirect_uri = await initiate_authorization_flow_bl(
            request_url=f"{request.url.replace(query='')}",
            code_challenge=code_challenge,
            code_challenge_method=code_challenge_method,
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
            state=state,
            auth_db=auth_db,
            config=config,
            settings=settings,
            available_properties=available_properties,
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    return responses.RedirectResponse(redirect_uri)

complete_authorization_flow(code, state, request, auth_db, config, settings) async

Complete the authorization flow.

The user is redirected back to the DIRAC auth service after completing the IAM's authorization flow. We retrieve the original flow details from the decrypted state and store the ID token requested from the IAM. The user is then redirected to the client's redirect URI.

Source code in diracx-routers/src/diracx/routers/auth/authorize_code_flow.py
@router.get("/authorize/complete")
async def complete_authorization_flow(
    code: str,
    state: str,
    request: Request,
    auth_db: AuthDB,
    config: Config,
    settings: AuthSettings,
) -> responses.RedirectResponse:
    """Complete the authorization flow.

    The user is redirected back to the DIRAC auth service after completing the IAM's authorization flow.
    We retrieve the original flow details from the decrypted state and store the ID token requested from the IAM.
    The user is then redirected to the client's redirect URI.
    """
    try:
        redirect_uri = await complete_authorization_flow_bl(
            code=code,
            state=state,
            request_url=str(request.url.replace(query="")),
            auth_db=auth_db,
            config=config,
            settings=settings,
        )
    except AuthorizationError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid state"
        ) from e
    except IAMServerError as e:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to contact IAM server",
        ) from e
    except IAMClientError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid code"
        ) from e
    return responses.RedirectResponse(redirect_uri)

Device Flow

device_flow

Device flow.

See docs/admin/explanations/authentication.md

Attributes

router = DiracxRouter(require_auth=False) module-attribute

Classes

Functions

initiate_device_flow(client_id, scope, request, auth_db, config, available_properties, settings) async

Initiate the device flow against DIRAC authorization Server.

Scope details: - If only VO is provided: Uses the default group and its properties for the VO.

  • If VO and group are provided: Uses the specified group and its properties for the VO.

  • If VO and properties are provided: Uses the default group and combines its properties with the provided properties.

  • If VO, group, and properties are provided: Uses the specified group and combines its properties with the provided properties.

Offers the user to go with the browser to auth/<vo>/device?user_code=XYZ

Source code in diracx-routers/src/diracx/routers/auth/device_flow.py
@router.post("/device")
async def initiate_device_flow(
    client_id: str,
    scope: str,
    request: Request,
    auth_db: AuthDB,
    config: Config,
    available_properties: AvailableSecurityProperties,
    settings: AuthSettings,
) -> InitiateDeviceFlowResponse:
    """Initiate the device flow against DIRAC authorization Server.

    Scope details:
    - If only VO is provided: Uses the default group and its properties for the VO.

    - If VO and group are provided: Uses the specified group and its properties for the VO.

    - If VO and properties are provided: Uses the default group and combines its properties with the
      provided properties.

    - If VO, group, and properties are provided: Uses the specified group and combines its properties with the
      provided properties.

    Offers the user to go with the browser to
    `auth/<vo>/device?user_code=XYZ`
    """
    try:
        device_flow_response = await initiate_device_flow_bl(
            client_id=client_id,
            scope=scope,
            verification_uri=str(request.url.replace(query={})),
            auth_db=auth_db,
            config=config,
            available_properties=available_properties,
            settings=settings,
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=e.args[0],
        ) from e

    return device_flow_response

do_device_flow(request, auth_db, user_code, config, available_properties, settings) async

This is called as the verification URI for the device flow. It will redirect to the actual OpenID server (IAM, CheckIn) to perform a authorization code flow.

We set the user_code obtained from the device flow in a cookie to be able to map the authorization flow with the corresponding device flow. (note: it can't be put as parameter or in the URL)

Source code in diracx-routers/src/diracx/routers/auth/device_flow.py
@router.get("/device")
async def do_device_flow(
    request: Request,
    auth_db: AuthDB,
    user_code: str,
    config: Config,
    available_properties: AvailableSecurityProperties,
    settings: AuthSettings,
) -> RedirectResponse:
    """This is called as the verification URI for the device flow.
    It will redirect to the actual OpenID server (IAM, CheckIn) to
    perform a authorization code flow.

    We set the user_code obtained from the device flow in a cookie
    to be able to map the authorization flow with the corresponding
    device flow.
    (note: it can't be put as parameter or in the URL)
    """
    authorization_flow_url = await do_device_flow_bl(
        request_url=str(request.url.replace(query="")),
        auth_db=auth_db,
        user_code=user_code,
        config=config,
        available_properties=available_properties,
        settings=settings,
    )
    return RedirectResponse(authorization_flow_url)

finish_device_flow(request, code, state, auth_db, config, settings) async

This the url callbacked by IAM/CheckIn after the authorization flow was granted. It gets us the code we need for the authorization flow, and we can map it to the corresponding device flow using the user_code in the cookie/session.

Source code in diracx-routers/src/diracx/routers/auth/device_flow.py
@router.get("/device/complete")
async def finish_device_flow(
    request: Request,
    code: str,
    state: str,
    auth_db: AuthDB,
    config: Config,
    settings: AuthSettings,
) -> RedirectResponse:
    """This the url callbacked by IAM/CheckIn after the authorization
    flow was granted.
    It gets us the code we need for the authorization flow, and we
    can map it to the corresponding device flow using the user_code
    in the cookie/session.
    """
    request_url = str(request.url.replace(query={}))

    try:
        await finish_device_flow_bl(
            request_url,
            code,
            state,
            auth_db,
            config,
            settings,
        )
    except IAMServerError as e:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=e.args[0],
        ) from e
    except IAMClientError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=e.args[0],
        ) from e

    return responses.RedirectResponse(f"{request_url}/finished")

finished(response)

This is the final step of the device flow.

Source code in diracx-routers/src/diracx/routers/auth/device_flow.py
@router.get("/device/complete/finished")
def finished(response: Response):
    """This is the final step of the device flow."""
    response.body = b"<h1>Please close the window</h1>"
    response.status_code = 200
    response.media_type = "text/html"
    return response

Management

management

This module contains the auth management endpoints.

These endpoints are used to manage the user's authentication tokens and to get information about the user's identity.

Attributes

router = DiracxRouter(require_auth=False) module-attribute

logger = logging.getLogger(__name__) module-attribute

Classes

UserInfoResponse

Bases: TypedDict

Response for the userinfo endpoint.

Source code in diracx-routers/src/diracx/routers/auth/management.py
class UserInfoResponse(TypedDict):
    """Response for the userinfo endpoint."""

    sub: str
    vo: str
    dirac_group: str
    policies: dict[str, Any]
    properties: list[SecurityProperty]
    preferred_username: str
Attributes
sub instance-attribute
vo instance-attribute
dirac_group instance-attribute
policies instance-attribute
properties instance-attribute
preferred_username instance-attribute

Functions

get_refresh_tokens(auth_db, user_info) async

Get all refresh tokens for the user. If the user has the proxy_management property, then the subject is not used to filter the refresh tokens.

Source code in diracx-routers/src/diracx/routers/auth/management.py
@router.get("/refresh-tokens")
async def get_refresh_tokens(
    auth_db: AuthDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
) -> list:
    """Get all refresh tokens for the user. If the user has the `proxy_management` property, then
    the subject is not used to filter the refresh tokens.
    """
    subject: str | None = user_info.sub
    if PROXY_MANAGEMENT in user_info.properties:
        subject = None

    return await get_refresh_tokens_bl(auth_db, subject)

revoke_refresh_token_by_refresh_token(auth_db, settings, token, token_type_hint=None, client_id='myDIRACClientID') async

Revoke a refresh token. Closely follows RFC 7009 (or try to, at least).

Source code in diracx-routers/src/diracx/routers/auth/management.py
@router.post("/revoke")
async def revoke_refresh_token_by_refresh_token(
    auth_db: AuthDB,
    settings: AuthSettings,
    token: Annotated[str, Form(description="The refresh token to revoke")],
    token_type_hint: Annotated[
        str | None,
        Form(description="Hint for the type of token being revoked"),
    ] = None,
    client_id: Annotated[
        str,
        Form(description="The client ID of the application requesting the revocation"),
    ] = "myDIRACClientID",
) -> str:
    """Revoke a refresh token. Closely follows RFC 7009 (or try to, at least)."""
    try:
        await revoke_refresh_token_by_refresh_token_bl(
            auth_db, None, token, token_type_hint, client_id, settings
        )
    except (DecodeError, KeyError):
        logger.warning("Someone tried to revoke its token but failed.")
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except InvalidCredentialsError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    return "Refresh token revoked"

revoke_refresh_token_by_jti(auth_db, user_info, jti) async

Revoke a refresh token. If the user has the proxy_management property, then the subject is not used to filter the refresh tokens.

Source code in diracx-routers/src/diracx/routers/auth/management.py
@router.delete("/refresh-tokens/{jti}")
async def revoke_refresh_token_by_jti(
    auth_db: AuthDB,
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    jti: str,
) -> str:
    """Revoke a refresh token. If the user has the `proxy_management` property, then
    the subject is not used to filter the refresh tokens.
    """
    subject: str | None = user_info.sub
    if PROXY_MANAGEMENT in user_info.properties:
        subject = None

    try:
        await revoke_refresh_token_by_jti_bl(auth_db, subject, UUID(jti))
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except PermissionError as e:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e),
        ) from e
    except TokenNotFoundError as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e),
        ) from e
    return "Refresh token revoked"

userinfo(user_info) async

Get information about the user's identity.

Source code in diracx-routers/src/diracx/routers/auth/management.py
@router.get("/userinfo")
async def userinfo(
    user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
) -> UserInfoResponse:
    """Get information about the user's identity."""
    return {
        "sub": user_info.sub,
        "vo": user_info.vo,
        "dirac_group": user_info.dirac_group,
        "properties": user_info.properties,
        "policies": user_info.policies,
        "preferred_username": user_info.preferred_username,
    }

Well Known

well_known

Attributes

router = DiracxRouter(require_auth=False, path_root='') module-attribute

Classes

Functions

get_openid_configuration(request, config, settings) async

OpenID Connect discovery endpoint.

Source code in diracx-routers/src/diracx/routers/auth/well_known.py
@router.get("/openid-configuration")
async def get_openid_configuration(
    request: Request,
    config: Config,
    settings: AuthSettings,
) -> OpenIDConfiguration:
    """OpenID Connect discovery endpoint."""
    return await get_openid_configuration_bl(
        str(request.url_for("get_oidc_token")),
        str(request.url_for("userinfo")),
        str(request.url_for("initiate_authorization_flow")),
        str(request.url_for("initiate_device_flow")),
        str(request.url_for("revoke_refresh_token_by_refresh_token")),
        str(request.url_for("get_jwks")),
        config,
        settings,
    )

get_jwks(settings) async

Get the JWKs (public keys).

Source code in diracx-routers/src/diracx/routers/auth/well_known.py
@router.get("/jwks.json")
async def get_jwks(
    settings: AuthSettings,
) -> dict:
    """Get the JWKs (public keys)."""
    return await get_jwks_bl(settings)

get_installation_metadata(config) async

Get metadata about the dirac installation.

Source code in diracx-routers/src/diracx/routers/auth/well_known.py
@router.get("/dirac-metadata")
async def get_installation_metadata(
    config: Config,
) -> Metadata:
    """Get metadata about the dirac installation."""
    return await get_installation_metadata_bl(config)

get_security_txt() async

Get the security.txt file.

Source code in diracx-routers/src/diracx/routers/auth/well_known.py
@router.get("/security.txt")
async def get_security_txt() -> str:
    """Get the security.txt file."""
    return """Contact: https://github.com/DIRACGrid/diracx/security/advisories/new
Expires: 2026-07-02T23:59:59.000Z
Preferred-Languages: en
"""

Utilities

utils

Classes

Functions

has_properties(expression)

Check if the user has the given properties.

Source code in diracx-routers/src/diracx/routers/auth/utils.py
def has_properties(expression: UnevaluatedProperty | SecurityProperty):
    """Check if the user has the given properties."""
    evaluator = (
        expression
        if isinstance(expression, UnevaluatedProperty)
        else UnevaluatedProperty(expression)
    )

    async def require_property(
        user: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)],
    ):
        if not evaluator(user.properties):
            raise HTTPException(status.HTTP_403_FORBIDDEN)

    return Depends(require_property)