Skip to content

Manifest

geneva.manifest.mgr.GenevaManifest

A Geneva Manifest represents the files and dependencies used in the execution environment.

Source code in geneva/manifest/mgr.py
@attrs.define
class GenevaManifest:
    """A Geneva Manifest represents the files and dependencies used
    in the execution environment.

    ``checksum`` is recomputed from the other fields right after construction
    (see ``__attrs_post_init__``), so any value passed for it is replaced.
    """

    # metadata
    name: str = attrs.field()
    version: Optional[str] = attrs.field(default=None)

    # properties needed to init the cluster
    # NOTE: list-valued fields use ``factory`` so that every instance gets its
    # own list; ``default=[]`` would hand the same list object to all instances
    # that rely on the default.
    pip: list[str] = attrs.field(factory=list)
    py_modules: list[str] = attrs.field(factory=list)
    head_image: Optional[str] = attrs.field(default=None)
    worker_image: Optional[str] = attrs.field(default=None)

    # transient properties, only used during initial upload
    skip_site_packages: bool = attrs.field(default=False)
    delete_local_zips: bool = attrs.field(default=False)
    local_zip_output_dir: Optional[str] = attrs.field(default=None)

    # internal generated properties
    # Same initial value as before ([[]]), but built fresh per instance.
    zips: list[list[str]] = attrs.field(factory=lambda: [[]])
    checksum: Optional[str] = attrs.field(default=None)
    created_at: datetime = attrs.field(
        factory=lambda: datetime.now(timezone.utc),
        metadata={"pa_type": pa.timestamp("us", tz="UTC")},
    )
    created_by: str = attrs.field(factory=current_user)

    def __attrs_post_init__(self) -> None:
        """Fill in ``checksum`` from the freshly initialised field values."""
        self.checksum = self.compute_checksum()

    def compute_checksum(self) -> str:
        """Generate a stable checksum of the manifest, ignoring the checksum field.
        The zip file names include the checksum of the contents so this hash is
        comprehensive.

        The non-excluded fields are dumped as compact JSON with sorted keys and
        the MD5 hex digest of the UTF-8 encoded text is returned.
        """
        checksum_exclude_fields = {
            "name",  # excluding name since this may be autogenerated.
            "checksum",  # the value being computed must not feed into itself
            "created_at",
            "created_by",
            "delete_local_zips",
            "local_zip_output_dir",
        }
        data = attrs.asdict(
            self,
            recurse=True,
            filter=lambda a, v: a.name not in checksum_exclude_fields,
        )
        # sort_keys + fixed separators keep the serialized form deterministic;
        # default=str stringifies any value json cannot encode natively.
        payload = json.dumps(data, sort_keys=True, separators=(",", ":"), default=str)
        return hashlib.md5(payload.encode("utf-8")).hexdigest()

    def as_dict(self) -> dict:
        """Return the manifest as a dict, with enum members replaced by
        their ``value``."""
        return attrs.asdict(
            self,
            value_serializer=lambda obj, a, v: v.value
            if isinstance(v, enum.Enum)
            else v,
        )

name

name: str = field()

version

version: Optional[str] = field(default=None)

pip

pip: list[str] = field(default=[])

py_modules

py_modules: list[str] = field(default=[])

head_image

head_image: Optional[str] = field(default=None)

worker_image

worker_image: Optional[str] = field(default=None)

skip_site_packages

skip_site_packages: bool = field(default=False)

delete_local_zips

delete_local_zips: bool = field(default=False)

local_zip_output_dir

local_zip_output_dir: Optional[str] = field(default=None)

zips

zips: list[list[str]] = field(default=[[]])

checksum

checksum: Optional[str] = field(default=None)

created_at

created_at: datetime = field(
    factory=lambda: now(utc),
    metadata={"pa_type": timestamp("us", tz="UTC")},
)

created_by

created_by: str = field(factory=current_user)

compute_checksum

compute_checksum() -> str

Generate a stable checksum of the manifest, ignoring the checksum field. The zip file names include the checksum of the contents so this hash is comprehensive.

Source code in geneva/manifest/mgr.py
def compute_checksum(self) -> str:
    """Return a stable checksum of the manifest that ignores the checksum field
    itself. The zip file names include the checksum of the contents so this
    hash is comprehensive.
    """
    # Fields that must not influence the checksum.
    ignored = frozenset(
        (
            "name",  # may be autogenerated, so it is left out
            "checksum",  # the value being computed
            "created_at",
            "created_by",
            "delete_local_zips",
            "local_zip_output_dir",
        )
    )

    def _keep(attribute, _value) -> bool:
        return attribute.name not in ignored

    fields = attrs.asdict(self, recurse=True, filter=_keep)
    # Sorted keys and fixed separators give a deterministic serialization.
    canonical = json.dumps(
        fields,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    )
    digest = hashlib.md5(canonical.encode("utf-8"))
    return digest.hexdigest()

as_dict

as_dict() -> dict
Source code in geneva/manifest/mgr.py
def as_dict(self) -> dict:
    """Convert the manifest to a dict, replacing enum members by their ``value``."""

    def _unwrap_enum(_inst, _attribute, value):
        # Enum members are reduced to their underlying value; anything else
        # passes through untouched.
        if isinstance(value, enum.Enum):
            return value.value
        return value

    return attrs.asdict(self, value_serializer=_unwrap_enum)

geneva.manifest.builder.GenevaManifestBuilder

Fluent builder for GenevaManifest. `name` is required; all optional fields use their defaults. Manifests can be saved using db.define_manifest() and loaded using db.context().

Example usage:

    >>> import geneva
    >>> m = (
    ...     GenevaManifestBuilder.create("my-manifest")
    ...     .pip(["numpy", "pandas"])
    ...     .py_modules(["mymodule"])
    ...     .head_image("my-custom-image:latest")
    ...     .skip_site_packages(True)
    ...     .build()
    ... )
    >>> conn = geneva.connect("s3://my-bucket/my-db")
    >>> conn.define_manifest("my-manifest", m)
    >>> with conn.context(cluster="my-cluster", manifest="my-manifest"):
    ...     conn.open_table("my-table").backfill("my-column")

Source code in geneva/manifest/builder.py
class GenevaManifestBuilder:
    """Fluent builder for GenevaManifest. `name` is required; all optional
    fields use their defaults. Manifests can be saved using
    db.define_manifest() and loaded using db.context().

    Every setter returns the builder itself so calls can be chained.

    Example usage:
        >>> import geneva
        >>> m = (
        ...     GenevaManifestBuilder.create("my-manifest")
        ...     .pip(["numpy", "pandas"])
        ...     .py_modules(["mymodule"])
        ...     .head_image("my-custom-image:latest")
        ...     .skip_site_packages(True)
        ...     .build()
        ... )
        >>> conn = geneva.connect("s3://my-bucket/my-db")
        >>> conn.define_manifest("my-manifest", m)
        >>> with conn.context(cluster="my-cluster", manifest="my-manifest"):
        ...     conn.open_table("my-table").backfill("my-column")
    """

    def __init__(self) -> None:
        self._name: str | None = None
        self._version: str | None = None
        self._pip: list[str] = []
        self._py_modules: list[str] = []
        self._head_image: str | None = None
        self._worker_image: str | None = None
        self._skip_site_packages: bool = False
        self._delete_local_zips: bool = False
        self._local_zip_output_dir: str | None = None

    def name(self, name: str) -> "GenevaManifestBuilder":
        """Set the manifest name."""
        self._name = name
        return self

    def version(self, version: str) -> "GenevaManifestBuilder":
        """Set the manifest version."""
        self._version = version
        return self

    def pip(self, packages: list[str]) -> "GenevaManifestBuilder":
        """Set the runtime pip packages list, replacing any packages set so far.
        The list is copied, so later changes to ``packages`` do not affect the
        builder.
        See
        https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
        """
        self._pip = packages.copy()
        return self

    def add_pip(self, package: str) -> "GenevaManifestBuilder":
        """Add a single pip package to the runtime environment.
        See
        https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
        """
        self._pip.append(package)
        return self

    def py_modules(self, modules: list[str]) -> "GenevaManifestBuilder":
        """Set the Python modules for the runtime environment, replacing any
        modules set so far. The list is copied, so later changes to ``modules``
        do not affect the builder.
        See
        https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
        """
        self._py_modules = modules.copy()
        return self

    def add_py_module(self, module: str) -> "GenevaManifestBuilder":
        """Add a single Python module to the runtime environment.
        See
        https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
        """
        self._py_modules.append(module)
        return self

    def head_image(self, head_image: str) -> "GenevaManifestBuilder":
        """Set the container image for Ray head. If set, this will take priority
        over the head image specified in the cluster definition.
        """
        self._head_image = head_image
        return self

    def worker_image(self, worker_image: str) -> "GenevaManifestBuilder":
        """Set the container image for Ray workers. If set, this will take priority
        over the worker image specified in the cluster definition.
        """
        self._worker_image = worker_image
        return self

    def default_head_image(self) -> "GenevaManifestBuilder":
        """Set the container image for Ray head to the default for the
        current platform."""
        self._head_image = default_image()
        return self

    def default_worker_image(self) -> "GenevaManifestBuilder":
        """Set the container image for Ray workers to the default for the
        current platform."""
        self._worker_image = default_image()
        return self

    def skip_site_packages(self, skip: bool = True) -> "GenevaManifestBuilder":
        """Set whether to skip site packages during packaging."""
        self._skip_site_packages = skip
        return self

    def delete_local_zips(self, delete: bool = True) -> "GenevaManifestBuilder":
        """Set whether to delete local zip files after upload."""
        self._delete_local_zips = delete
        return self

    def local_zip_output_dir(self, output_dir: str) -> "GenevaManifestBuilder":
        """Set the local directory for zip file output."""
        self._local_zip_output_dir = output_dir
        return self

    def build(self) -> "GenevaManifest":
        """Build the GenevaManifest with the configured settings.

        Raises:
            ValueError: if no manifest name has been set.
        """
        if self._name is None:
            raise ValueError("Manifest name is required. Use .name() to set it.")

        # Imported here rather than at module level, as in the original code.
        from .mgr import GenevaManifest

        return GenevaManifest(
            name=self._name,
            version=self._version,
            # Hand over copies: otherwise a later add_pip()/add_py_module() on
            # this builder would mutate the already-built manifest (whose
            # checksum was computed at construction time).
            pip=list(self._pip),
            py_modules=list(self._py_modules),
            head_image=self._head_image,
            worker_image=self._worker_image,
            skip_site_packages=self._skip_site_packages,
            delete_local_zips=self._delete_local_zips,
            local_zip_output_dir=self._local_zip_output_dir,
        )

    @classmethod
    def create(cls, name: str) -> "GenevaManifestBuilder":
        """Create a new builder with the given manifest name."""
        return cls().name(name)

name

name(name: str) -> GenevaManifestBuilder

Set the manifest name.

Source code in geneva/manifest/builder.py
def name(self, name: str) -> "GenevaManifestBuilder":
    """Record ``name`` as the manifest name and return this builder so that
    further calls can be chained."""
    self._name = name
    return self

version

version(version: str) -> GenevaManifestBuilder

Set the manifest version.

Source code in geneva/manifest/builder.py
def version(self, version: str) -> "GenevaManifestBuilder":
    """Record ``version`` as the manifest version and return this builder so
    that further calls can be chained."""
    self._version = version
    return self

pip

pip(packages: list[str]) -> GenevaManifestBuilder

Set the runtime pip packages list. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html

Source code in geneva/manifest/builder.py
def pip(self, packages: list[str]) -> "GenevaManifestBuilder":
    """Replace the runtime pip package list with a copy of ``packages``.

    The builder keeps its own copy, so the caller's list is left untouched.
    For the meaning of the entries see
    https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
    """
    snapshot = packages.copy()
    self._pip = snapshot
    return self

add_pip

add_pip(package: str) -> GenevaManifestBuilder

Add a single pip package to the runtime environment. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html

Source code in geneva/manifest/builder.py
def add_pip(self, package: str) -> "GenevaManifestBuilder":
    """Append one pip package to the runtime environment's package list.

    For the meaning of the entry see
    https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
    """
    pip_packages = self._pip
    pip_packages.append(package)
    return self

py_modules

py_modules(modules: list[str]) -> GenevaManifestBuilder

Set the Python modules for the runtime environment. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html

Source code in geneva/manifest/builder.py
def py_modules(self, modules: list[str]) -> "GenevaManifestBuilder":
    """Replace the runtime environment's Python module list with a copy of
    ``modules``.

    The builder keeps its own copy, so the caller's list is left untouched.
    For the meaning of the entries see
    https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
    """
    snapshot = modules.copy()
    self._py_modules = snapshot
    return self

add_py_module

add_py_module(module: str) -> GenevaManifestBuilder

Add a single Python module to the runtime environment. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html

Source code in geneva/manifest/builder.py
def add_py_module(self, module: str) -> "GenevaManifestBuilder":
    """Append one Python module to the runtime environment's module list.

    For the meaning of the entry see
    https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html
    """
    module_list = self._py_modules
    module_list.append(module)
    return self

head_image

head_image(head_image: str) -> GenevaManifestBuilder

Set the container image for Ray head. If set, this will take priority over the head image specified in the cluster definition.

Source code in geneva/manifest/builder.py
def head_image(self, head_image: str) -> "GenevaManifestBuilder":
    """Use ``head_image`` as the container image for the Ray head. A value set
    here takes priority over the head image specified in the cluster
    definition.
    """
    self._head_image = head_image
    return self

worker_image

worker_image(worker_image: str) -> GenevaManifestBuilder

Set the container image for Ray workers. If set, this will take priority over the worker image specified in the cluster definition.

Source code in geneva/manifest/builder.py
def worker_image(self, worker_image: str) -> "GenevaManifestBuilder":
    """Set the container image for Ray workers. If set, this will take priority
    over the worker image specified in the cluster definition.
    """
    self._worker_image = worker_image
    return self

default_head_image

default_head_image() -> GenevaManifestBuilder

Set the container image for Ray head to the default for the current platform.

Source code in geneva/manifest/builder.py
def default_head_image(self) -> "GenevaManifestBuilder":
    """Use the value returned by ``default_image()`` (the default for the
    current platform) as the container image for the Ray head."""
    image = default_image()
    self._head_image = image
    return self

default_worker_image

default_worker_image() -> GenevaManifestBuilder

Set the container image for Ray workers to the default for the current platform.

Source code in geneva/manifest/builder.py
def default_worker_image(self) -> "GenevaManifestBuilder":
    """Use the value returned by ``default_image()`` (the default for the
    current platform) as the container image for the Ray workers."""
    image = default_image()
    self._worker_image = image
    return self

skip_site_packages

skip_site_packages(
    skip: bool = True,
) -> GenevaManifestBuilder

Set whether to skip site packages during packaging.

Source code in geneva/manifest/builder.py
def skip_site_packages(self, skip: bool = True) -> "GenevaManifestBuilder":
    """Choose whether site packages are skipped during packaging; calling
    this without an argument turns skipping on."""
    self._skip_site_packages = skip
    return self

delete_local_zips

delete_local_zips(
    delete: bool = True,
) -> GenevaManifestBuilder

Set whether to delete local zip files after upload.

Source code in geneva/manifest/builder.py
def delete_local_zips(self, delete: bool = True) -> "GenevaManifestBuilder":
    """Choose whether local zip files are deleted after upload; calling this
    without an argument turns deletion on."""
    self._delete_local_zips = delete
    return self

local_zip_output_dir

local_zip_output_dir(
    output_dir: str,
) -> GenevaManifestBuilder

Set the local directory for zip file output.

Source code in geneva/manifest/builder.py
def local_zip_output_dir(self, output_dir: str) -> "GenevaManifestBuilder":
    """Record ``output_dir`` as the local directory for zip file output and
    return this builder so that further calls can be chained."""
    self._local_zip_output_dir = output_dir
    return self

build

build() -> GenevaManifest

Build the GenevaManifest with the configured settings.

Source code in geneva/manifest/builder.py
def build(self) -> "GenevaManifest":
    """Build the GenevaManifest with the configured settings.

    The manifest receives copies of the builder's pip and py_modules lists, so
    continuing to use the builder afterwards does not alter a manifest that
    was already built.

    Raises:
        ValueError: if no manifest name has been set.
    """
    if self._name is None:
        raise ValueError("Manifest name is required. Use .name() to set it.")

    # Imported here rather than at module level, as in the original code.
    from .mgr import GenevaManifest

    return GenevaManifest(
        name=self._name,
        version=self._version,
        # Copies, not the builder's own lists: a later add_pip() or
        # add_py_module() must not mutate the built manifest.
        pip=list(self._pip),
        py_modules=list(self._py_modules),
        head_image=self._head_image,
        worker_image=self._worker_image,
        skip_site_packages=self._skip_site_packages,
        delete_local_zips=self._delete_local_zips,
        local_zip_output_dir=self._local_zip_output_dir,
    )

create

create(name: str) -> GenevaManifestBuilder

Create a new builder with the given manifest name.

Source code in geneva/manifest/builder.py
@classmethod
def create(cls, name: str) -> "GenevaManifestBuilder":
    """Instantiate a fresh builder and set its manifest name to ``name``."""
    builder = cls()
    return builder.name(name)