Skip to content

Cluster

geneva.cluster.mgr.GenevaCluster

A Geneva Cluster represents the backend compute infrastructure for the execution environment.

Source code in geneva/cluster/mgr.py
@attrs.define
class GenevaCluster:
    """A Geneva Cluster represents the backend compute infrastructure
    for the execution environment.

    `to_ray_cluster` converts this persisted definition into the internal
    RayCluster model; `as_dict` returns a plain-dict form with enum members
    replaced by their values.
    """

    cluster_type: GenevaClusterType = attrs.field(metadata={"pa_type": pa.string()})
    name: str = attrs.field()

    # serialize nested structs as json strings, so we can add new fields
    # without requiring schema evolution
    kuberay: Optional[KubeRayConfig] = attrs.field(
        default=None, metadata={"pa_type": "string"}
    )
    # Timezone-aware (UTC) creation time, captured when the instance is built.
    created_at: datetime = attrs.field(
        factory=lambda: datetime.now(timezone.utc),
        metadata={"pa_type": pa.timestamp("us", tz="UTC")},
    )
    created_by: str = attrs.field(factory=current_user)

    def validate(self) -> None:
        """Validate the definition by converting it to a RayCluster.

        The converted object is discarded; any exception raised by
        `to_ray_cluster` propagates to the caller.
        """
        # use attrs validation on RayCluster
        self.to_ray_cluster()

    def to_ray_cluster(self) -> "RayCluster":
        """Convert the persisted cluster definition into internal RayCluster model

        Returns:
            A RayCluster built from the `kuberay` configuration and this
            cluster's name.

        Raises:
            ValueError: If this cluster has no `kuberay` configuration.
        """
        # `kuberay` defaults to None; fail with a clear message instead of an
        # AttributeError from calling dict methods on None further down.
        if self.kuberay is None:
            raise ValueError(
                f"Cluster {self.name!r} has no kuberay configuration and "
                "cannot be converted to a RayCluster"
            )

        from geneva.runners.ray.raycluster import (
            RayCluster,
            _HeadGroupSpec,
            _WorkerGroupSpec,
        )

        cluster_dict = asdict(self)
        kuberay_kwargs = cluster_dict["kuberay"]

        # Translate persisted field names into the RayCluster keyword names.
        kuberay_kwargs.pop("use_portforwarding")
        kuberay_kwargs["region"] = kuberay_kwargs.pop("aws_region")
        kuberay_kwargs["role_name"] = kuberay_kwargs.pop("aws_role_name")
        kuberay_kwargs["name"] = cluster_dict["name"]
        kuberay_kwargs["config_method"] = K8sConfigMethod(
            kuberay_kwargs["config_method"]
        )

        # Create spec objects directly - k8s_spec_override deep-merged in definition()
        kuberay_kwargs["head_group"] = _HeadGroupSpec(**kuberay_kwargs["head_group"])
        kuberay_kwargs["worker_groups"] = [
            _WorkerGroupSpec(**wg) for wg in kuberay_kwargs["worker_groups"]
        ]

        # Extract ray_init_kwargs
        ray_init_kwargs = kuberay_kwargs.pop("ray_init_kwargs", {})
        return RayCluster(**kuberay_kwargs, ray_init_kwargs=ray_init_kwargs)

    def as_dict(self) -> dict:
        """Return this cluster as a dict, with enum members replaced by
        their `.value`."""
        return attrs.asdict(
            self,
            value_serializer=lambda obj, a, v: v.value
            if isinstance(v, enum.Enum)
            else v,
        )

cluster_type

cluster_type: GenevaClusterType = field(
    metadata={"pa_type": string()}
)

name

name: str = field()

kuberay

kuberay: Optional[KubeRayConfig] = field(
    default=None, metadata={"pa_type": "string"}
)

created_at

created_at: datetime = field(
    factory=lambda: now(utc),
    metadata={"pa_type": timestamp("us", tz="UTC")},
)

created_by

created_by: str = field(factory=current_user)

validate

validate() -> None
Source code in geneva/cluster/mgr.py
def validate(self) -> None:
    """Validate the definition by converting it to a RayCluster.

    The conversion result is thrown away; only the exceptions it may raise
    matter here.
    """
    # Building the RayCluster runs its attrs validation.
    self.to_ray_cluster()
    return None

to_ray_cluster

to_ray_cluster() -> RayCluster

Convert the persisted cluster definition into internal RayCluster model

Source code in geneva/cluster/mgr.py
def to_ray_cluster(self) -> "RayCluster":
    """Convert the persisted cluster definition into internal RayCluster model

    Returns:
        A RayCluster built from the `kuberay` configuration and this
        cluster's name.

    Raises:
        ValueError: If this cluster has no `kuberay` configuration.
    """
    # `kuberay` may be None; fail with a clear message instead of an
    # AttributeError from calling dict methods on None further down.
    if self.kuberay is None:
        raise ValueError(
            f"Cluster {self.name!r} has no kuberay configuration and "
            "cannot be converted to a RayCluster"
        )

    from geneva.runners.ray.raycluster import (
        RayCluster,
        _HeadGroupSpec,
        _WorkerGroupSpec,
    )

    cluster_dict = asdict(self)
    kuberay_kwargs = cluster_dict["kuberay"]

    # Translate persisted field names into the RayCluster keyword names.
    kuberay_kwargs.pop("use_portforwarding")
    kuberay_kwargs["region"] = kuberay_kwargs.pop("aws_region")
    kuberay_kwargs["role_name"] = kuberay_kwargs.pop("aws_role_name")
    kuberay_kwargs["name"] = cluster_dict["name"]
    kuberay_kwargs["config_method"] = K8sConfigMethod(kuberay_kwargs["config_method"])

    # Create spec objects directly - k8s_spec_override deep-merged in definition()
    kuberay_kwargs["head_group"] = _HeadGroupSpec(**kuberay_kwargs["head_group"])
    kuberay_kwargs["worker_groups"] = [
        _WorkerGroupSpec(**wg) for wg in kuberay_kwargs["worker_groups"]
    ]

    # Extract ray_init_kwargs
    ray_init_kwargs = kuberay_kwargs.pop("ray_init_kwargs", {})
    return RayCluster(**kuberay_kwargs, ray_init_kwargs=ray_init_kwargs)

as_dict

as_dict() -> dict
Source code in geneva/cluster/mgr.py
def as_dict(self) -> dict:
    """Return this object as a dict, with enum members replaced by their
    `.value`."""

    def _unwrap_enum(_instance, _attribute, value):
        # Enum members are emitted as their underlying value; everything
        # else passes through untouched.
        if isinstance(value, enum.Enum):
            return value.value
        return value

    return attrs.asdict(self, value_serializer=_unwrap_enum)

geneva.cluster.builder.GenevaClusterBuilder

Fluent builder for GenevaCluster. `name` is required; all optional fields use their defaults.

Example usage:

    >>> cluster = (
    ...     GenevaClusterBuilder()
    ...     .name("my-cluster")
    ...     .head_group_builder(HeadGroupBuilder().cpus(8).memory("16Gi"))
    ...     .add_worker_group(WorkerGroupBuilder().gpu_worker(memory="8Gi", gpus=1))
    ...     .build()
    ... )

Source code in geneva/cluster/builder.py
class GenevaClusterBuilder:
    """Fluent builder for GenevaCluster.

    `name` is required; every other setting falls back to the defaults
    assigned in `__init__`. Each setter returns the builder itself so calls
    can be chained, and `build()` produces the final GenevaCluster.

    Example usage:
        >>> cluster = (
        ...     GenevaClusterBuilder()
        ...     .name("my-cluster")
        ...     .head_group_builder(HeadGroupBuilder().cpus(8).memory("16Gi"))
        ...     .add_worker_group(
        ...         WorkerGroupBuilder().gpu_worker(memory="8Gi", gpus=1)
        ...     )
        ...     .build()
        ... )
    """

    def __init__(self) -> None:
        """Initialise the builder with its default settings."""
        self._name: str | None = None
        self._cluster_type: GenevaClusterType = GenevaClusterType.KUBE_RAY
        self._namespace: str = "geneva"
        self._config_method: K8sConfigMethod = K8sConfigMethod.LOCAL
        self._use_portforwarding: bool = True
        self._aws_region: str | None = None
        self._aws_role_name: str | None = None
        self._ray_init_kwargs: dict = {}

        # Head group defaults
        self._head_image: str = default_image()
        self._head_cpus: int = 2
        self._head_memory: str = "4Gi"
        self._head_gpus: int = 0
        self._head_service_account: str = "geneva-service-account"
        self._head_node_selector: dict[str, str] = {}
        self._head_labels: dict[str, str] = {}
        self._head_tolerations: list[dict[str, str]] = []

        # Worker group defaults
        self._worker_groups: list[WorkerGroupConfig] = []

    def name(self, name: str) -> "GenevaClusterBuilder":
        """Set the cluster name."""
        self._name = name
        return self

    def cluster_type(self, cluster_type: GenevaClusterType) -> "GenevaClusterBuilder":
        """Set the cluster type."""
        self._cluster_type = cluster_type
        return self

    def namespace(self, namespace: str) -> "GenevaClusterBuilder":
        """Set the Kubernetes namespace."""
        self._namespace = namespace
        return self

    def config_method(self, method: K8sConfigMethod) -> "GenevaClusterBuilder":
        """Set the Kubernetes config method."""
        self._config_method = method
        return self

    def portforwarding(self, enabled: bool = True) -> "GenevaClusterBuilder":
        """Enable or disable port forwarding."""
        self._use_portforwarding = enabled
        return self

    def aws_config(
        self, region: str | None = None, role_name: str | None = None
    ) -> "GenevaClusterBuilder":
        """Configure AWS settings.

        Both values are always overwritten, so omitting an argument resets
        it to None.
        """
        self._aws_region = region
        self._aws_role_name = role_name
        return self

    def ray_init_kwargs(self, kwargs: dict) -> "GenevaClusterBuilder":
        """
        Set arbitrary kwargs to pass to ray.init() when starting the cluster.

        Commonly used for runtime_env configuration with conda or pip
        dependencies. The dict is deep-copied, so later changes to `kwargs`
        do not affect the builder.

        Example:
            >>> builder.ray_init_kwargs({
            ...     "runtime_env": {
            ...         "conda": {
            ...             "channels": ["conda-forge"],
            ...             "dependencies": [
            ...                 "python=3.10", "ffmpeg<8", "torchvision=0.22.1"
            ...             ]
            ...         },
            ...         "config": {"eager_install": True}
            ...     }
            ... })

        WARNING: This accepts arbitrary kwargs without validation, allowing
        injection of faulty or potentially malicious configuration. Use with
        caution.
        """
        self._ray_init_kwargs = copy.deepcopy(kwargs)
        return self

    # Head group configuration
    def head_group(
        self,
        *,
        image: str | None = None,
        cpus: int | None = None,
        memory: str | None = None,
        gpus: int | None = None,
        service_account: str | None = None,
        node_selector: dict[str, str] | None = None,
        labels: dict[str, str] | None = None,
        tolerations: list[dict[str, str]] | None = None,
    ) -> "GenevaClusterBuilder":
        """Configure the head group with optional parameters.

        Only arguments that are not None are applied; the rest keep their
        current values. Mutable arguments are copied so that later changes
        to the caller's objects do not leak into the builder.
        """
        if image is not None:
            self._head_image = image
        if cpus is not None:
            self._head_cpus = cpus
        if memory is not None:
            self._head_memory = memory
        if gpus is not None:
            self._head_gpus = gpus
        if service_account is not None:
            self._head_service_account = service_account
        if node_selector is not None:
            self._head_node_selector = node_selector.copy()
        if labels is not None:
            self._head_labels = labels.copy()
        if tolerations is not None:
            # Deep copy: each toleration is itself a mutable dict.
            self._head_tolerations = copy.deepcopy(tolerations)
        return self

    def head_group_builder(self, builder: HeadGroupBuilder) -> "GenevaClusterBuilder":
        """Configure the head group using a HeadGroupBuilder.

        Every head-group setting is replaced by the value from
        `builder.build()`.
        """
        head_config = builder.build()
        self._head_image = head_config.image
        self._head_cpus = head_config.num_cpus
        self._head_memory = head_config.memory
        self._head_gpus = head_config.num_gpus
        self._head_service_account = head_config.service_account
        self._head_node_selector = head_config.node_selector
        self._head_labels = head_config.labels
        self._head_tolerations = head_config.tolerations
        return self

    def add_cpu_worker_group(
        self,
        *,
        image: str | None = None,
        cpus: int = 4,
        memory: str = "8Gi",
        service_account: str | None = None,
        node_selector: dict[str, str] | None = None,
        labels: dict[str, str] | None = None,
        tolerations: list[dict[str, str]] | None = None,
    ) -> "GenevaClusterBuilder":
        """Add a CPU worker group.

        Falsy arguments fall back to defaults: the current head image, the
        "geneva-service-account" service account, a node selector of
        `{GENEVA_RAY_CPU_NODE: "true"}`, and empty labels/tolerations.
        Caller-supplied containers are copied before being stored.
        """
        worker = WorkerGroupConfig(
            image=image or self._head_image,
            num_cpus=cpus,
            memory=memory,
            num_gpus=0,
            service_account=service_account or "geneva-service-account",
            # Copy caller-owned containers so later mutation by the caller
            # cannot change the stored worker group.
            node_selector=(
                node_selector.copy()
                if node_selector
                else {GENEVA_RAY_CPU_NODE: "true"}
            ),
            labels=labels.copy() if labels else {},
            tolerations=copy.deepcopy(tolerations) if tolerations else [],
        )
        self._worker_groups.append(worker)
        return self

    def add_gpu_worker_group(
        self,
        *,
        image: str | None = None,
        cpus: int = 4,
        memory: str = "8Gi",
        gpus: int = 1,
        service_account: str | None = None,
        node_selector: dict[str, str] | None = None,
        labels: dict[str, str] | None = None,
        tolerations: list[dict[str, str]] | None = None,
    ) -> "GenevaClusterBuilder":
        """Add a GPU worker group.

        Falsy arguments fall back to defaults: the current head image, the
        "geneva-service-account" service account, a node selector of
        `{GENEVA_RAY_GPU_NODE: "true"}`, and empty labels/tolerations.
        Caller-supplied containers are copied before being stored.
        """
        worker = WorkerGroupConfig(
            image=image or self._head_image,
            num_cpus=cpus,
            memory=memory,
            num_gpus=gpus,
            service_account=service_account or "geneva-service-account",
            # Copy caller-owned containers so later mutation by the caller
            # cannot change the stored worker group.
            node_selector=(
                node_selector.copy()
                if node_selector
                else {GENEVA_RAY_GPU_NODE: "true"}
            ),
            labels=labels.copy() if labels else {},
            tolerations=copy.deepcopy(tolerations) if tolerations else [],
        )
        self._worker_groups.append(worker)
        return self

    def add_worker_group(self, builder: WorkerGroupBuilder) -> "GenevaClusterBuilder":
        """Add a worker group using a WorkerGroupBuilder."""
        worker = builder.build()
        self._worker_groups.append(worker)
        return self

    def build(self) -> GenevaCluster:
        """Build the GenevaCluster with the configured settings.

        The head group's node selector, labels and tolerations, and the
        ray_init_kwargs, are deep-copied into the result so that reusing or
        further modifying this builder does not alter a cluster that was
        already built. If no worker group was added, a single default CPU
        worker group is used.

        Returns:
            A GenevaCluster whose `kuberay` config reflects this builder.

        Raises:
            ValueError: If no name was set, or if the cluster type is not
                supported.
        """
        if self._name is None:
            raise ValueError("Cluster name is required. Use .name() to set it.")

        # Build head group config
        head_group = HeadGroupConfig(
            service_account=self._head_service_account,
            num_cpus=self._head_cpus,
            memory=self._head_memory,
            image=self._head_image,
            num_gpus=self._head_gpus,
            node_selector=copy.deepcopy(self._head_node_selector),
            labels=copy.deepcopy(self._head_labels),
            tolerations=copy.deepcopy(self._head_tolerations),
        )

        # Use worker group configs directly (already WorkerGroupConfig objects)
        worker_groups = self._worker_groups.copy()

        # If no worker groups were explicitly added, add a default CPU worker
        if not worker_groups:
            worker_groups.append(
                WorkerGroupConfig(
                    service_account=self._head_service_account,
                    num_cpus=4,
                    memory="8Gi",
                    image=default_image(False),
                    num_gpus=0,
                    node_selector={GENEVA_RAY_CPU_NODE: "true"},
                    labels={},
                    tolerations=[],
                )
            )

        supported_cluster_types = [GenevaClusterType.KUBE_RAY]
        if self._cluster_type not in supported_cluster_types:
            raise ValueError(
                f"cluster_type must be one of "
                f"{[e.name for e in supported_cluster_types]}"
            )

        kuberay_config = KubeRayConfig(
            namespace=self._namespace,
            head_group=head_group,
            worker_groups=worker_groups,
            config_method=self._config_method,
            use_portforwarding=self._use_portforwarding,
            aws_region=self._aws_region,
            aws_role_name=self._aws_role_name,
            ray_init_kwargs=copy.deepcopy(self._ray_init_kwargs),
        )

        # Build and return the cluster
        return GenevaCluster(
            cluster_type=self._cluster_type,
            name=self._name,
            kuberay=kuberay_config,
        )

    @classmethod
    def create(cls, name: str) -> "GenevaClusterBuilder":
        """Create a new builder with the given cluster name."""
        return cls().name(name)

    @classmethod
    def local_cpu_cluster(
        cls, name: str, namespace: str = "geneva"
    ) -> "GenevaClusterBuilder":
        """Create a builder for a local CPU-only cluster with good defaults.

        Uses the LOCAL config method with port forwarding enabled, a
        2-CPU/4Gi head group and one 4-CPU/8Gi CPU worker group.
        """
        return (
            cls()
            .name(name)
            .namespace(namespace)
            .config_method(K8sConfigMethod.LOCAL)
            .portforwarding(True)
            .head_group(cpus=2, memory="4Gi")
            .add_cpu_worker_group(cpus=4, memory="8Gi")
        )

    @classmethod
    def gpu_cluster(
        cls, name: str, namespace: str = "geneva"
    ) -> "GenevaClusterBuilder":
        """Create a builder for a GPU cluster with good defaults.

        Uses the IN_CLUSTER config method with port forwarding disabled, a
        2-CPU/4Gi head group and one 4-CPU/8Gi worker group with 1 GPU.
        """
        return (
            cls()
            .name(name)
            .namespace(namespace)
            .config_method(K8sConfigMethod.IN_CLUSTER)
            .portforwarding(False)
            .head_group(cpus=2, memory="4Gi")
            .add_gpu_worker_group(cpus=4, memory="8Gi", gpus=1)
        )

name

name(name: str) -> GenevaClusterBuilder

Set the cluster name.

Source code in geneva/cluster/builder.py
def name(self, name: str) -> "GenevaClusterBuilder":
    """Record the cluster name and return this builder for chaining."""
    self._name = name
    return self

cluster_type

cluster_type(
    cluster_type: GenevaClusterType,
) -> GenevaClusterBuilder

Set the cluster type.

Source code in geneva/cluster/builder.py
def cluster_type(self, cluster_type: GenevaClusterType) -> "GenevaClusterBuilder":
    """Record the cluster type and return this builder for chaining."""
    self._cluster_type = cluster_type
    return self

namespace

namespace(namespace: str) -> GenevaClusterBuilder

Set the Kubernetes namespace.

Source code in geneva/cluster/builder.py
def namespace(self, namespace: str) -> "GenevaClusterBuilder":
    """Record the Kubernetes namespace and return this builder for chaining."""
    self._namespace = namespace
    return self

config_method

config_method(
    method: K8sConfigMethod,
) -> GenevaClusterBuilder

Set the Kubernetes config method.

Source code in geneva/cluster/builder.py
def config_method(self, method: K8sConfigMethod) -> "GenevaClusterBuilder":
    """Record the Kubernetes config method and return this builder for chaining."""
    self._config_method = method
    return self

portforwarding

portforwarding(
    enabled: bool = True,
) -> GenevaClusterBuilder

Enable or disable port forwarding.

Source code in geneva/cluster/builder.py
def portforwarding(self, enabled: bool = True) -> "GenevaClusterBuilder":
    """Turn port forwarding on (the default) or off; returns this builder."""
    self._use_portforwarding = enabled
    return self

aws_config

aws_config(
    region: str | None = None, role_name: str | None = None
) -> GenevaClusterBuilder

Configure AWS settings.

Source code in geneva/cluster/builder.py
def aws_config(
    self, region: str | None = None, role_name: str | None = None
) -> "GenevaClusterBuilder":
    """Configure AWS settings."""
    self._aws_region = region
    self._aws_role_name = role_name
    return self

ray_init_kwargs

ray_init_kwargs(kwargs: dict) -> GenevaClusterBuilder

Set arbitrary kwargs to pass to ray.init() when starting the cluster.

Commonly used for runtime_env configuration with conda or pip dependencies.

Example:

    >>> builder.ray_init_kwargs({
    ...     "runtime_env": {
    ...         "conda": {
    ...             "channels": ["conda-forge"],
    ...             "dependencies": [
    ...                 "python=3.10", "ffmpeg<8", "torchvision=0.22.1"
    ...             ]
    ...         },
    ...         "config": {"eager_install": True}
    ...     }
    ... })

WARNING: This accepts arbitrary kwargs without validation, allowing injection of faulty or potentially malicious configuration. Use with caution.

Source code in geneva/cluster/builder.py
def ray_init_kwargs(self, kwargs: dict) -> "GenevaClusterBuilder":
    """
    Store keyword arguments to forward to ray.init() when the cluster starts.

    Typically used to supply a runtime_env with conda or pip dependencies.
    The dict is deep-copied, so later changes to `kwargs` do not affect the
    builder.

    Example:
        >>> builder.ray_init_kwargs({
        ...     "runtime_env": {
        ...         "conda": {
        ...             "channels": ["conda-forge"],
        ...             "dependencies": [
        ...                 "python=3.10", "ffmpeg<8", "torchvision=0.22.1"
        ...             ]
        ...         },
        ...         "config": {"eager_install": True}
        ...     }
        ... })

    WARNING: The kwargs are not validated, so faulty or potentially
    malicious configuration can be injected this way. Use with caution.
    """
    snapshot = copy.deepcopy(kwargs)
    self._ray_init_kwargs = snapshot
    return self

head_group

head_group(
    *,
    image: str | None = None,
    cpus: int | None = None,
    memory: str | None = None,
    gpus: int | None = None,
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> GenevaClusterBuilder

Configure the head group with optional parameters.

Source code in geneva/cluster/builder.py
def head_group(
    self,
    *,
    image: str | None = None,
    cpus: int | None = None,
    memory: str | None = None,
    gpus: int | None = None,
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> "GenevaClusterBuilder":
    """Configure the head group with optional parameters."""
    if image is not None:
        self._head_image = image
    if cpus is not None:
        self._head_cpus = cpus
    if memory is not None:
        self._head_memory = memory
    if gpus is not None:
        self._head_gpus = gpus
    if service_account is not None:
        self._head_service_account = service_account
    if node_selector is not None:
        self._head_node_selector = node_selector
    if labels is not None:
        self._head_labels = labels
    if tolerations is not None:
        self._head_tolerations = tolerations
    return self

head_group_builder

head_group_builder(
    builder: HeadGroupBuilder,
) -> GenevaClusterBuilder

Configure the head group using a HeadGroupBuilder.

Source code in geneva/cluster/builder.py
def head_group_builder(self, builder: HeadGroupBuilder) -> "GenevaClusterBuilder":
    """Replace every head-group setting with the values from
    `builder.build()` and return this builder for chaining."""
    cfg = builder.build()

    # Scalar resources and identity.
    self._head_image = cfg.image
    self._head_cpus = cfg.num_cpus
    self._head_memory = cfg.memory
    self._head_gpus = cfg.num_gpus
    self._head_service_account = cfg.service_account

    # Scheduling metadata.
    self._head_node_selector = cfg.node_selector
    self._head_labels = cfg.labels
    self._head_tolerations = cfg.tolerations
    return self

add_cpu_worker_group

add_cpu_worker_group(
    *,
    image: str | None = None,
    cpus: int = 4,
    memory: str = "8Gi",
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> GenevaClusterBuilder

Add a CPU worker group.

Source code in geneva/cluster/builder.py
def add_cpu_worker_group(
    self,
    *,
    image: str | None = None,
    cpus: int = 4,
    memory: str = "8Gi",
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> "GenevaClusterBuilder":
    """Add a CPU worker group.

    Falsy arguments fall back to defaults: the current head image, the
    "geneva-service-account" service account, a node selector of
    `{GENEVA_RAY_CPU_NODE: "true"}`, and empty labels/tolerations.
    Caller-supplied containers are copied before being stored.
    """
    worker = WorkerGroupConfig(
        image=image or self._head_image,
        num_cpus=cpus,
        memory=memory,
        num_gpus=0,
        service_account=service_account or "geneva-service-account",
        # Copy caller-owned containers so later mutation by the caller
        # cannot change the stored worker group.
        node_selector=(
            node_selector.copy() if node_selector else {GENEVA_RAY_CPU_NODE: "true"}
        ),
        labels=labels.copy() if labels else {},
        tolerations=copy.deepcopy(tolerations) if tolerations else [],
    )
    self._worker_groups.append(worker)
    return self

add_gpu_worker_group

add_gpu_worker_group(
    *,
    image: str | None = None,
    cpus: int = 4,
    memory: str = "8Gi",
    gpus: int = 1,
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> GenevaClusterBuilder

Add a GPU worker group.

Source code in geneva/cluster/builder.py
def add_gpu_worker_group(
    self,
    *,
    image: str | None = None,
    cpus: int = 4,
    memory: str = "8Gi",
    gpus: int = 1,
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> "GenevaClusterBuilder":
    """Add a GPU worker group.

    Falsy arguments fall back to defaults: the current head image, the
    "geneva-service-account" service account, a node selector of
    `{GENEVA_RAY_GPU_NODE: "true"}`, and empty labels/tolerations.
    Caller-supplied containers are copied before being stored.
    """
    worker = WorkerGroupConfig(
        image=image or self._head_image,
        num_cpus=cpus,
        memory=memory,
        num_gpus=gpus,
        service_account=service_account or "geneva-service-account",
        # Copy caller-owned containers so later mutation by the caller
        # cannot change the stored worker group.
        node_selector=(
            node_selector.copy() if node_selector else {GENEVA_RAY_GPU_NODE: "true"}
        ),
        labels=labels.copy() if labels else {},
        tolerations=copy.deepcopy(tolerations) if tolerations else [],
    )
    self._worker_groups.append(worker)
    return self

add_worker_group

add_worker_group(
    builder: WorkerGroupBuilder,
) -> GenevaClusterBuilder

Add a worker group using a WorkerGroupBuilder.

Source code in geneva/cluster/builder.py
def add_worker_group(self, builder: WorkerGroupBuilder) -> "GenevaClusterBuilder":
    """Append the worker group produced by `builder.build()` and return
    this builder for chaining."""
    self._worker_groups.append(builder.build())
    return self

build

build() -> GenevaCluster

Build the GenevaCluster with the configured settings.

Source code in geneva/cluster/builder.py
def build(self) -> GenevaCluster:
    """Build the GenevaCluster with the configured settings.

    The head group's node selector, labels and tolerations, and the
    ray_init_kwargs, are deep-copied into the result so that reusing or
    further modifying this builder does not alter a cluster that was
    already built. If no worker group was added, a single default CPU
    worker group is used.

    Returns:
        A GenevaCluster whose `kuberay` config reflects this builder.

    Raises:
        ValueError: If no name was set, or if the cluster type is not
            supported.
    """
    if self._name is None:
        raise ValueError("Cluster name is required. Use .name() to set it.")

    # Build head group config
    head_group = HeadGroupConfig(
        service_account=self._head_service_account,
        num_cpus=self._head_cpus,
        memory=self._head_memory,
        image=self._head_image,
        num_gpus=self._head_gpus,
        node_selector=copy.deepcopy(self._head_node_selector),
        labels=copy.deepcopy(self._head_labels),
        tolerations=copy.deepcopy(self._head_tolerations),
    )

    # Use worker group configs directly (already WorkerGroupConfig objects)
    worker_groups = self._worker_groups.copy()

    # If no worker groups were explicitly added, add a default CPU worker
    if not worker_groups:
        worker_groups.append(
            WorkerGroupConfig(
                service_account=self._head_service_account,
                num_cpus=4,
                memory="8Gi",
                image=default_image(False),
                num_gpus=0,
                node_selector={GENEVA_RAY_CPU_NODE: "true"},
                labels={},
                tolerations=[],
            )
        )

    supported_cluster_types = [GenevaClusterType.KUBE_RAY]
    if self._cluster_type not in supported_cluster_types:
        raise ValueError(
            f"cluster_type must be one of "
            f"{[e.name for e in supported_cluster_types]}"
        )

    kuberay_config = KubeRayConfig(
        namespace=self._namespace,
        head_group=head_group,
        worker_groups=worker_groups,
        config_method=self._config_method,
        use_portforwarding=self._use_portforwarding,
        aws_region=self._aws_region,
        aws_role_name=self._aws_role_name,
        ray_init_kwargs=copy.deepcopy(self._ray_init_kwargs),
    )

    # Build and return the cluster
    return GenevaCluster(
        cluster_type=self._cluster_type,
        name=self._name,
        kuberay=kuberay_config,
    )

create

create(name: str) -> GenevaClusterBuilder

Create a new builder with the given cluster name.

Source code in geneva/cluster/builder.py
@classmethod
def create(cls, name: str) -> "GenevaClusterBuilder":
    """Instantiate a builder and set its cluster name in one step."""
    builder = cls()
    return builder.name(name)

local_cpu_cluster

local_cpu_cluster(
    name: str, namespace: str = "geneva"
) -> GenevaClusterBuilder

Create a builder for a local CPU-only cluster with good defaults.

Source code in geneva/cluster/builder.py
@classmethod
def local_cpu_cluster(
    cls, name: str, namespace: str = "geneva"
) -> "GenevaClusterBuilder":
    """Return a builder preconfigured for a local CPU-only cluster.

    Uses the LOCAL config method with port forwarding enabled, a
    2-CPU/4Gi head group and one 4-CPU/8Gi CPU worker group.
    """
    builder = cls().name(name).namespace(namespace)
    builder = builder.config_method(K8sConfigMethod.LOCAL).portforwarding(True)
    builder = builder.head_group(cpus=2, memory="4Gi")
    return builder.add_cpu_worker_group(cpus=4, memory="8Gi")

gpu_cluster

gpu_cluster(
    name: str, namespace: str = "geneva"
) -> GenevaClusterBuilder

Create a builder for a GPU cluster with good defaults.

Source code in geneva/cluster/builder.py
@classmethod
def gpu_cluster(
    cls, name: str, namespace: str = "geneva"
) -> "GenevaClusterBuilder":
    """Return a builder preconfigured for a GPU cluster.

    Uses the IN_CLUSTER config method with port forwarding disabled, a
    2-CPU/4Gi head group and one 4-CPU/8Gi worker group with 1 GPU.
    """
    builder = cls().name(name).namespace(namespace)
    builder = builder.config_method(K8sConfigMethod.IN_CLUSTER).portforwarding(False)
    builder = builder.head_group(cpus=2, memory="4Gi")
    return builder.add_gpu_worker_group(cpus=4, memory="8Gi", gpus=1)

geneva.cluster.builder.HeadGroupBuilder

Builder for HeadGroupConfig. This can be used to build the configuration for a Ray head group with reasonable defaults. See GenevaClusterBuilder for examples.

Source code in geneva/cluster/builder.py
class HeadGroupBuilder:
    """Builder for HeadGroupConfig. This can be used to build the configuration for
    a Ray head group with reasonable defaults. See GenevaClusterBuilder for examples.

    Every setter returns the builder itself so calls can be chained.
    """

    def __init__(self) -> None:
        """Initialise the builder with its default settings."""
        self._image: str = default_image()
        self._num_cpus: int = 2
        self._memory: str = "4Gi"
        self._num_gpus: int = 0
        self._service_account: str = "geneva-service-account"
        self._node_selector: dict[str, str] = {GENEVA_RAY_HEAD_NODE: "true"}
        self._labels: dict[str, str] = {}
        self._tolerations: list[dict[str, str]] = []

    def image(self, image: str) -> "HeadGroupBuilder":
        """Set the container image."""
        self._image = image
        return self

    def cpus(self, cpus: int) -> "HeadGroupBuilder":
        """Set the number of CPUs."""
        self._num_cpus = cpus
        return self

    def memory(self, memory: str) -> "HeadGroupBuilder":
        """Set the memory allocation (e.g., '4Gi', '8Gi')."""
        self._memory = memory
        return self

    def gpus(self, gpus: int) -> "HeadGroupBuilder":
        """Set the number of GPUs."""
        self._num_gpus = gpus
        return self

    def service_account(self, service_account: str) -> "HeadGroupBuilder":
        """Set the Kubernetes service account."""
        self._service_account = service_account
        return self

    def node_selector(self, node_selector: dict[str, str]) -> "HeadGroupBuilder":
        """Set the node selector for pod placement (stored as a copy)."""
        self._node_selector = node_selector.copy()
        return self

    def labels(self, labels: dict[str, str]) -> "HeadGroupBuilder":
        """Set the pod labels (stored as a copy)."""
        self._labels = labels.copy()
        return self

    def tolerations(self, tolerations: list[dict[str, str]]) -> "HeadGroupBuilder":
        """Set the pod tolerations (stored as a deep copy)."""
        # Deep copy: a shallow list copy would still share the inner
        # toleration dicts with the caller.
        self._tolerations = copy.deepcopy(tolerations)
        return self

    def add_label(self, key: str, value: str) -> "HeadGroupBuilder":
        """Add a single label."""
        self._labels[key] = value
        return self

    def add_toleration(
        self, key: str, operator: str = "Equal", value: str = "", effect: str = ""
    ) -> "HeadGroupBuilder":
        """Add a single toleration.

        `value` and `effect` are only included in the toleration when they
        are non-empty.
        """
        toleration = {"key": key, "operator": operator}
        if value:
            toleration["value"] = value
        if effect:
            toleration["effect"] = effect
        self._tolerations.append(toleration)
        return self

    def build(self) -> HeadGroupConfig:
        """Build the HeadGroupConfig.

        Mutable settings are copied into the config so that later calls
        such as `add_label` or `add_toleration` on this builder do not
        alter a config that was already built.
        """
        return HeadGroupConfig(
            service_account=self._service_account,
            num_cpus=self._num_cpus,
            memory=self._memory,
            image=self._image,
            num_gpus=self._num_gpus,
            node_selector=self._node_selector.copy(),
            labels=self._labels.copy(),
            tolerations=copy.deepcopy(self._tolerations),
        )

    @classmethod
    def create(cls) -> "HeadGroupBuilder":
        """Create a new head group builder with defaults."""
        return cls()

    @classmethod
    def cpu_head(cls, cpus: int = 2, memory: str = "4Gi") -> "HeadGroupBuilder":
        """Create a CPU-only head group."""
        return cls().cpus(cpus).memory(memory).gpus(0)

image

image(image: str) -> HeadGroupBuilder

Set the container image.

Source code in geneva/cluster/builder.py
def image(self, image: str) -> "HeadGroupBuilder":
    """Record the container image and return this builder for chaining."""
    self._image = image
    return self

cpus

cpus(cpus: int) -> HeadGroupBuilder

Set the number of CPUs.

Source code in geneva/cluster/builder.py
def cpus(self, cpus: int) -> "HeadGroupBuilder":
    """Record the CPU count and return this builder for chaining."""
    self._num_cpus = cpus
    return self

memory

memory(memory: str) -> HeadGroupBuilder

Set the memory allocation (e.g., '4Gi', '8Gi').

Source code in geneva/cluster/builder.py
def memory(self, memory: str) -> "HeadGroupBuilder":
    """Record the memory allocation (e.g., '4Gi', '8Gi') and return this
    builder for chaining."""
    self._memory = memory
    return self

gpus

gpus(gpus: int) -> HeadGroupBuilder

Set the number of GPUs.

Source code in geneva/cluster/builder.py
def gpus(self, gpus: int) -> "HeadGroupBuilder":
    """Record the GPU count and return this builder for chaining."""
    self._num_gpus = gpus
    return self

service_account

service_account(service_account: str) -> HeadGroupBuilder

Set the Kubernetes service account.

Source code in geneva/cluster/builder.py
def service_account(self, service_account: str) -> "HeadGroupBuilder":
    """Store the Kubernetes service account name; returns the builder."""
    self._service_account = service_account
    return self

node_selector

node_selector(
    node_selector: dict[str, str],
) -> HeadGroupBuilder

Set the node selector for pod placement.

Source code in geneva/cluster/builder.py
def node_selector(self, node_selector: dict[str, str]) -> "HeadGroupBuilder":
    """Replace the node selector used for pod placement.

    A copy of the mapping is stored, so the caller's dict is not retained.
    """
    selector_copy = node_selector.copy()
    self._node_selector = selector_copy
    return self

labels

labels(labels: dict[str, str]) -> HeadGroupBuilder

Set the pod labels.

Source code in geneva/cluster/builder.py
def labels(self, labels: dict[str, str]) -> "HeadGroupBuilder":
    """Replace the pod labels.

    A copy of the mapping is stored, so the caller's dict is not retained.
    """
    labels_copy = labels.copy()
    self._labels = labels_copy
    return self

tolerations

tolerations(
    tolerations: list[dict[str, str]],
) -> HeadGroupBuilder

Set the pod tolerations.

Source code in geneva/cluster/builder.py
def tolerations(self, tolerations: list[dict[str, str]]) -> "HeadGroupBuilder":
    """Replace the pod tolerations.

    Both the list and each toleration dict are copied, so later changes the
    caller makes to its own list or to any entry do not affect the builder.

    Args:
        tolerations: Toleration dicts to store.

    Returns:
        This builder, for chaining.
    """
    # list.copy() alone would leave the inner dicts shared with the caller.
    self._tolerations = [dict(entry) for entry in tolerations]
    return self

add_label

add_label(key: str, value: str) -> HeadGroupBuilder

Add a single label.

Source code in geneva/cluster/builder.py
def add_label(self, key: str, value: str) -> "HeadGroupBuilder":
    """Insert or overwrite one pod label and return the builder."""
    self._labels.update({key: value})
    return self

add_toleration

add_toleration(
    key: str,
    operator: str = "Equal",
    value: str = "",
    effect: str = "",
) -> HeadGroupBuilder

Add a single toleration.

Source code in geneva/cluster/builder.py
def add_toleration(
    self, key: str, operator: str = "Equal", value: str = "", effect: str = ""
) -> "HeadGroupBuilder":
    """Append one toleration entry and return the builder.

    ``key`` and ``operator`` are always recorded; ``value`` and ``effect``
    are included only when they are non-empty.
    """
    optional_parts = {"value": value, "effect": effect}
    entry = {"key": key, "operator": operator}
    entry.update({name: text for name, text in optional_parts.items() if text})
    self._tolerations.append(entry)
    return self

build

build() -> HeadGroupConfig

Build the HeadGroupConfig.

Source code in geneva/cluster/builder.py
def build(self) -> HeadGroupConfig:
    """Build the HeadGroupConfig from the values collected so far.

    The node selector, labels and tolerations are copied so that the
    returned config does not share mutable containers with this builder;
    mutating the builder afterwards cannot alter an already-built config.

    Returns:
        A new HeadGroupConfig populated from the builder's current state.
    """
    return HeadGroupConfig(
        service_account=self._service_account,
        num_cpus=self._num_cpus,
        memory=self._memory,
        image=self._image,
        num_gpus=self._num_gpus,
        # Copies: keep the built config independent of later builder edits.
        node_selector=dict(self._node_selector),
        labels=dict(self._labels),
        tolerations=[dict(t) for t in self._tolerations],
    )

create

create() -> HeadGroupBuilder

Create a new head group builder with defaults.

Source code in geneva/cluster/builder.py
@classmethod
def create(cls) -> "HeadGroupBuilder":
    """Return a freshly constructed builder with its default settings."""
    builder = cls()
    return builder

cpu_head

cpu_head(
    cpus: int = 2, memory: str = "4Gi"
) -> HeadGroupBuilder

Create a CPU-only head group.

Source code in geneva/cluster/builder.py
@classmethod
def cpu_head(cls, cpus: int = 2, memory: str = "4Gi") -> "HeadGroupBuilder":
    """Return a builder preset for a head group with zero GPUs.

    Args:
        cpus: CPU count passed to ``cpus()``.
        memory: Memory allocation passed to ``memory()``.
    """
    builder = cls()
    return builder.cpus(cpus).memory(memory).gpus(0)

geneva.cluster.builder.WorkerGroupBuilder

Builder for WorkerGroupConfig. This can be used to build the configuration for a Ray worker group with reasonable defaults. See GenevaClusterBuilder for examples.

Source code in geneva/cluster/builder.py
class WorkerGroupBuilder:
    """Builder for WorkerGroupConfig. This can be used to build the configuration for
    a Ray worker group with reasonable defaults. See GenevaClusterBuilder for
    examples.

    Every setter returns the builder itself so calls can be chained.
    ``build()`` does not modify the builder and hands out copies of the
    mutable containers, so one builder can safely produce several configs.
    """

    def __init__(self) -> None:
        self._image: str = default_image()
        self._num_cpus: int = 4
        self._memory: str = "8Gi"
        self._num_gpus: int = 0
        self._service_account: str = "geneva-service-account"
        self._node_selector: dict[str, str] = {GENEVA_RAY_CPU_NODE: "true"}
        self._labels: dict[str, str] = {}
        self._tolerations: list[dict[str, str]] = []

    def image(self, image: str) -> "WorkerGroupBuilder":
        """Set the container image."""
        self._image = image
        return self

    def cpus(self, cpus: int) -> "WorkerGroupBuilder":
        """Set the number of CPUs."""
        self._num_cpus = cpus
        return self

    def memory(self, memory: str) -> "WorkerGroupBuilder":
        """Set the memory allocation (e.g., '8Gi', '16Gi')."""
        self._memory = memory
        return self

    def gpus(self, gpus: int) -> "WorkerGroupBuilder":
        """Set the number of GPUs."""
        self._num_gpus = gpus
        return self

    def service_account(self, service_account: str) -> "WorkerGroupBuilder":
        """Set the Kubernetes service account."""
        self._service_account = service_account
        return self

    def node_selector(self, node_selector: dict[str, str]) -> "WorkerGroupBuilder":
        """Set the node selector for pod placement (a copy is stored)."""
        self._node_selector = node_selector.copy()
        return self

    def labels(self, labels: dict[str, str]) -> "WorkerGroupBuilder":
        """Set the pod labels (a copy is stored)."""
        self._labels = labels.copy()
        return self

    def tolerations(self, tolerations: list[dict[str, str]]) -> "WorkerGroupBuilder":
        """Set the pod tolerations.

        Both the list and each toleration dict are copied so later changes to
        the caller's objects do not affect the builder.
        """
        # list.copy() alone would leave the inner dicts shared with the caller.
        self._tolerations = [dict(entry) for entry in tolerations]
        return self

    def add_label(self, key: str, value: str) -> "WorkerGroupBuilder":
        """Add a single label."""
        self._labels[key] = value
        return self

    def add_toleration(
        self, key: str, operator: str = "Equal", value: str = "", effect: str = ""
    ) -> "WorkerGroupBuilder":
        """Add a single toleration.

        ``value`` and ``effect`` are only included when non-empty.
        """
        toleration = {"key": key, "operator": operator}
        if value:
            toleration["value"] = value
        if effect:
            toleration["effect"] = effect
        self._tolerations.append(toleration)
        return self

    def build(self) -> WorkerGroupConfig:
        """Build the WorkerGroupConfig.

        When GPUs are requested and the image is still the default one, the
        GPU variant of the default image is used for the built config. The
        builder's own image is left untouched, so changing the GPU count and
        building again re-evaluates the choice.

        Returns:
            A new WorkerGroupConfig holding copies of the builder's mutable
            containers.
        """
        # Pick the image locally instead of overwriting self._image, so that
        # build() has no lasting side effect on the builder.
        image = self._image
        if image == default_image() and self._num_gpus > 0:
            image = default_image(gpu=True)

        return WorkerGroupConfig(
            service_account=self._service_account,
            num_cpus=self._num_cpus,
            memory=self._memory,
            image=image,
            num_gpus=self._num_gpus,
            # Copies: keep the built config independent of later builder edits.
            node_selector=dict(self._node_selector),
            labels=dict(self._labels),
            tolerations=[dict(t) for t in self._tolerations],
        )

    @classmethod
    def create(cls) -> "WorkerGroupBuilder":
        """Create a new worker group builder with defaults."""
        return cls()

    @classmethod
    def cpu_worker(cls, cpus: int = 4, memory: str = "8Gi") -> "WorkerGroupBuilder":
        """Create a CPU worker group (GPU count set to 0)."""
        return cls().cpus(cpus).memory(memory).gpus(0)

    @classmethod
    def gpu_worker(
        cls, cpus: int = 8, memory: str = "16Gi", gpus: int = 1
    ) -> "WorkerGroupBuilder":
        """Create a GPU worker group.

        The node selector is switched to ``{GENEVA_RAY_GPU_NODE: "true"}``.
        """
        return (
            cls()
            .cpus(cpus)
            .node_selector({GENEVA_RAY_GPU_NODE: "true"})
            .memory(memory)
            .gpus(gpus)
        )

image

image(image: str) -> WorkerGroupBuilder

Set the container image.

Source code in geneva/cluster/builder.py
def image(self, image: str) -> "WorkerGroupBuilder":
    """Store the container image on the builder and return the builder."""
    self._image = image
    return self

cpus

cpus(cpus: int) -> WorkerGroupBuilder

Set the number of CPUs.

Source code in geneva/cluster/builder.py
def cpus(self, cpus: int) -> "WorkerGroupBuilder":
    """Store the CPU count on the builder and return the builder."""
    self._num_cpus = cpus
    return self

memory

memory(memory: str) -> WorkerGroupBuilder

Set the memory allocation (e.g., '8Gi', '16Gi').

Source code in geneva/cluster/builder.py
def memory(self, memory: str) -> "WorkerGroupBuilder":
    """Store the memory allocation string (e.g. '8Gi', '16Gi'); returns the builder."""
    self._memory = memory
    return self

gpus

gpus(gpus: int) -> WorkerGroupBuilder

Set the number of GPUs.

Source code in geneva/cluster/builder.py
def gpus(self, gpus: int) -> "WorkerGroupBuilder":
    """Store the GPU count on the builder and return the builder."""
    self._num_gpus = gpus
    return self

service_account

service_account(service_account: str) -> WorkerGroupBuilder

Set the Kubernetes service account.

Source code in geneva/cluster/builder.py
def service_account(self, service_account: str) -> "WorkerGroupBuilder":
    """Store the Kubernetes service account name; returns the builder."""
    self._service_account = service_account
    return self

node_selector

node_selector(
    node_selector: dict[str, str],
) -> WorkerGroupBuilder

Set the node selector for pod placement.

Source code in geneva/cluster/builder.py
def node_selector(self, node_selector: dict[str, str]) -> "WorkerGroupBuilder":
    """Replace the node selector used for pod placement.

    A copy of the mapping is stored, so the caller's dict is not retained.
    """
    selector_copy = node_selector.copy()
    self._node_selector = selector_copy
    return self

labels

labels(labels: dict[str, str]) -> WorkerGroupBuilder

Set the pod labels.

Source code in geneva/cluster/builder.py
def labels(self, labels: dict[str, str]) -> "WorkerGroupBuilder":
    """Replace the pod labels.

    A copy of the mapping is stored, so the caller's dict is not retained.
    """
    labels_copy = labels.copy()
    self._labels = labels_copy
    return self

tolerations

tolerations(
    tolerations: list[dict[str, str]],
) -> WorkerGroupBuilder

Set the pod tolerations.

Source code in geneva/cluster/builder.py
def tolerations(self, tolerations: list[dict[str, str]]) -> "WorkerGroupBuilder":
    """Replace the pod tolerations.

    Both the list and each toleration dict are copied, so later changes the
    caller makes to its own list or to any entry do not affect the builder.

    Args:
        tolerations: Toleration dicts to store.

    Returns:
        This builder, for chaining.
    """
    # list.copy() alone would leave the inner dicts shared with the caller.
    self._tolerations = [dict(entry) for entry in tolerations]
    return self

add_label

add_label(key: str, value: str) -> WorkerGroupBuilder

Add a single label.

Source code in geneva/cluster/builder.py
def add_label(self, key: str, value: str) -> "WorkerGroupBuilder":
    """Insert or overwrite one pod label and return the builder."""
    self._labels.update({key: value})
    return self

add_toleration

add_toleration(
    key: str,
    operator: str = "Equal",
    value: str = "",
    effect: str = "",
) -> WorkerGroupBuilder

Add a single toleration.

Source code in geneva/cluster/builder.py
def add_toleration(
    self, key: str, operator: str = "Equal", value: str = "", effect: str = ""
) -> "WorkerGroupBuilder":
    """Append one toleration entry and return the builder.

    ``key`` and ``operator`` are always recorded; ``value`` and ``effect``
    are included only when they are non-empty.
    """
    optional_parts = {"value": value, "effect": effect}
    entry = {"key": key, "operator": operator}
    entry.update({name: text for name, text in optional_parts.items() if text})
    self._tolerations.append(entry)
    return self

build

build() -> WorkerGroupConfig

Build the WorkerGroupConfig.

Source code in geneva/cluster/builder.py
def build(self) -> WorkerGroupConfig:
    """Build the WorkerGroupConfig.

    When GPUs are requested and the image is still the default one, the GPU
    variant of the default image is used for the built config. The builder's
    own image is left untouched, so changing the GPU count and building again
    re-evaluates the choice.

    Returns:
        A new WorkerGroupConfig holding copies of the builder's mutable
        containers.
    """
    # Pick the image locally instead of overwriting self._image, so that
    # build() has no lasting side effect on the builder.
    image = self._image
    if image == default_image() and self._num_gpus > 0:
        image = default_image(gpu=True)

    return WorkerGroupConfig(
        service_account=self._service_account,
        num_cpus=self._num_cpus,
        memory=self._memory,
        image=image,
        num_gpus=self._num_gpus,
        # Copies: keep the built config independent of later builder edits.
        node_selector=dict(self._node_selector),
        labels=dict(self._labels),
        tolerations=[dict(t) for t in self._tolerations],
    )

create

create() -> WorkerGroupBuilder

Create a new worker group builder with defaults.

Source code in geneva/cluster/builder.py
@classmethod
def create(cls) -> "WorkerGroupBuilder":
    """Return a freshly constructed builder with its default settings."""
    builder = cls()
    return builder

cpu_worker

cpu_worker(
    cpus: int = 4, memory: str = "8Gi"
) -> WorkerGroupBuilder

Create a CPU worker group.

Source code in geneva/cluster/builder.py
@classmethod
def cpu_worker(cls, cpus: int = 4, memory: str = "8Gi") -> "WorkerGroupBuilder":
    """Return a builder preset for a worker group with zero GPUs.

    Args:
        cpus: CPU count passed to ``cpus()``.
        memory: Memory allocation passed to ``memory()``.
    """
    builder = cls()
    return builder.cpus(cpus).memory(memory).gpus(0)

gpu_worker

gpu_worker(
    cpus: int = 8, memory: str = "16Gi", gpus: int = 1
) -> WorkerGroupBuilder

Create a GPU worker group.

Source code in geneva/cluster/builder.py
@classmethod
def gpu_worker(
    cls, cpus: int = 8, memory: str = "16Gi", gpus: int = 1
) -> "WorkerGroupBuilder":
    """Return a builder preset for a GPU worker group.

    The node selector is replaced with ``{GENEVA_RAY_GPU_NODE: "true"}``
    before the memory and GPU count are applied.
    """
    builder = cls()
    gpu_selector = {GENEVA_RAY_GPU_NODE: "true"}
    return builder.cpus(cpus).node_selector(gpu_selector).memory(memory).gpus(gpus)

geneva.cluster.mgr.HeadGroupConfig

Bases: RayGroupConfig

Configuration for Ray Head pod

Source code in geneva/cluster/mgr.py
@attrs.define
class HeadGroupConfig(RayGroupConfig):
    """Settings for the Ray head pod; extends RayGroupConfig."""

    # Free-form Kubernetes/KubeRay spec overrides, for anything that has no
    # dedicated parameter (e.g. priority_class, securityContext,
    # initContainers, volumes). The values are deep-merged into the generated
    # K8s spec in definition().
    k8s_spec_override: dict[str, Any] | None = attrs.field(
        default=None,
        metadata={
            "pa_type": "string",
            "nullable": True,
        },
    )

k8s_spec_override

k8s_spec_override: dict[str, Any] | None = field(
    default=None,
    metadata={"pa_type": "string", "nullable": True},
)

geneva.cluster.mgr.WorkerGroupConfig

Bases: RayGroupConfig

Configuration for Ray Worker pods

Source code in geneva/cluster/mgr.py
@attrs.define
class WorkerGroupConfig(RayGroupConfig):
    """Settings for Ray worker pods; extends RayGroupConfig."""

    # Free-form Kubernetes/KubeRay spec overrides, for anything that has no
    # dedicated parameter (e.g. replicas, min_replicas, max_replicas,
    # securityContext). The values are deep-merged into the generated K8s
    # spec in definition().
    # NOTE(review): unlike HeadGroupConfig, this field carries no
    # pa_type/nullable metadata - confirm that the difference is intentional.
    k8s_spec_override: dict[str, Any] | None = attrs.field(default=None)

k8s_spec_override

k8s_spec_override: dict[str, Any] | None = field(
    default=None
)

geneva.cluster.mgr.KubeRayConfig

Source code in geneva/cluster/mgr.py
@attrs.define
class KubeRayConfig:
    """KubeRay settings of a cluster: namespace, head group, worker groups,
    how the Kubernetes config is obtained, and extra ``ray.init()`` kwargs."""

    # Required fields.
    namespace: str = attrs.field()
    head_group: HeadGroupConfig = attrs.field()
    worker_groups: list[WorkerGroupConfig] = attrs.field()

    # Optional fields and their defaults.
    config_method: K8sConfigMethod = attrs.field(default=K8sConfigMethod.LOCAL)
    use_portforwarding: bool = attrs.field(default=True)
    aws_region: Optional[str] = attrs.field(default=None)
    aws_role_name: Optional[str] = attrs.field(default=None)

    # Arbitrary kwargs to pass to ray.init(); a new empty dict per instance.
    ray_init_kwargs: Optional[dict[str, Any]] = attrs.field(factory=dict)

namespace

namespace: str = field()

head_group

head_group: HeadGroupConfig = field()

worker_groups

worker_groups: list[WorkerGroupConfig] = field()

config_method

config_method: K8sConfigMethod = field(default=LOCAL)

use_portforwarding

use_portforwarding: bool = field(default=True)

aws_region

aws_region: Optional[str] = field(default=None)

aws_role_name

aws_role_name: Optional[str] = field(default=None)

ray_init_kwargs

ray_init_kwargs: Optional[dict[str, Any]] = field(
    factory=dict
)

geneva.cluster.mgr.ClusterConfigManager

Bases: BaseManager

Source code in geneva/cluster/mgr.py
class ClusterConfigManager(BaseManager):
    """Manager for GenevaCluster definitions stored in the table named by
    CLUSTER_TABLE_NAME."""

    def get_table_name(self) -> str:
        """Return the name of the table that holds cluster definitions."""
        return CLUSTER_TABLE_NAME

    def get_model(self) -> Any:
        """Return a placeholder GenevaCluster with every nested field set.

        All nested fields are populated so that schema inference can
        correctly determine the nullability of nested fields.
        """
        placeholder_head = HeadGroupConfig(
            service_account="dummy",
            num_cpus=1,
            memory="1Gi",
            image="dummy",
            node_selector={},
            labels={},
            tolerations=[],
            # Passed explicitly so the schema has the nullable field.
            k8s_spec_override=None,
        )
        placeholder_kuberay = KubeRayConfig(
            namespace="dummy",
            head_group=placeholder_head,
            worker_groups=[],
            # Passed explicitly so the schema has the nullable field.
            ray_init_kwargs={},
        )
        placeholder_cluster = GenevaCluster(
            cluster_type=GenevaClusterType.KUBE_RAY,
            name="dummy",
            kuberay=placeholder_kuberay,
        )
        return placeholder_cluster

    @retry_lance
    def upsert(self, cluster: GenevaCluster) -> None:
        """Store ``cluster``, replacing any existing row with the same name.

        The nested ``kuberay`` value is written as a JSON string.
        """
        row = cluster.as_dict()
        row["kuberay"] = json.dumps(row["kuberay"])

        # note: merge_insert fails with schema errors - use delete+add for now
        self.delete(cluster.name)
        table = self.get_table()
        table.add([row])

    @retry_lance
    def list(self, limit: int = 1000) -> list[GenevaCluster]:
        """Return up to ``limit`` stored clusters as GenevaCluster objects."""
        query = self.get_table(True).search().limit(limit)
        rows = query.to_arrow().to_pylist()
        return [_make_cluster(row) for row in rows]

    @retry_lance
    def load(self, name: str) -> GenevaCluster | None:
        """Return the cluster named ``name``, or None when no row matches."""
        name_filter = f"name = '{escape_sql_string(name)}'"
        query = self.get_table(True).search().where(name_filter).limit(1)
        rows = query.to_arrow().to_pylist()
        return _make_cluster(rows[0]) if rows else None

    @retry_lance
    def delete(self, name: str) -> None:
        """Delete the rows whose name equals ``name``."""
        name_filter = f"name = '{escape_sql_string(name)}'"
        self.get_table().delete(name_filter)

get_table_name

get_table_name() -> str
Source code in geneva/cluster/mgr.py
def get_table_name(self) -> str:
    """Return the name of the table that holds cluster definitions."""
    return CLUSTER_TABLE_NAME

get_model

get_model() -> Any
Source code in geneva/cluster/mgr.py
def get_model(self) -> Any:
    """Return a placeholder GenevaCluster with every nested field set.

    All nested fields are populated so that schema inference can correctly
    determine the nullability of nested fields.
    """
    placeholder_head = HeadGroupConfig(
        service_account="dummy",
        num_cpus=1,
        memory="1Gi",
        image="dummy",
        node_selector={},
        labels={},
        tolerations=[],
        # Passed explicitly so the schema has the nullable field.
        k8s_spec_override=None,
    )
    placeholder_kuberay = KubeRayConfig(
        namespace="dummy",
        head_group=placeholder_head,
        worker_groups=[],
        # Passed explicitly so the schema has the nullable field.
        ray_init_kwargs={},
    )
    placeholder_cluster = GenevaCluster(
        cluster_type=GenevaClusterType.KUBE_RAY,
        name="dummy",
        kuberay=placeholder_kuberay,
    )
    return placeholder_cluster

upsert

upsert(cluster: GenevaCluster) -> None
Source code in geneva/cluster/mgr.py
@retry_lance
def upsert(self, cluster: GenevaCluster) -> None:
    """Store ``cluster``, replacing any existing row with the same name.

    The nested ``kuberay`` value is written as a JSON string.
    """
    row = cluster.as_dict()
    row["kuberay"] = json.dumps(row["kuberay"])

    # note: merge_insert fails with schema errors - use delete+add for now
    self.delete(cluster.name)
    table = self.get_table()
    table.add([row])

list

list(limit: int = 1000) -> list[GenevaCluster]
Source code in geneva/cluster/mgr.py
@retry_lance
def list(self, limit: int = 1000) -> list[GenevaCluster]:
    """Return up to ``limit`` stored clusters as GenevaCluster objects."""
    query = self.get_table(True).search().limit(limit)
    rows = query.to_arrow().to_pylist()
    return [_make_cluster(row) for row in rows]

load

load(name: str) -> GenevaCluster | None
Source code in geneva/cluster/mgr.py
@retry_lance
def load(self, name: str) -> GenevaCluster | None:
    """Return the cluster named ``name``, or None when no row matches."""
    name_filter = f"name = '{escape_sql_string(name)}'"
    query = self.get_table(True).search().where(name_filter).limit(1)
    rows = query.to_arrow().to_pylist()
    return _make_cluster(rows[0]) if rows else None

delete

delete(name: str) -> None
Source code in geneva/cluster/mgr.py
@retry_lance
def delete(self, name: str) -> None:
    """Delete the rows whose name equals ``name``."""
    name_filter = f"name = '{escape_sql_string(name)}'"
    self.get_table().delete(name_filter)

geneva.cluster.GenevaClusterType

Bases: Enum

Type of Geneva Cluster

Source code in geneva/cluster/__init__.py
class GenevaClusterType(Enum):
    """Kinds of Geneva cluster; each member's value is its own name."""

    KUBE_RAY = "KUBE_RAY"
    LOCAL_RAY = "LOCAL_RAY"

KUBE_RAY

KUBE_RAY = 'KUBE_RAY'

LOCAL_RAY

LOCAL_RAY = 'LOCAL_RAY'

geneva.cluster.K8sConfigMethod

Bases: Enum

Method for retrieving kubernetes config. LOCAL: Load the kube config from the local environment. EKS_AUTH: Load the kube config from the AWS EKS service (requires AWS credentials). IN_CLUSTER: Load the kube config when running inside a pod in the cluster.

Source code in geneva/cluster/__init__.py
class K8sConfigMethod(Enum):
    """How the Kubernetes config is retrieved.

    - LOCAL: load the kube config from the local environment.
    - EKS_AUTH: load the kube config from the AWS EKS service
      (requires AWS credentials).
    - IN_CLUSTER: load the kube config when running inside a pod in the
      cluster.
    """

    EKS_AUTH = "EKS_AUTH"
    IN_CLUSTER = "IN_CLUSTER"
    LOCAL = "LOCAL"

EKS_AUTH

EKS_AUTH = 'EKS_AUTH'

IN_CLUSTER

IN_CLUSTER = 'IN_CLUSTER'

LOCAL

LOCAL = 'LOCAL'