"""The endpoints for registration_domain objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
from __future__ import annotations
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import paginated, parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.create_registration_domain_data import (
CreateRegistrationDomainData,
)
from ..models.patch_registration_domain_data import (
PatchRegistrationDomainData,
)
from ..models.registration_domain import RegistrationDomain
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]
class RegistrationDomainService(t.Generic[_ClientT]):
__slots__ = ("__client",)
def __init__(self, client: _ClientT) -> None:
self.__client = client
[docs]
def get_all(
self: RegistrationDomainService[client.AuthenticatedClient],
*,
tenant_id: Maybe[str] = Nothing,
domain: Maybe[str] = Nothing,
page_size: int = 25,
) -> paginated.Response[RegistrationDomain]:
"""List registration domain rules.
:param tenant_id: If given, only return results for the given tenant.
:param domain: If given, filter by domain substring (case-insensitive).
:param page_size: The size of a single page, maximum is 100.
:returns: Paginated list of registration domain rules.
"""
url = "/api/v1/registration_domains/"
params: t.Dict[str, str | int | bool] = {
"page-size": page_size,
}
tenant_id_as_maybe = maybe_from_nullable(tenant_id)
if tenant_id_as_maybe.is_just:
params["tenant_id"] = tenant_id_as_maybe.value
domain_as_maybe = maybe_from_nullable(domain)
if domain_as_maybe.is_just:
params["domain"] = domain_as_maybe.value
if t.TYPE_CHECKING:
import httpx
def do_request(next_token: str | None) -> httpx.Response:
if next_token is None:
params.pop("next-token", "")
else:
params["next-token"] = next_token
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
return resp
def parse_response(
resp: httpx.Response,
) -> t.Sequence[RegistrationDomain]:
if utils.response_code_matches(resp.status_code, 200):
from ..models.registration_domain import (
RegistrationDomainParser,
)
return parsers.JsonResponseParser(
rqa.List(RegistrationDomainParser)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
return paginated.Response(do_request, parse_response)
[docs]
def create(
self: RegistrationDomainService[client.AuthenticatedClient],
json_body: CreateRegistrationDomainData,
) -> RegistrationDomain:
"""Create a new registration domain rule.
The request body is a tagged union discriminated on the `type` field:
- `allow`: maps a domain to a tenant role (requires
`tenant_role_id`). - `block`: blocks registration for a domain
globally.
:param json_body: The body of the request. See
:class:`.CreateRegistrationDomainData` for information about the
possible fields. You can provide this data as a
:class:`.CreateRegistrationDomainData` or as a dictionary.
:returns: The created registration domain rule.
"""
url = "/api/v1/registration_domains/"
params = None
with self.__client as client:
resp = client.http.post(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.registration_domain import RegistrationDomainParser
return parsers.JsonResponseParser(
RegistrationDomainParser
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def delete(
self: RegistrationDomainService[client.AuthenticatedClient],
*,
domain_id: str,
) -> None:
"""Delete a registration domain rule.
:param domain_id: The id of the rule to delete.
:returns: Empty response.
"""
url = "/api/v1/registration_domains/{domainId}".format(
domainId=domain_id
)
params = None
with self.__client as client:
resp = client.http.delete(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 204):
return parsers.ConstantlyParser(None).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def patch(
self: RegistrationDomainService[client.AuthenticatedClient],
json_body: PatchRegistrationDomainData,
*,
domain_id: str,
) -> RegistrationDomain:
"""Update an existing registration domain rule.
All fields are optional — only the provided fields are updated.
:param json_body: The body of the request. See
:class:`.PatchRegistrationDomainData` for information about the
possible fields. You can provide this data as a
:class:`.PatchRegistrationDomainData` or as a dictionary.
:param domain_id: The id of the rule to update.
:returns: The updated registration domain rule.
"""
url = "/api/v1/registration_domains/{domainId}".format(
domainId=domain_id
)
params = None
with self.__client as client:
resp = client.http.patch(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.registration_domain import RegistrationDomainParser
return parsers.JsonResponseParser(
RegistrationDomainParser
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)