Cluster

geneva.cluster.mgr.GenevaCluster

A Geneva Cluster represents the backend compute infrastructure for the execution environment.

cluster_type

cluster_type: GenevaClusterType = field(
    metadata={"pa_type": string()}
)

name

name: str = field()

kuberay

kuberay: Optional[KubeRayConfig] = field(
    default=None, metadata={"pa_type": "string"}
)

created_at

created_at: datetime = field(
    factory=lambda: now(utc),
    metadata={"pa_type": timestamp("us", tz="UTC")},
)

created_by

created_by: str = field(factory=current_user)

ray_address

ray_address: Optional[str] = field(default=None)

ray_init_kwargs

ray_init_kwargs: Optional[dict[str, Any]] = field(
    default=None, metadata={"pa_type": string()}
)

validate

validate() -> None

to_ray_cluster

to_ray_cluster() -> RayCluster

Convert the persisted cluster definition into the internal RayCluster model.

as_dict

as_dict() -> dict

create_kuberay

create_kuberay(name: str) -> KubeRayClusterBuilder

Create a KubeRay cluster builder.

Example:

from geneva.cluster.mgr import GenevaCluster
cluster = GenevaCluster.create_kuberay("my-cluster").namespace("ml").build()

create_local

create_local(name: str) -> LocalRayClusterBuilder

Create a Local Ray cluster builder.

Example:

from geneva.cluster.mgr import GenevaCluster
cluster = GenevaCluster.create_local("local-dev").build()

create_external

create_external(
    name: str, ray_address: str
) -> ExternalRayClusterBuilder

Create an External Ray cluster builder.

Example:

from geneva.cluster.mgr import GenevaCluster
cluster = GenevaCluster.create_external("remote", "ray://host:10001").build()

geneva.cluster.builder.KubeRayClusterBuilder

Type-safe builder for KubeRay clusters deployed on Kubernetes.

Use this builder for clusters that will be deployed via KubeRay. For local development, use LocalRayClusterBuilder instead.

Example:

from geneva.cluster.builder import KubeRayClusterBuilder

cluster = (
    KubeRayClusterBuilder.create("my-cluster")
    .namespace("ml-team")
    .add_worker_group(
        KubeRayClusterBuilder.gpu_worker(4)
        .memory("64Gi")
        .build()
    )
    .add_worker_group(
        KubeRayClusterBuilder.cpu_worker()
        .cpus(16)
        .memory("32Gi")
        .build()
    )
    .build()
)

name

name(name: str) -> KubeRayClusterBuilder

Set the cluster name.

namespace

namespace(namespace: str) -> KubeRayClusterBuilder

Set the Kubernetes namespace.

config_method

config_method(
    method: K8sConfigMethod,
) -> KubeRayClusterBuilder

Set the Kubernetes config method.

portforwarding

portforwarding(
    enabled: bool = True,
) -> KubeRayClusterBuilder

Enable or disable port forwarding.

aws_config

aws_config(
    region: str | None = None, role_name: str | None = None
) -> KubeRayClusterBuilder

Configure AWS settings.

ray_init_kwargs

ray_init_kwargs(kwargs: dict) -> KubeRayClusterBuilder

Set arbitrary keyword arguments to pass to ray.init() when starting the cluster, such as environment variables. Example:

.ray_init_kwargs({
    "runtime_env": {
        "env_vars": {
            "MY_VAR": "value",
            "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"]
        },
    },
})
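The `os.environ["AWS_ACCESS_KEY_ID"]` lookup in the example raises `KeyError` when the variable is not set in the launching process. A minimal sketch of building the same dictionary while forwarding only variables that are actually set follows; `build_ray_init_kwargs` is a hypothetical helper written for illustration and is not part of Geneva.

```python
from __future__ import annotations

import os
from typing import Any, Mapping


def build_ray_init_kwargs(
    forward: list[str],
    extra: Mapping[str, str] | None = None,
    environ: Mapping[str, str] = os.environ,
) -> dict[str, Any]:
    """Hypothetical helper: build a ray.init() kwargs dict with a runtime_env."""
    env_vars = dict(extra or {})
    for name in forward:
        # Forward only variables that are set locally, so a missing
        # credential does not raise KeyError while building the dict.
        if name in environ:
            env_vars[name] = environ[name]
    return {"runtime_env": {"env_vars": env_vars}}


kwargs = build_ray_init_kwargs(
    ["AWS_ACCESS_KEY_ID"],
    extra={"MY_VAR": "value"},
)
```

The resulting `kwargs` has the same shape as the dictionary shown above and could be passed to `.ray_init_kwargs(kwargs)` on the builder.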

head_group

head_group(
    *,
    image: str | None = None,
    cpus: int | None = None,
    memory: str | None = None,
    gpus: int | None = None,
    service_account: str | None = None,
    node_selector: dict[str, str] | None = None,
    labels: dict[str, str] | None = None,
    tolerations: list[dict[str, str]] | None = None,
) -> KubeRayClusterBuilder

Configure the head group with optional parameters.

add_worker_group

add_worker_group(
    worker: WorkerGroupConfig,
) -> KubeRayClusterBuilder

Add a worker group configuration.

build

build() -> GenevaCluster

Build the GenevaCluster with the configured settings.

create

create(name: str) -> KubeRayClusterBuilder

Create a new builder with the given cluster name.

cpu_worker

cpu_worker() -> CpuWorkerBuilder

Create a CPU worker builder.

CPU workers do not have a gpus() method; use gpu_worker() to create GPU workers.

Example:

from geneva.cluster.builder import KubeRayClusterBuilder

cluster = (
    KubeRayClusterBuilder.create("test")
    .add_worker_group(KubeRayClusterBuilder.cpu_worker().cpus(8).build())
    .build()
)

gpu_worker

gpu_worker(gpus: int = 1) -> GpuWorkerBuilder

Create a GPU worker builder.

GPU workers have memory validation:

  • Minimum 4GiB memory required (raises ValueError)
  • Warning for memory > 100GB (may exceed node capacity)

Example:

from geneva.cluster.builder import KubeRayClusterBuilder

cluster = (
    KubeRayClusterBuilder.create("test")
    .add_worker_group(KubeRayClusterBuilder.gpu_worker(4).memory("64Gi").build())
    .build()
)

geneva.cluster.builder.LocalRayClusterBuilder

Builder for local Ray clusters.

Resources are managed by the local Ray runtime, so this builder does NOT have memory(), cpus(), or worker configuration methods.

Example:

from geneva.cluster.builder import LocalRayClusterBuilder
cluster = LocalRayClusterBuilder.create("local-dev").build()

name

name(name: str) -> LocalRayClusterBuilder

Set the cluster name.

build

build() -> GenevaCluster

Build the GenevaCluster for local Ray.

create

create(name: str) -> LocalRayClusterBuilder

Create a new builder with the given cluster name.

geneva.cluster.builder.ExternalRayClusterBuilder

Builder for connecting to an existing external Ray cluster.

This builder requires ray_address to be set. It does NOT have memory(), cpus(), or worker configuration methods.

Example:

from geneva.cluster.builder import ExternalRayClusterBuilder

cluster = (
    ExternalRayClusterBuilder.create("remote")
    .ray_address("ray://10.0.0.1:10001")
    .build()
)

name

name(name: str) -> ExternalRayClusterBuilder

Set the cluster name.

ray_address

ray_address(addr: str) -> ExternalRayClusterBuilder

Set the Ray address (required), e.g. 'ray://host:port'.

ray_init_kwargs

ray_init_kwargs(kwargs: dict) -> ExternalRayClusterBuilder

Set kwargs passed to ray.init() when connecting (e.g. env_vars). For example:

.ray_init_kwargs({
    "runtime_env": {
        "env_vars": {
            "MY_VAR": "value",
            "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"]
        },
    },
})

build

build() -> GenevaCluster

Build the GenevaCluster for external Ray.

create

create(
    name: str, ray_address: str | None = None
) -> ExternalRayClusterBuilder

Create a new builder with the given cluster name and optional ray_address.

geneva.cluster.builder.CpuWorkerBuilder

Bases: _WorkerResourceMixin

Builder for CPU-only worker groups in KubeRay clusters.

This builder does NOT have a gpus() method; use GpuWorkerBuilder for GPU workers.

Example:

from geneva.cluster.builder import CpuWorkerBuilder

worker = (
    CpuWorkerBuilder()
    .cpus(8)
    .memory("16Gi")
    .replicas(2)
    .build()
)

name

name(name: str) -> CpuWorkerBuilder

Set the worker group name. Must be unique within the cluster.

build

build() -> WorkerGroupConfig

Build the WorkerGroupConfig.

image

image(image: str) -> Self

Set the container image.

cpus

cpus(cpus: int) -> Self

Set the number of CPUs.

memory

memory(memory: str) -> Self

Set the memory allocation (e.g., '8Gi', '16Gi').

service_account

service_account(service_account: str) -> Self

Set the Kubernetes service account.

node_selector

node_selector(node_selector: dict[str, str]) -> Self

Set the node selector for pod placement.

labels

labels(labels: dict[str, str]) -> Self

Set the pod labels.

tolerations

tolerations(tolerations: list[dict[str, str]]) -> Self

Set the pod tolerations.

replicas

replicas(replicas: int) -> Self

Set the number of replicas.

min_replicas

min_replicas(min_replicas: int) -> Self

Set the minimum number of replicas for autoscaling.

max_replicas

max_replicas(max_replicas: int) -> Self

Set the maximum number of replicas for autoscaling.

idle_timeout_seconds

idle_timeout_seconds(seconds: int) -> Self

Set the idle timeout in seconds for autoscaling down workers.

add_label

add_label(key: str, value: str) -> Self

Add a single label.

add_toleration

add_toleration(
    key: str,
    operator: str = "Equal",
    value: str = "",
    effect: str = "",
) -> Self

Add a single toleration.
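The parameters of add_toleration() correspond to the fields of a Kubernetes toleration (key, operator, value, effect). As a rough sketch of the kind of dictionary such a call might contribute to the tolerations list, under the assumption that empty strings mean "field omitted": `make_toleration` below is a hypothetical stand-in, not Geneva's implementation.

```python
from __future__ import annotations


def make_toleration(
    key: str,
    operator: str = "Equal",
    value: str = "",
    effect: str = "",
) -> dict[str, str]:
    """Hypothetical helper mirroring the add_toleration() signature."""
    toleration = {"key": key, "operator": operator}
    # With operator "Exists", Kubernetes expects no value, so empty
    # strings are dropped rather than emitted.
    if value:
        toleration["value"] = value
    # In Kubernetes, an omitted effect matches every taint effect.
    if effect:
        toleration["effect"] = effect
    return toleration


gpu_toleration = make_toleration(
    "nvidia.com/gpu", operator="Exists", effect="NoSchedule"
)
```

A list of such dictionaries has the `list[dict[str, str]]` type that tolerations() accepts.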

geneva.cluster.builder.GpuWorkerBuilder

Bases: _WorkerResourceMixin

Builder for GPU worker groups in KubeRay clusters.

Includes validation:

  • Minimum 4GiB memory for GPU workers (raises ValueError)
  • Warning for memory > 100GB (may exceed node capacity)
  • Warning for high memory/CPU ratio (> 16 GiB/CPU)

Example:

from geneva.cluster.builder import GpuWorkerBuilder

worker = (
    GpuWorkerBuilder()
    .gpus(4)
    .cpus(8)
    .memory("64Gi")
    .build()
)
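The validation rules listed above can be approximated as follows. This is only an illustrative sketch, not Geneva's code: `parse_memory` and `check_gpu_worker_memory` are hypothetical names, the parser handles only a few common Kubernetes quantity suffixes, and reading "100GB" as 100 × 10^9 bytes is an assumption.

```python
import re
import warnings

GIB = 2**30

# Multipliers for a subset of Kubernetes quantity suffixes.
_UNITS = {
    "": 1,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def parse_memory(quantity: str) -> int:
    """Convert a quantity such as '64Gi' to a number of bytes."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)(Ki|Mi|Gi|Ti|k|M|G|T)?", quantity)
    if match is None:
        raise ValueError(f"unrecognized memory quantity: {quantity!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit or ""])


def check_gpu_worker_memory(memory: str, cpus: int) -> None:
    """Apply the three documented checks to a GPU worker's resources."""
    size = parse_memory(memory)
    if size < 4 * GIB:
        raise ValueError("GPU workers need at least 4GiB of memory")
    if size > 100 * 10**9:  # assumption: "100GB" is decimal
        warnings.warn("memory > 100GB may exceed node capacity")
    if size / GIB / cpus > 16:
        warnings.warn("memory/CPU ratio exceeds 16 GiB per CPU")


check_gpu_worker_memory("64Gi", cpus=8)  # passes all three checks
```

With the values from the example above (64Gi and 8 CPUs), the ratio is 8 GiB per CPU, so none of the checks fire in this sketch.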

name

name(name: str) -> GpuWorkerBuilder

Set the worker group name. Must be unique within the cluster.

gpus

gpus(gpus: int) -> GpuWorkerBuilder

Set the number of GPUs (must be >= 1).

build

build() -> WorkerGroupConfig

Build the WorkerGroupConfig with memory validation.

image

image(image: str) -> Self

Set the container image.

cpus

cpus(cpus: int) -> Self

Set the number of CPUs.

memory

memory(memory: str) -> Self

Set the memory allocation (e.g., '8Gi', '16Gi').

service_account

service_account(service_account: str) -> Self

Set the Kubernetes service account.

node_selector

node_selector(node_selector: dict[str, str]) -> Self

Set the node selector for pod placement.

labels

labels(labels: dict[str, str]) -> Self

Set the pod labels.

tolerations

tolerations(tolerations: list[dict[str, str]]) -> Self

Set the pod tolerations.

replicas

replicas(replicas: int) -> Self

Set the number of replicas.

min_replicas

min_replicas(min_replicas: int) -> Self

Set the minimum number of replicas for autoscaling.

max_replicas

max_replicas(max_replicas: int) -> Self

Set the maximum number of replicas for autoscaling.

idle_timeout_seconds

idle_timeout_seconds(seconds: int) -> Self

Set the idle timeout in seconds for autoscaling down workers.

add_label

add_label(key: str, value: str) -> Self

Add a single label.

add_toleration

add_toleration(
    key: str,
    operator: str = "Equal",
    value: str = "",
    effect: str = "",
) -> Self

Add a single toleration.

geneva.cluster.mgr.HeadGroupConfig

Bases: RayGroupConfig

Configuration for the Ray head pod.

k8s_spec_override

k8s_spec_override: dict[str, Any] | None = field(
    default=None,
    metadata={"pa_type": "string", "nullable": True},
)

geneva.cluster.mgr.WorkerGroupConfig

Bases: RayGroupConfig

Configuration for Ray worker pods.

name

name: str | None = field(default=None)

k8s_spec_override

k8s_spec_override: dict[str, Any] | None = field(
    default=None
)

replicas

replicas: int = field(default=1)

min_replicas

min_replicas: int = field(default=0)

max_replicas

max_replicas: int = field(
    default=DEFAULT_MAX_WORKER_REPLICAS
)

idle_timeout_seconds

idle_timeout_seconds: int = field(default=60)
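The three replica fields interact: replicas is the starting count, while min_replicas and max_replicas bound what autoscaling may do. A caller might sanity-check them before building a worker group, along these lines. The ordering rule (min_replicas <= replicas <= max_replicas) is an assumption based on how KubeRay worker groups are usually specified, and `check_replica_bounds` is a hypothetical helper, not part of Geneva.

```python
def check_replica_bounds(
    replicas: int, min_replicas: int, max_replicas: int
) -> None:
    """Hypothetical pre-flight check for worker group replica settings."""
    if min_replicas < 0:
        raise ValueError("min_replicas must be >= 0")
    # Assumed rule: the starting replica count lies within the
    # autoscaling bounds.
    if not (min_replicas <= replicas <= max_replicas):
        raise ValueError(
            f"replicas={replicas} is outside [{min_replicas}, {max_replicas}]"
        )


# Documented defaults for replicas and min_replicas; 8 is an arbitrary
# maximum chosen for the example.
check_replica_bounds(replicas=1, min_replicas=0, max_replicas=8)
```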

geneva.cluster.mgr.KubeRayConfig

namespace

namespace: str = field()

head_group

head_group: HeadGroupConfig = field()

worker_groups

worker_groups: list[WorkerGroupConfig] = field()

config_method

config_method: K8sConfigMethod = field(default=LOCAL)

use_portforwarding

use_portforwarding: bool = field(default=True)

aws_region

aws_region: Optional[str] = field(default=None)

aws_role_name

aws_role_name: Optional[str] = field(default=None)

ray_init_kwargs

ray_init_kwargs: Optional[dict[str, Any]] = field(
    factory=dict
)

geneva.cluster.mgr.ClusterConfigManager

Bases: BaseManager

get_table_name

get_table_name() -> str

get_model

get_model() -> Any

upsert

upsert(cluster: GenevaCluster) -> None

list

list(limit: int = 1000) -> list[GenevaCluster]

load

load(name: str) -> GenevaCluster | None

delete

delete(name: str) -> None

geneva.cluster.GenevaClusterType

Bases: Enum

Type of Geneva Cluster

KUBE_RAY

KUBE_RAY = 'KUBE_RAY'

LOCAL_RAY

LOCAL_RAY = 'LOCAL_RAY'

EXTERNAL_RAY

EXTERNAL_RAY = 'EXTERNAL_RAY'

geneva.cluster.K8sConfigMethod

Bases: Enum

Method for retrieving the Kubernetes config:

  • LOCAL: Load the kube config from the local environment.
  • EKS_AUTH: Load the kube config from the AWS EKS service (requires AWS credentials).
  • IN_CLUSTER: Load the kube config when running inside a pod in the cluster.
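One way a caller might choose between these three methods is sketched below. The enum is a local stand-in that mirrors the documented values so the snippet runs without Geneva installed, and `pick_config_method` with its `use_eks` flag is hypothetical. The only external fact relied on is that Kubernetes sets `KUBERNETES_SERVICE_HOST` inside pods.

```python
from __future__ import annotations

import os
from enum import Enum
from typing import Mapping


class K8sConfigMethod(Enum):
    """Stand-in mirroring the documented geneva.cluster.K8sConfigMethod."""

    EKS_AUTH = "EKS_AUTH"
    IN_CLUSTER = "IN_CLUSTER"
    LOCAL = "LOCAL"


def pick_config_method(
    environ: Mapping[str, str] = os.environ,
    use_eks: bool = False,
) -> K8sConfigMethod:
    """Hypothetical helper: choose a config method for the current process."""
    # Kubernetes injects this variable into every pod.
    if "KUBERNETES_SERVICE_HOST" in environ:
        return K8sConfigMethod.IN_CLUSTER
    if use_eks:
        return K8sConfigMethod.EKS_AUTH
    return K8sConfigMethod.LOCAL


method = pick_config_method(environ={}, use_eks=True)
```

The selected member could then be passed to config_method() on KubeRayClusterBuilder. Because each member's value equals its name, a stored string such as "EKS_AUTH" converts back with `K8sConfigMethod("EKS_AUTH")`.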

EKS_AUTH

EKS_AUTH = 'EKS_AUTH'

IN_CLUSTER

IN_CLUSTER = 'IN_CLUSTER'

LOCAL

LOCAL = 'LOCAL'