Skip to content

Command Line Interface

DiracX command-line interface modules and commands.

Jobs Commands

jobs

Attributes

app = AsyncTyper() module-attribute

available_operators = f'Scalar operators: {', '.join([(op.value) for op in ScalarSearchOperator])}. Vector operators: {', '.join([(op.value) for op in VectorSearchOperator])}.' module-attribute

Classes

ContentRange

Source code in diracx-cli/src/diracx/cli/jobs.py
class ContentRange:
    unit: str | None = None
    start: int | None = None
    end: int | None = None
    total: int | None = None

    def __init__(self, header: str):
        if match := re.fullmatch(r"(\w+) (\d+-\d+|\*)/(\d+|\*)", header):
            self.unit, range, total = match.groups()
            self.total = int(total)
            if range != "*":
                self.start, self.end = map(int, range.split("-"))
        elif match := re.fullmatch(r"\w+", header):
            self.unit = match.group()

    @property
    def caption(self):
        if self.start is None and self.end is None:
            range_str = "all"
        else:
            range_str = (
                f"{self.start if self.start is not None else 'unknown'}-"
                f"{self.end if self.end is not None else 'unknown'} "
                f"of {self.total or 'unknown'}"
            )
        return f"Showing {range_str} {self.unit}"
Attributes
unit = None class-attribute instance-attribute
start = None class-attribute instance-attribute
end = None class-attribute instance-attribute
total = None class-attribute instance-attribute
caption property

Functions

parse_condition(value)

Source code in diracx-cli/src/diracx/cli/jobs.py
def parse_condition(value: str) -> SearchSpec:
    parameter, operator, rest = value.split(" ", 2)
    if operator in set(ScalarSearchOperator):
        return {
            "parameter": parameter,
            "operator": ScalarSearchOperator(operator),
            "value": rest,
        }
    elif operator in set(VectorSearchOperator):
        return {
            "parameter": parameter,
            "operator": VectorSearchOperator(operator),
            "values": json.loads(rest),
        }
    else:
        raise ValueError(f"Unknown operator {operator}")

search(parameter=['JobID', 'Status', 'MinorStatus', 'ApplicationStatus', 'JobGroup', 'Site', 'JobName', 'Owner', 'LastUpdateTime'], condition=[], all=False, page=1, per_page=10) async

Source code in diracx-cli/src/diracx/cli/jobs.py
@app.async_command()
async def search(
    parameter: list[str] = [
        "JobID",
        "Status",
        "MinorStatus",
        "ApplicationStatus",
        "JobGroup",
        "Site",
        "JobName",
        "Owner",
        "LastUpdateTime",
    ],
    condition: Annotated[
        list[str], Option(help=f'Example: "JobID eq 1000". {available_operators}')
    ] = [],
    all: bool = False,
    page: int = 1,
    per_page: int = 10,
):
    search_specs = [parse_condition(cond) for cond in condition]
    async with AsyncDiracClient() as api:
        jobs, content_range = await api.jobs.search(
            parameters=None if all else parameter,
            search=search_specs if search_specs else None,
            page=page,
            per_page=per_page,
            cls=lambda _, jobs, headers: (
                jobs,
                ContentRange(headers.get("Content-Range", "jobs")),
            ),
        )

    display(jobs, cast(ContentRange, content_range))

display(data, content_range)

Source code in diracx-cli/src/diracx/cli/jobs.py
def display(data, content_range: ContentRange):
    output_format = get_diracx_preferences().output_format
    match output_format:
        case OutputFormats.JSON:
            print(json.dumps(data, indent=2))
        case OutputFormats.RICH:
            display_rich(data, content_range)
        case _:
            raise NotImplementedError(output_format)

display_rich(data, content_range)

Source code in diracx-cli/src/diracx/cli/jobs.py
def display_rich(data, content_range: ContentRange) -> None:
    if not data:
        print(f"No {content_range.unit} found")
        return

    console = Console()
    columns = [str(c) for c in data[0].keys()]
    if sum(map(len, columns)) > 0.75 * console.width:
        table = Table(
            "Parameter",
            "Value",
            caption=content_range.caption,
            caption_justify="right",
        )
        for job in data:
            for k, v in job.items():
                table.add_row(k, str(v))
            table.add_section()
    else:
        table = Table(
            *columns,
            caption=content_range.caption,
            caption_justify="right",
        )
        for job in data:
            table.add_row(*map(str, job.values()))
    console.print(table)

submit(jdl) async

Source code in diracx-cli/src/diracx/cli/jobs.py
@app.async_command()
async def submit(jdl: list[FileText]):
    async with AsyncDiracClient() as api:
        jobs = await api.jobs.submit_jdl_jobs([x.read() for x in jdl])
    print(
        f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}"
    )

Auth Commands

auth

Attributes

app = AsyncTyper() module-attribute

Classes

Functions

installation_metadata() async

Source code in diracx-cli/src/diracx/cli/auth.py
async def installation_metadata():
    async with AsyncDiracClient() as api:
        return await api.well_known.get_installation_metadata()

vo_callback(vo)

Source code in diracx-cli/src/diracx/cli/auth.py
def vo_callback(vo: str | None) -> str:
    metadata = asyncio.run(installation_metadata())
    vos = list(metadata.virtual_organizations)
    if not vo:
        raise typer.BadParameter(
            f"VO must be specified, available options are: {' '.join(vos)}"
        )
    if vo not in vos:
        raise typer.BadParameter(
            f"Unknown VO {vo}, available options are: {' '.join(vos)}"
        )
    return vo

login(vo=None, group=typer.Option(None, help='Group name within the VO. If not provided, the default group for the VO will be used.'), property=typer.Option(None, help='List of properties to add to the default properties of the group. If not provided, default properties of the group will be used.')) async

Login to the DIRAC system using the device flow.

  • If only VO is provided: Uses the default group and its properties for the VO.

  • If VO and group are provided: Uses the specified group and its properties for the VO.

  • If VO and properties are provided: Uses the default group and combines its properties with the provided properties.

  • If VO, group, and properties are provided: Uses the specified group and combines its properties with the provided properties.

Source code in diracx-cli/src/diracx/cli/auth.py
@app.async_command()
async def login(
    vo: Annotated[
        Optional[str],
        typer.Argument(callback=vo_callback, help="Virtual Organization name"),
    ] = None,
    group: Optional[str] = typer.Option(
        None,
        help="Group name within the VO. If not provided, the default group for the VO will be used.",
    ),
    property: Optional[list[str]] = typer.Option(
        None,
        help=(
            "List of properties to add to the default properties of the group. "
            "If not provided, default properties of the group will be used."
        ),
    ),
):
    """Login to the DIRAC system using the device flow.

    - If only VO is provided: Uses the default group and its properties for the VO.

    - If VO and group are provided: Uses the specified group and its properties for the VO.

    - If VO and properties are provided: Uses the default group and combines its properties with the
      provided properties.

    - If VO, group, and properties are provided: Uses the specified group and combines its properties with the
      provided properties.
    """
    scopes = [f"vo:{vo}"]
    if group:
        scopes.append(f"group:{group}")
    if property:
        scopes += [f"property:{p}" for p in property]

    print(f"Logging in with scopes: {scopes}")
    async with AsyncDiracClient() as api:
        data = await api.auth.initiate_device_flow(
            client_id=api.client_id,
            scope=" ".join(scopes),
        )
        print("Now go to:", data.verification_uri_complete)
        expires = datetime.now(tz=timezone.utc) + timedelta(
            seconds=data.expires_in - 30
        )
        while expires > datetime.now(tz=timezone.utc):
            print(".", end="", flush=True)
            response = await api.auth.get_oidc_token(
                device_code=data.device_code, client_id=api.client_id
            )  # type: ignore
            if isinstance(response, DeviceFlowErrorResponse):
                if response.error == "authorization_pending":
                    # TODO: Setting more than 5 seconds results in an error
                    # Related to keep-alive disconnects from uvicon (--timeout-keep-alive)
                    await sleep(2)
                    continue
                raise RuntimeError(f"Device flow failed with {response}")
            break
        else:
            raise RuntimeError("Device authorization flow expired")

        # Save credentials
        write_credentials(response)
        credentials_path = get_diracx_preferences().credentials_path
        print(f"Saved credentials to {credentials_path}")
    print("\nLogin successful!")

whoami() async

Source code in diracx-cli/src/diracx/cli/auth.py
@app.async_command()
async def whoami():
    async with AsyncDiracClient() as api:
        user_info = await api.auth.userinfo()
        # TODO: Add a RICH output format
        print(json.dumps(user_info.as_dict(), indent=2))

logout() async

Source code in diracx-cli/src/diracx/cli/auth.py
@app.async_command()
async def logout():
    async with AsyncDiracClient() as api:
        credentials_path = get_diracx_preferences().credentials_path
        if credentials_path.exists():
            credentials = read_credentials(credentials_path)

            # Revoke refresh token
            try:
                await api.auth.revoke_refresh_token_by_refresh_token(
                    client_id=api.client_id, token=credentials.refresh_token
                )
            except Exception as e:
                print(f"Error revoking the refresh token {e!r}")
                pass

            # Remove credentials
            credentials_path.unlink(missing_ok=True)
            print(f"Removed credentials from {credentials_path}")
        else:
            print("You are not connected to DiracX, or your credentials are missing.")
            return
    print("\nLogout successful!")

callback(output_format=None)

Source code in diracx-cli/src/diracx/cli/auth.py
@app.callback()
def callback(output_format: Optional[str] = None):
    if output_format is not None:
        os.environ["DIRACX_OUTPUT_FORMAT"] = output_format

Config Commands

config

Attributes

app = AsyncTyper() module-attribute

Classes

Functions

dump() async

Source code in diracx-cli/src/diracx/cli/config.py
@app.async_command()
async def dump():
    async with AsyncDiracClient() as api:
        config = await api.config.serve_config()
        display(config)

display(data)

Source code in diracx-cli/src/diracx/cli/config.py
def display(data):
    output_format = get_diracx_preferences().output_format
    match output_format:
        case OutputFormats.JSON:
            print(json.dumps(data, indent=2))
        case OutputFormats.RICH:
            print_json(data=data)
        case _:
            raise NotImplementedError(output_format)

Internal Commands

Legacy Commands

legacy

Attributes

app = AsyncTyper() module-attribute

BASE_64_URL_SAFE_PATTERN = '(?:[A-Za-z0-9\\-_]{4})*(?:[A-Za-z0-9\\-_]{2}==|[A-Za-z0-9\\-_]{3}=)?' module-attribute

LEGACY_EXCHANGE_PATTERN = f'diracx:legacy:({BASE_64_URL_SAFE_PATTERN})' module-attribute

Classes

IdPConfig pydantic-model

Bases: BaseModel

Fields:

Source code in diracx-cli/src/diracx/cli/internal/legacy.py
class IdPConfig(BaseModel):
    URL: str
    ClientID: str
Attributes
URL pydantic-field
ClientID pydantic-field

VOConfig pydantic-model

Bases: BaseModel

Fields:

Source code in diracx-cli/src/diracx/cli/internal/legacy.py
class VOConfig(BaseModel):
    DefaultGroup: str
    IdP: IdPConfig
    UserSubjects: dict[str, str]
    Support: SupportInfo = Field(default_factory=SupportInfo)
Attributes
DefaultGroup pydantic-field
IdP pydantic-field
UserSubjects pydantic-field
Support pydantic-field

ConversionConfig pydantic-model

Bases: BaseModel

Fields:

Source code in diracx-cli/src/diracx/cli/internal/legacy.py
class ConversionConfig(BaseModel):
    VOs: dict[str, VOConfig]
Attributes
VOs pydantic-field

Functions

cs_sync(old_file, new_file)

Load the old CS and convert it to the new YAML format.

Source code in diracx-cli/src/diracx/cli/internal/legacy.py
@app.command()
def cs_sync(old_file: Path, new_file: Path):
    """Load the old CS and convert it to the new YAML format."""
    if not os.environ.get("DIRAC_COMPAT_ENABLE_CS_CONVERSION"):
        raise RuntimeError(
            "DIRAC_COMPAT_ENABLE_CS_CONVERSION must be set for the conversion to be possible"
        )

    old_data = old_file.read_text()
    cfg = diraccfg.CFG().loadFromBuffer(old_data)
    raw = cfg.getAsDict()

    diracx_section = cast("CFGAsDict", raw["DiracX"])
    # DisabledVOs cannot be set if any Legacy clients are enabled
    disabled_vos = diracx_section.get("DisabledVOs")
    enabled_clients = []
    for _, client_status in cast(
        "CFGAsDict", diracx_section.get("LegacyClientEnabled", {})
    ).items():
        for _, str_status in cast("CFGAsDict", client_status).items():
            enabled_clients.append(str_status == "True")
    if disabled_vos and any(enabled_clients):
        raise RuntimeError(
            "DisabledVOs cannot be set if any Legacy clients are enabled"
        )

    _apply_fixes(raw)
    config_class: Config = select_from_extension(group="diracx", name="config")[
        0
    ].load()
    config = config_class.model_validate(raw)
    new_file.write_text(
        yaml.safe_dump(config.model_dump(exclude_unset=True, mode="json"))
    )

generate_helm_values(public_cfg=Option(help='Path to the cfg file served by the CS'), secret_cfg=Option(default=None, help='Path to the cfg containing the secret'), output_file=Option(help='Where to dump the yam file'))

Generate an initial values.yaml to run a DiracX installation.

The file generated is not complete, and needs manual editing.

Source code in diracx-cli/src/diracx/cli/internal/legacy.py
@app.command()
def generate_helm_values(
    public_cfg: Path = Option(help="Path to the cfg file served by the CS"),
    secret_cfg: Path = Option(
        default=None, help="Path to the cfg containing the secret"
    ),
    output_file: Path = Option(help="Where to dump the yam file"),
):
    """Generate an initial values.yaml to run a DiracX installation.

    The file generated is not complete, and needs manual editing.
    """
    helm_values = {
        "developer": {"enabled": False},
        "initCs": {"enabled": True},
        "initSecrets": {"enabled": True},
        "initSql": {"enabled": False},
        "cert-manager": {"enabled": False},
        "cert-manager-issuer": {"enabled": False},
        "minio": {"enabled": False},
        "dex": {"enabled": False},
        "opensearch": {"enabled": False},
        # This is Openshift specific, change it maybe
        "ingress": {
            "enabled": True,
            "className": None,
            "tlsSecretName": None,
            "annotations": {
                "route.openshift.io/termination": "edge",
                "haproxy.router.openshift.io/ip_whitelist": "",
            },
        },
        "rabbitmq": {"enabled": False},
        "mysql": {"enabled": False},
    }

    cfg = diraccfg.CFG().loadFromBuffer(public_cfg.read_text())

    if secret_cfg:
        cfg = cfg.mergeWith(diraccfg.CFG().loadFromBuffer(secret_cfg.read_text()))

    cfg = cast(dict, cfg.getAsDict())

    diracx_url = cfg["DiracX"]["URL"]
    diracx_hostname = urlparse(diracx_url).netloc.split(":", 1)[0]

    diracx_config: dict = {
        "sqlDbs": {},
        "osDbs": {},
    }

    diracx_settings: dict[str, str] = {"DIRACX_CONFIG_BACKEND_URL": "FILL ME"}
    diracx_config["settings"] = diracx_settings
    helm_values["diracx"] = diracx_config
    diracx_config["hostname"] = diracx_hostname

    diracx_settings["DIRACX_SERVICE_AUTH_TOKEN_ISSUER"] = diracx_url
    diracx_settings["DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS"] = json.dumps(
        [
            urljoin(diracx_url, "api/docs/oauth2-redirect"),
            urljoin(diracx_url, "/#authentication-callback"),
        ]
    )

    ### SQL DBs

    default_db_user = cfg["Systems"].get("Databases", {}).get("User")
    default_db_password = cfg["Systems"].get("Databases", {}).get("Password")
    default_db_host = cfg["Systems"].get("Databases", {}).get("Host", "FILL ME")
    default_db_port = cfg["Systems"].get("Databases", {}).get("Port", "FILL ME")

    all_db_configs = {}
    sql_dbs = {
        "dbs": {},
        "default": {
            "host": f"{default_db_host}:{default_db_port}",
            "password": default_db_password,
            "rootPassword": "FILL ME",
            "rootUser": "FILL ME",
            "user": default_db_user,
        },
    }
    for _system, system_config in cfg["Systems"].items():
        all_db_configs.update(system_config.get("Databases", {}))

    from diracx.core.extensions import select_from_extension

    for entry_point in select_from_extension(group="diracx.dbs.sql"):
        db_name = entry_point.name
        db_config = all_db_configs.get(db_name, {})

        sql_dbs["dbs"][db_name] = {}
        # There is a DIRAC AuthDB, but it is not the same
        # as the DiracX one
        if db_name == "AuthDB":
            sql_dbs["dbs"]["AuthDB"] = {"internalName": "DiracXAuthDB"}

        if "DBName" in db_config:
            indb_name = db_config["DBName"]
            if indb_name != db_name:
                sql_dbs["dbs"]["internalName"] = indb_name
        if "User" in db_config:
            sql_dbs["dbs"][db_name]["user"] = db_config.get("User")
        if "Password" in db_config:
            sql_dbs["dbs"][db_name]["password"] = db_config.get("Password")
        if "Host" in db_config or "Port" in db_config:
            sql_dbs["dbs"][db_name]["host"] = (
                f"{db_config.get('Host', default_db_host)}:{db_config.get('Port', default_db_port)}"
            )
        if not sql_dbs["dbs"][db_name]:
            sql_dbs["dbs"][db_name] = None

    diracx_config["sqlDbs"] = sql_dbs

    #### END SQL DB

    # #### OS DBs

    default_os_db_user = cfg["Systems"].get("NoSQLDatabases", {}).get("User")
    default_os_db_password = cfg["Systems"].get("NoSQLDatabases", {}).get("Password")
    default_os_db_host = cfg["Systems"].get("NoSQLDatabases", {}).get("Host", "FILL ME")

    os_dbs = {
        "dbs": {},
        "default": {
            "host": f"{default_os_db_host}",
            "password": default_os_db_password,
            "rootPassword": "FILL ME",
            "rootUser": "FILL ME",
            "user": default_os_db_user,
        },
    }

    for entry_point in select_from_extension(group="diracx.dbs.os"):
        db_name = entry_point.name
        db_config = all_db_configs.get(db_name, {})

        os_dbs["dbs"][db_name] = {}
        # There is a DIRAC AuthDB, but it is not the same
        # as the DiracX one

        if "DBName" in db_config:
            indb_name = db_config["DBName"]
            if indb_name != db_name:
                os_dbs["dbs"]["internalName"] = indb_name
        if "User" in db_config:
            os_dbs["dbs"][db_name]["user"] = db_config["User"]
        if "Password" in db_config:
            os_dbs["dbs"][db_name]["password"] = db_config["Password"]
        if "Host" in db_config:
            os_dbs["dbs"][db_name]["host"] = db_config["Host"]

        if not os_dbs["dbs"][db_name]:
            os_dbs["dbs"][db_name] = None

    diracx_config["osDbs"] = os_dbs

    #### End OS DBs

    # Settings for the legacy
    try:
        if match := re.fullmatch(
            LEGACY_EXCHANGE_PATTERN, cfg["DiracX"]["LegacyExchangeApiKey"]
        ):
            raw_token = base64.urlsafe_b64decode(match.group(1))
        else:
            raise ValueError(
                "Invalid authorization header",
            )

        diracx_settings["DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY"] = hashlib.sha256(
            raw_token
        ).hexdigest()
    except KeyError:
        error_msg = """
            ERROR: you must have '/DiracX/LegacyExchangeApiKey' already set.
            See the `legacy_exchange` function definition for how to generate it in python
        """
        typer.echo(error_msg, err=True)
        raise typer.Exit(1) from None
    # Sandboxstore settings
    # TODO: Integrate minio for production use (ingress, etc)
    # By default, take the server hostname and prepend "sandboxes"
    diracx_settings["DIRACX_SANDBOX_STORE_BUCKET_NAME"] = (
        f"{diracx_hostname.split('.')[0]}-sandboxes"
    )
    try:
        diracx_settings["DIRACX_SANDBOX_STORE_SE_NAME"] = cfg["Systems"][
            "WorkloadManagement"
        ]["Services"]["SandboxStore"]["LocalSE"]
    except KeyError:
        pass

    diracx_settings["DIRACX_SANDBOX_STORE_S3_CLIENT_KWARGS"] = json.dumps(
        {
            "endpoint_url": "FILL ME",
            "aws_access_key_id": "FILL ME",
            "aws_secret_access_key": "FILL ME",
        }
    )

    diracx_settings["DIRACX_SERVICE_JOBS_ENABLED"] = "true"
    diracx_settings["DIRACX_SANDBOX_STORE_AUTO_CREATE_BUCKET"] = "true"
    output_file.write_text(yaml.safe_dump(helm_values))
    typer.echo(
        "The file is incomplete and needs manual editing (grep for 'FILL ME')", err=True
    )

Internal Config

config

Attributes

app = AsyncTyper() module-attribute

Classes

Functions

get_repo_path(config_repo_str)

Source code in diracx-cli/src/diracx/cli/internal/config.py
def get_repo_path(config_repo_str: str) -> Path:
    config_repo = TypeAdapter(ConfigSourceUrl).validate_python(config_repo_str)
    if config_repo.scheme != "git+file" or config_repo.path is None:
        raise NotImplementedError("Only git+file:// URLs are supported")

    repo_path = Path(config_repo.path)

    return repo_path

get_config_from_repo_path(repo_path)

Source code in diracx-cli/src/diracx/cli/internal/config.py
def get_config_from_repo_path(repo_path: Path) -> Config:
    return ConfigSource.create_from_url(backend_url=repo_path).read_config()

generate_cs(config_repo)

Generate a minimal DiracX configuration repository.

Source code in diracx-cli/src/diracx/cli/internal/config.py
@app.command()
def generate_cs(config_repo: str):
    """Generate a minimal DiracX configuration repository."""
    # TODO: The use of TypeAdapter should be moved in to typer itself

    repo_path = get_repo_path(config_repo)

    if repo_path.exists() and list(repo_path.iterdir()):
        typer.echo(f"ERROR: Directory {repo_path} already exists", err=True)
        raise typer.Exit(1)

    config = Config(
        Registry={},
        DIRAC=DIRACConfig(),
        Operations={"Defaults": OperationsConfig()},
    )

    git.Repo.init(repo_path, initial_branch="master")
    update_config_and_commit(
        repo_path=repo_path, config=config, message="Initial commit"
    )
    typer.echo(f"Successfully created repo in {config_repo}", err=True)

add_vo(config_repo, *, vo, default_group='user', idp_url, idp_client_id)

Add a registry entry (vo) to an existing configuration repository.

Source code in diracx-cli/src/diracx/cli/internal/config.py
@app.command()
def add_vo(
    config_repo: str,
    *,
    vo: Annotated[str, typer.Option()],
    default_group: Optional[str] = "user",
    idp_url: Annotated[str, typer.Option()],
    idp_client_id: Annotated[str, typer.Option()],
):
    """Add a registry entry (vo) to an existing configuration repository."""
    # TODO: The use of TypeAdapter should be moved in to typer itself
    repo_path = get_repo_path(config_repo)
    config = get_config_from_repo_path(repo_path)

    # A VO should at least contain a default group
    new_registry = RegistryConfig(
        IdP=IdpConfig(URL=idp_url, ClientID=idp_client_id),
        DefaultGroup=default_group,
        Users={},
        Groups={
            default_group: GroupConfig(
                Properties={"NormalUser"}, Quota=None, Users=set()
            )
        },
    )

    if vo in config.Registry:
        typer.echo(f"ERROR: VO {vo} already exists", err=True)
        raise typer.Exit(1)

    config.Registry[vo] = new_registry

    update_config_and_commit(
        repo_path=repo_path,
        config=config,
        message=f"Added vo {vo} registry (default group {default_group} and idp {idp_url})",
    )
    typer.echo(f"Successfully added vo to {config_repo}", err=True)

add_group(config_repo, *, vo, group, properties=['NormalUser'])

Add a group to an existing vo in the configuration repository.

Source code in diracx-cli/src/diracx/cli/internal/config.py
@app.command()
def add_group(
    config_repo: str,
    *,
    vo: Annotated[str, typer.Option()],
    group: Annotated[str, typer.Option()],
    properties: list[str] = ["NormalUser"],
):
    """Add a group to an existing vo in the configuration repository."""
    # TODO: The use of TypeAdapter should be moved in to typer itself
    repo_path = get_repo_path(config_repo)
    config = get_config_from_repo_path(repo_path)

    new_group = GroupConfig(Properties=set(properties), Quota=None, Users=set())

    if vo not in config.Registry:
        typer.echo(f"ERROR: Virtual Organization {vo} does not exist", err=True)
        raise typer.Exit(1)

    if group in config.Registry[vo].Groups.keys():
        typer.echo(f"ERROR: Group {group} already exists in {vo}", err=True)
        raise typer.Exit(1)

    config.Registry[vo].Groups[group] = new_group

    update_config_and_commit(
        repo_path=repo_path, config=config, message=f"Added group {group} in {vo}"
    )
    typer.echo(f"Successfully added group to {config_repo}", err=True)

add_user(config_repo, *, vo, groups=None, sub, preferred_username)

Add a user to an existing vo and group.

Source code in diracx-cli/src/diracx/cli/internal/config.py
@app.command()
def add_user(
    config_repo: str,
    *,
    vo: Annotated[str, typer.Option()],
    groups: Annotated[Optional[list[str]], typer.Option("--group")] = None,
    sub: Annotated[str, typer.Option()],
    preferred_username: Annotated[str, typer.Option()],
):
    """Add a user to an existing vo and group."""
    # TODO: The use of TypeAdapter should be moved in to typer itself
    repo_path = get_repo_path(config_repo)
    config = get_config_from_repo_path(repo_path)

    new_user = UserConfig(PreferedUsername=preferred_username)

    if vo not in config.Registry:
        typer.echo(f"ERROR: Virtual Organization {vo} does not exist", err=True)
        raise typer.Exit(1)

    if sub in config.Registry[vo].Users:
        typer.echo(f"ERROR: User {sub} already exists", err=True)
        raise typer.Exit(1)

    config.Registry[vo].Users[sub] = new_user

    if not groups:
        groups = [config.Registry[vo].DefaultGroup]

    for group in set(groups):
        if group not in config.Registry[vo].Groups:
            typer.echo(f"ERROR: Group {group} does not exist in {vo}", err=True)
            raise typer.Exit(1)
        if sub in config.Registry[vo].Groups[group].Users:
            typer.echo(f"ERROR: User {sub} already exists in group {group}", err=True)
            raise typer.Exit(1)

        config.Registry[vo].Groups[group].Users.add(sub)

    update_config_and_commit(
        repo_path=repo_path,
        config=config,
        message=f"Added user {sub} ({preferred_username}) to vo {vo} and groups {groups}",
    )
    typer.echo(f"Successfully added user to {config_repo}", err=True)

update_config_and_commit(repo_path, config, message)

Update the yaml file in the repo and commit it.

Source code in diracx-cli/src/diracx/cli/internal/config.py
def update_config_and_commit(repo_path: Path, config: Config, message: str):
    """Update the yaml file in the repo and commit it."""
    repo = git.Repo(repo_path)
    yaml_path = repo_path / "default.yml"
    typer.echo(f"Writing back configuration to {yaml_path}", err=True)
    yaml_path.write_text(
        yaml.safe_dump(config.model_dump(exclude_unset=True, mode="json"))
    )
    repo.index.add([yaml_path.relative_to(repo_path)])
    repo.index.commit(message)

CLI Utilities

utils

Classes

AsyncTyper

Bases: Typer

Source code in diracx-cli/src/diracx/cli/utils.py
class AsyncTyper(typer.Typer):
    def async_command(self, *args, **kwargs):
        def decorator(async_func):
            @wraps(async_func)
            def sync_func(*_args, **_kwargs):
                try:
                    return run(async_func(*_args, **_kwargs))
                except ClientAuthenticationError:
                    print(
                        ":x: [bold red]You are not authenticated. Log in with:[/bold red] "
                        "[bold] dirac login [OPTIONS] [VO] [/bold]"
                    )
                except ConnectError:
                    print(
                        ":x: [bold red]Please configure a valid DiracX server.[/bold red]"
                    )

            self.command(*args, **kwargs)(sync_func)
            return async_func

        return decorator
Functions
async_command(*args, **kwargs)
Source code in diracx-cli/src/diracx/cli/utils.py
def async_command(self, *args, **kwargs):
    def decorator(async_func):
        @wraps(async_func)
        def sync_func(*_args, **_kwargs):
            try:
                return run(async_func(*_args, **_kwargs))
            except ClientAuthenticationError:
                print(
                    ":x: [bold red]You are not authenticated. Log in with:[/bold red] "
                    "[bold] dirac login [OPTIONS] [VO] [/bold]"
                )
            except ConnectError:
                print(
                    ":x: [bold red]Please configure a valid DiracX server.[/bold red]"
                )

        self.command(*args, **kwargs)(sync_func)
        return async_func

    return decorator