Skip to content

Connection

geneva.db.Connection

Bases: DBConnection

Geneva Connection.

Source code in geneva/db.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class Connection(DBConnection):
    """Geneva Connection.

    Wraps a LanceDB connection and layers Geneva features on top of it: raw
    SQL through a Flight SQL client, views and materialized views, persisted
    cluster and manifest definitions, and an execution context backed by a
    Ray cluster.

    Neither the LanceDB connection nor the Flight SQL client is opened by the
    constructor; both are created lazily the first time they are needed.
    """

    def __init__(
        self,
        uri: str,
        *,
        region: str = "us-east-1",
        api_key: Credential | None = None,
        host_override: str | None = None,
        storage_options: dict[str, str] | None = None,
        checkpoint_store: CheckpointStore | None = None,
        packager: UDFPackager | None = None,
        **kwargs,
    ) -> None:
        """Store the connection settings without opening any connection.

        Parameters
        ----------
        uri: str
            Database URI; forwarded to ``lancedb.connect`` and sent as the
            ``database`` metadata entry of the Flight SQL client.
        region: str, default "us-east-1"
            Forwarded to ``lancedb.connect``.
        api_key: Credential, optional
            Forwarded to ``lancedb.connect``.
        host_override: str, optional
            Forwarded to ``lancedb.connect``; its hostname is also used as
            the Flight SQL host.
        storage_options: dict[str, str], optional
            Connection-level storage options; forwarded to
            ``lancedb.connect`` and used as the fallback in ``open_table``.
        checkpoint_store: CheckpointStore, optional
            Stored on the connection as ``_checkpoint_store``.
        packager: UDFPackager, optional
            UDF packager; a ``DockerUDFPackager()`` is created when omitted.
        **kwargs
            Extra keyword arguments forwarded to ``lancedb.connect``.
        """
        super().__init__()

        self._uri = uri
        self._region = region
        self._api_key = api_key
        self._host_override = host_override
        self._storage_options = storage_options
        self._ldb: DBConnection | None = None
        self._checkpoint_store = checkpoint_store
        self._packager = packager or DockerUDFPackager()
        self._jobs_manager: JobStateManager | None = None  # noqa: F821
        self._cluster_manager: ClusterManager | None = None  # noqa: F821
        self._manifest_manager: ManifestManager | None = None  # noqa: F821
        self._flight_client: FlightSQLClient | None = None
        self._kwargs = kwargs

    def __repr__(self) -> str:
        return f"<Geneva uri={self.uri}>"

    def __getstate__(self) -> dict:
        """Return the picklable state of the connection.

        Only the settings needed to reconnect are kept. The checkpoint store,
        the packager, the extra ``kwargs`` and every lazily created client or
        manager are not part of the state.
        """
        return {
            "uri": self._uri,
            "api_key": self._api_key,
            "host_override": self._host_override,
            "storage_options": self._storage_options,
            "region": self._region,
        }

    def __setstate__(self, state) -> None:
        """Rebuild the connection from a state produced by ``__getstate__``."""
        # Work on a copy so the caller's mapping is not mutated by the pop.
        params = dict(state)
        uri = params.pop("uri")
        self.__init__(uri, **params)

    def __enter__(self) -> "Connection":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()
        return None  # Don't suppress exceptions

    def close(self) -> None:
        """Close the connection.

        Closes the Flight SQL client and the LanceDB connection, in that
        order, skipping whichever one was never created. The LanceDB
        connection is closed even when closing the Flight SQL client raises.
        """
        try:
            if self._flight_client is not None:
                self._flight_client.close()
        finally:
            if self._ldb is not None:
                # go to the async client and eagerly close the connection
                self._ldb._conn.close()

    @cached_property
    def _connect(self) -> DBConnection:
        """Returns the underlying lancedb connection."""
        if self._ldb is None:
            self._ldb = lancedb.connect(
                self.uri,
                region=self._region,
                api_key=self._api_key,
                host_override=self._host_override,
                storage_options=self._storage_options,
                **self._kwargs,
            )
        return self._ldb

    @cached_property
    def _history(self) -> "JobStateManager":  # noqa: F821
        """Returns a JobStateManager that persists job executions and statuses"""
        from geneva.jobs import JobStateManager

        if self._jobs_manager is None:
            self._jobs_manager = JobStateManager(self)
        return self._jobs_manager

    @cached_property
    def flight_client(self) -> "flightsql.FlightSQLClient":  # noqa: F821
        """Return the Flight SQL client, creating it on first access.

        The host is the hostname parsed from ``host_override``; the port is
        fixed at 10025 and the connection is created with ``insecure=True``.
        """
        from flightsql import FlightSQLClient

        if self._flight_client is not None:
            return self._flight_client
        url = urlparse(self._host_override)
        hostname = url.hostname
        client = FlightSQLClient(
            host=hostname,
            port=10025,
            token="DATABASE_TOKEN",  # Dummy auth, not plugged in yet
            metadata={"database": self.uri},  # Name of the project-id
            features={"metadata-reflection": "true"},
            insecure=True,  # or False, up to you
        )
        self._flight_client = client
        return client

    @override
    def table_names(
        self,
        page_token: str | None = None,
        limit: int | None = None,
    ) -> Iterable[str]:
        """List all available tables and views.

        Parameters
        ----------
        page_token: str, optional
            Forwarded unchanged to the underlying connection.
        limit: int, optional
            Maximum number of names to request. A falsy value (``None`` or
            ``0``) is replaced by 10.
        """
        return self._connect.table_names(page_token=page_token, limit=limit or 10)

    @override
    def open_table(
        self,
        name: str,
        *,
        storage_options: dict[str, str] | None = None,
        index_cache_size: int | None = None,
        version: int | None = None,
    ) -> "Table":
        """Open a Lance Table.

        Parameters
        ----------
        name: str
            Name of the table.
        storage_options: dict[str, str], optional
            Additional options for the storage backend.
            When omitted (or empty), the options set on the connection are
            used instead. See available options at
            [https://lancedb.github.io/lancedb/guides/storage/](https://lancedb.github.io/lancedb/guides/storage/)
        index_cache_size: int, optional
            Forwarded to the ``Table`` constructor.
        version: int, optional
            Forwarded to the ``Table`` constructor.
        """
        from .table import Table

        storage_options = storage_options or self._storage_options

        return Table(
            self,
            name,
            index_cache_size=index_cache_size,
            storage_options=storage_options,
            version=version,
        )

    @override
    def create_table(  # type: ignore
        self,
        name: str,
        data: DATA | None = None,
        schema: pa.Schema | LanceModel | None = None,
        mode: str = "create",
        exist_ok: bool = False,
        on_bad_vectors: str = "error",
        fill_value: float = 0.0,
        *,
        storage_options: dict[str, str] | None = None,
        **kwargs,
    ) -> "Table":  # type: ignore
        """Create a Table in the lake

        The table is created through the underlying lancedb connection and
        then wrapped in a Geneva ``Table``.

        Parameters
        ----------
        name: str
            The name of the table
        data: The data to initialize the table, *optional*
            User must provide at least one of `data` or `schema`.
            Acceptable types are:

            - list-of-dict
            - pandas.DataFrame
            - pyarrow.Table or pyarrow.RecordBatch
        schema: The schema of the table, *optional*
            Acceptable types are:

            - pyarrow.Schema
            - [LanceModel][lancedb.pydantic.LanceModel]
        mode: str; default "create"
            The mode to use when creating the table.
            Can be either "create" or "overwrite".
            By default, if the table already exists, an exception is raised.
            If you want to overwrite the table, use mode="overwrite".
        exist_ok: bool, default False
            If a table by the same name already exists, then raise an exception
            if exist_ok=False. If exist_ok=True, then open the existing table;
            it will not add the provided data but will validate against any
            schema that's specified.
        on_bad_vectors: str, default "error"
            What to do if any of the vectors are not the same size or contain NaNs.
            One of "error", "drop", "fill".
        fill_value: float, default 0.0
            Forwarded to the underlying ``create_table``; presumably the
            value used when on_bad_vectors="fill" (TODO confirm).
        storage_options: dict[str, str], optional
            Forwarded to the underlying ``create_table`` and to the returned
            ``Table``.
        **kwargs
            Extra keyword arguments forwarded to the underlying
            ``create_table``.
        """
        from .table import Table

        self._connect.create_table(
            name,
            data,
            schema,
            mode,
            exist_ok=exist_ok,
            on_bad_vectors=on_bad_vectors,
            fill_value=fill_value,
            storage_options=storage_options,
            **kwargs,
        )
        return Table(self, name, storage_options=storage_options)

    def create_view(
        self,
        name: str,
        query: str,
        materialized: bool = False,
    ) -> "Table":
        """Create a View from a Query.

        Parameters
        ----------
        name: str
            Name of the view.
        query: str
            SQL query to create the view.
        materialized: bool, optional
            Must be False. Materialized views cannot be created from a SQL
            string yet; use ``create_materialized_view`` instead.

        Raises
        ------
        NotImplementedError
            If ``materialized`` is True.
        """
        if materialized:
            # idea, rename the provided name, and use it as the basis for the
            # materialized view.
            # - how do we add the udfs to the final materialized view table?
            raise NotImplementedError(
                "creating materialized view via sql query is not supported yet."
            )

        # TODO add test coverage here
        # NOTE(review): `name` and `query` are interpolated verbatim into the
        # SQL text, so callers must not pass untrusted input here.
        self.sql(f"CREATE VIEW {name} AS ({query})")
        return self.open_table(name)

    def create_materialized_view(
        self,
        name: str,
        query: "GenevaQueryBuilder",
        with_no_data: bool = True,
    ) -> "Table":
        """
        Create a materialized view

        Parameters
        ----------
        name: str
            Name of the materialized view.
        query: GenevaQueryBuilder
            Query to create the view.
        with_no_data: bool, optional
            If True (the default), the view is created without refreshing
            it. If False, the view is refreshed right after it is created.

        Raises
        ------
        ValueError
            If ``query`` is not a ``GenevaQueryBuilder``.
        """
        from geneva.query import GenevaQueryBuilder

        if not isinstance(query, GenevaQueryBuilder):
            raise ValueError(
                "Materialized views only support plain queries (where, select)"
            )

        tbl = query.create_materialized_view(self, name)
        if not with_no_data:
            tbl.refresh_view(name)

        return tbl

    def drop_view(self, name: str) -> pa.Table:
        """Drop a view.

        Returns the result of the ``DROP VIEW`` statement as returned by
        ``sql``.
        """
        # NOTE(review): `name` is interpolated verbatim into the SQL text.
        return self.sql(f"DROP VIEW {name}")

    @override
    def drop_table(self, name: str) -> None:
        """Drop a table."""
        self._connect.drop_table(name)

    def define_cluster(self, name: str, cluster: "GenevaCluster") -> None:  # noqa: F821
        """Validate a cluster definition and persist it under ``name``.

        The passed ``cluster`` object is modified: its ``name`` attribute is
        set to ``name`` before validation.
        """
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)

        cluster.name = name
        cluster.validate()
        self._cluster_manager.upsert(cluster)

    def list_clusters(self) -> list["GenevaCluster"]:  # noqa: F821
        """Return the persisted cluster definitions."""
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)
        return self._cluster_manager.list()

    def delete_cluster(self, name: str) -> None:  # noqa: F821
        """Delete the persisted cluster definition called ``name``."""
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)

        self._cluster_manager.delete(name)

    def define_manifest(
        self,
        name: str,
        manifest: "GenevaManifest",  # noqa: F821
        uploader: Uploader | None = None,
    ) -> None:
        """Upload the local environment and persist a manifest under ``name``.

        A deep copy of ``manifest`` is persisted; the copy receives the
        name, the uploaded zips and the checksum computed from the original
        manifest. The passed ``manifest`` object itself is not assigned to.

        Parameters
        ----------
        name: str
            Name to persist the manifest under.
        manifest: GenevaManifest
            Manifest describing how the local environment is packaged.
        uploader: Uploader, optional
            Forwarded to ``upload_local_env``.
        """
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        with upload_local_env(
            # todo: implement excludes
            uploader=uploader,
            zip_output_dir=manifest.local_zip_output_dir,
            delete_local_zips=manifest.delete_local_zips,
            skip_site_packages=manifest.skip_site_packages,
        ) as zips:
            m = copy.deepcopy(manifest)
            m.name = name
            m.zips = zips
            m.checksum = manifest.compute_checksum()
            self._manifest_manager.upsert(m)

    def list_manifests(self) -> list["GenevaManifest"]:  # noqa: F821
        """Return the persisted manifest definitions."""
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)
        return self._manifest_manager.list()

    def delete_manifest(self, name: str) -> None:  # noqa: F821
        """Delete the persisted manifest definition called ``name``."""
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        self._manifest_manager.delete(name)

    def context(
        self,
        cluster: str | None = None,
        manifest: str | None = None,
        cluster_type=GenevaClusterType.KUBE_RAY,
        on_exit=None,
    ) -> Generator[None, None, None]:
        """Context manager for a Geneva Execution Environment.

        This will provision a cluster based on the cluster
        definition and the manifest provided.
        By default, the context manager will delete the cluster on exit -
        this can be configured with the on_exit parameter.

        Parameters
        ----------
        cluster: str
            Name of the persisted cluster definition to use. This will
            raise an exception if the cluster definition was not
            defined via define_cluster().
        manifest: str
            Name of the persisted manifest to use. This will
            raise an exception if the manifest definition was not
            defined via define_manifest().
        cluster_type: GenevaClusterType, optional
            Type of the cluster to use. By default, KUBE_RAY will be used.
            To start a local Ray cluster, use GenevaClusterType.LOCAL_RAY.
            The cluster parameter is not used for LOCAL_RAY.
        on_exit: ExitMode, optional
            Exit mode for the cluster. By default (ExitMode.DELETE),
            the cluster will be deleted when the context manager exits.

        Returns
        -------
        The object returned by ``ray_cluster(...)``.

        Raises
        ------
        ValueError
            If a manifest is given together with LOCAL_RAY, or if the loaded
            cluster definition is not of a supported cluster type.
        Exception
            If the named cluster or manifest definition cannot be found.
        """
        from geneva.cluster.mgr import ClusterConfigManager
        from geneva.manifest.mgr import ManifestConfigManager
        from geneva.runners.ray._mgr import ray_cluster
        from geneva.runners.ray.raycluster import ExitMode

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)
        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        # A local Ray cluster needs no persisted definition at all.
        if GenevaClusterType(cluster_type) == GenevaClusterType.LOCAL_RAY:
            if manifest is not None:
                raise ValueError("custom manifest not supported with LOCAL_RAY")
            return ray_cluster(local=True)

        cluster_def = self._cluster_manager.load(cluster)
        if cluster_def is None:
            raise Exception(
                f"cluster definition '{cluster}' not found. "
                f"Create a new cluster with define_cluster()"
            )

        supported_types = {GenevaClusterType.KUBE_RAY}
        if GenevaClusterType(cluster_def.cluster_type) not in supported_types:
            raise ValueError(
                f"cluster_type must be one of {supported_types} to use context()"
            )
        c = cluster_def.as_dict()
        # Port forwarding is on unless the definition turns it off.
        use_portforwarding = c["kuberay"].get("use_portforwarding", True)
        rc = cluster_def.to_ray_cluster()
        rc.on_exit = on_exit or ExitMode.DELETE

        manifest_def = None
        if manifest is not None:
            manifest_def = self._manifest_manager.load(manifest)
            if manifest_def is None:
                raise Exception(
                    f"manifest definition '{manifest}' not found. "
                    f"Create a new manifest with define_manifest()"
                )

        return ray_cluster(
            use_portforwarding=use_portforwarding, ray_cluster=rc, manifest=manifest_def
        )

    def sql(self, query: str) -> pa.Table:
        """Execute a raw SQL query.

        It uses the Flight SQL engine to execute the query. Only the first
        endpoint of the returned flight info is read.

        Parameters
        ----------
        query: str
            SQL query to execute

        Returns
        -------
        pyarrow.Table
            Result of the query in a `pyarrow.Table`

        TODO
        ----
        - Support pagination
        - Support query parameters
        """
        info = self.flight_client.execute(query)
        return self.flight_client.do_get(info.endpoints[0].ticket).read_all()

flight_client

flight_client: FlightSQLClient

close

close() -> None

Close the connection.

Source code in geneva/db.py
def close(self) -> None:
    """Close the connection.

    Closes the Flight SQL client and the LanceDB connection, in that order,
    skipping whichever one was never created. The LanceDB connection is
    closed even when closing the Flight SQL client raises.
    """
    try:
        if self._flight_client is not None:
            self._flight_client.close()
    finally:
        if self._ldb is not None:
            # go to the async client and eagerly close the connection
            self._ldb._conn.close()

table_names

table_names(
    page_token: str | None = None, limit: int | None = None
) -> Iterable[str]

List all available tables and views.

Source code in geneva/db.py
@override
def table_names(
    self,
    page_token: str | None = None,
    limit: int | None = None,
) -> Iterable[str]:
    """List all available tables and views."""
    return self._connect.table_names(page_token=page_token, limit=limit or 10)

open_table

open_table(
    name: str,
    *,
    storage_options: dict[str, str] | None = None,
    index_cache_size: int | None = None,
    version: int | None = None,
) -> Table

Open a Lance Table.

Parameters:

  • name (str) –

    Name of the table.

  • storage_options (dict[str, str] | None, default: None ) –

    Additional options for the storage backend. Options already set on the connection will be inherited by the table, but can be overridden here. See available options at https://lancedb.github.io/lancedb/guides/storage/

Source code in geneva/db.py
@override
def open_table(
    self,
    name: str,
    *,
    storage_options: dict[str, str] | None = None,
    index_cache_size: int | None = None,
    version: int | None = None,
) -> "Table":
    """Return a ``Table`` handle for an existing Lance table.

    Parameters
    ----------
    name: str
        Name of the table.
    storage_options: dict[str, str], optional
        Storage backend options for this table. When omitted (or empty),
        the options configured on the connection are used instead.
        See available options at
        [https://lancedb.github.io/lancedb/guides/storage/](https://lancedb.github.io/lancedb/guides/storage/)
    index_cache_size: int, optional
        Passed through to the ``Table`` constructor.
    version: int, optional
        Passed through to the ``Table`` constructor.
    """
    from .table import Table

    # Per-call options win; otherwise inherit the connection-level ones.
    if not storage_options:
        storage_options = self._storage_options

    table_kwargs = {
        "index_cache_size": index_cache_size,
        "storage_options": storage_options,
        "version": version,
    }
    return Table(self, name, **table_kwargs)

create_table

create_table(
    name: str,
    data: DATA | None = None,
    schema: Schema | LanceModel | None = None,
    mode: str = "create",
    exist_ok: bool = False,
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
    *,
    storage_options: dict[str, str] | None = None,
    **kwargs,
) -> Table

Create a Table in the lake

Parameters:

  • name (str) –

    The name of the table

  • data (DATA | None, default: None ) –

    User must provide at least one of data or schema. Acceptable types are:

    • list-of-dict
    • pandas.DataFrame
    • pyarrow.Table or pyarrow.RecordBatch
  • schema (Schema | LanceModel | None, default: None ) –

    Acceptable types are:

    • pyarrow.Schema
    • [LanceModel][lancedb.pydantic.LanceModel]
  • mode (str, default: 'create' ) –

    The mode to use when creating the table. Can be either "create" or "overwrite". By default, if the table already exists, an exception is raised. If you want to overwrite the table, use mode="overwrite".

  • exist_ok (bool, default: False ) –

    If a table by the same name already exists, then raise an exception if exist_ok=False. If exist_ok=True, then open the existing table; it will not add the provided data but will validate against any schema that's specified.

  • on_bad_vectors (str, default: 'error' ) –

    What to do if any of the vectors are not the same size or contain NaNs. One of "error", "drop", "fill".

Source code in geneva/db.py
@override
def create_table(  # type: ignore
    self,
    name: str,
    data: DATA | None = None,
    schema: pa.Schema | LanceModel | None = None,
    mode: str = "create",
    exist_ok: bool = False,
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
    *,
    storage_options: dict[str, str] | None = None,
    **kwargs,
) -> "Table":  # type: ignore
    """Create a table in the lake and return a ``Table`` handle for it.

    The table is created through the underlying lancedb connection; the
    returned object is a Geneva ``Table`` bound to this connection.

    Parameters
    ----------
    name: str
        The name of the table
    data: The data to initialize the table, *optional*
        At least one of `data` or `schema` must be given.
        Acceptable types are:

        - list-of-dict
        - pandas.DataFrame
        - pyarrow.Table or pyarrow.RecordBatch
    schema: The schema of the table, *optional*
        Acceptable types are:

        - pyarrow.Schema
        - [LanceModel][lancedb.pydantic.LanceModel]
    mode: str; default "create"
        Either "create" or "overwrite". With "create", an existing table
        of the same name raises an exception; use mode="overwrite" to
        replace it.
    exist_ok: bool, default False
        With exist_ok=False an existing table of the same name raises an
        exception. With exist_ok=True the existing table is opened; the
        provided data is not added, but any given schema is validated.
    on_bad_vectors: str, default "error"
        What to do if any of the vectors are not the same size or contain NaNs.
        One of "error", "drop", "fill".
    fill_value: float, default 0.0
        Passed through to the underlying ``create_table``; presumably the
        value used when on_bad_vectors="fill" (TODO confirm).
    storage_options: dict[str, str], optional
        Passed through to the underlying ``create_table`` and to the
        returned ``Table``.
    **kwargs
        Extra keyword arguments passed through to the underlying
        ``create_table``.
    """
    from .table import Table

    backend = self._connect
    create_options = dict(
        exist_ok=exist_ok,
        on_bad_vectors=on_bad_vectors,
        fill_value=fill_value,
        storage_options=storage_options,
        **kwargs,
    )
    backend.create_table(name, data, schema, mode, **create_options)

    return Table(self, name, storage_options=storage_options)

create_view

create_view(
    name: str, query: str, materialized: bool = False
) -> Table

Create a View from a Query.

Parameters:

  • name (str) –

    Name of the view.

  • query (str) –

    SQL query to create the view.

  • materialized (bool, default: False ) –

    If True, the view is materialized.

Source code in geneva/db.py
def create_view(
    self,
    name: str,
    query: str,
    materialized: bool = False,
) -> "Table":
    """Create a View from a Query.

    Parameters
    ----------
    name: str
        Name of the view.
    query: str
        SQL query to create the view.
    materialized: bool, optional
        Must be False. Materialized views cannot be created from a SQL
        string yet.

    Raises
    ------
    NotImplementedError
        If ``materialized`` is True.
    """
    if materialized:
        # idea, rename the provided name, and use it as the basis for the
        # materialized view.
        # - how do we add the udfs to the final materialized view table?
        raise NotImplementedError(
            "creating materialized view via sql query is not supported yet."
        )

    # TODO add test coverage here
    # NOTE(review): `name` and `query` are interpolated verbatim into the SQL
    # text, so callers must not pass untrusted input here.
    self.sql(f"CREATE VIEW {name} AS ({query})")
    return self.open_table(name)

create_materialized_view

create_materialized_view(
    name: str,
    query: GenevaQueryBuilder,
    with_no_data: bool = True,
) -> Table

Create a materialized view

Parameters:

  • name (str) –

    Name of the materialized view.

  • query (GenevaQueryBuilder) –

    Query to create the view.

  • with_no_data (bool, default: True ) –

    If True (the default), the view is created without refreshing it; if False, the view is refreshed right after it is created.

Source code in geneva/db.py
def create_materialized_view(
    self,
    name: str,
    query: "GenevaQueryBuilder",
    with_no_data: bool = True,
) -> "Table":
    """
    Create a materialized view from a query builder.

    Parameters
    ----------
    name: str
        Name of the materialized view.
    query: GenevaQueryBuilder
        Query that defines the view.
    with_no_data: bool, optional
        If True (the default), the view is created without refreshing it.
        If False, the view is refreshed right after it is created.

    Raises
    ------
    ValueError
        If ``query`` is not a ``GenevaQueryBuilder``.
    """
    from geneva.query import GenevaQueryBuilder

    if isinstance(query, GenevaQueryBuilder):
        view = query.create_materialized_view(self, name)
    else:
        raise ValueError(
            "Materialized views only support plain queries (where, select)"
        )

    if with_no_data:
        return view

    view.refresh_view(name)
    return view

drop_view

drop_view(name: str) -> Table

Drop a view.

Source code in geneva/db.py
def drop_view(self, name: str) -> pa.Table:
    """Drop the view called ``name`` and return the statement's result."""
    statement = f"DROP VIEW {name}"
    return self.sql(statement)

drop_table

drop_table(name: str) -> None

Drop a table.

Source code in geneva/db.py
@override
def drop_table(self, name: str) -> None:
    """Drop the table called ``name`` through the underlying connection."""
    backend = self._connect
    backend.drop_table(name)

define_cluster

define_cluster(name: str, cluster: GenevaCluster) -> None
Source code in geneva/db.py
def define_cluster(self, name: str, cluster: "GenevaCluster") -> None:  # noqa: F821
    """Validate a cluster definition and persist it under ``name``.

    The passed ``cluster`` object is modified: its ``name`` attribute is set
    to ``name`` before it is validated and upserted.
    """
    from geneva.cluster.mgr import ClusterConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._cluster_manager
    if manager is None:
        manager = self._cluster_manager = ClusterConfigManager(self)

    cluster.name = name
    cluster.validate()
    manager.upsert(cluster)

list_clusters

list_clusters() -> list[GenevaCluster]
Source code in geneva/db.py
def list_clusters(self) -> list["GenevaCluster"]:  # noqa: F821
    """Return the persisted cluster definitions."""
    from geneva.cluster.mgr import ClusterConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._cluster_manager
    if manager is None:
        manager = self._cluster_manager = ClusterConfigManager(self)
    return manager.list()

delete_cluster

delete_cluster(name: str) -> None
Source code in geneva/db.py
def delete_cluster(self, name: str) -> None:  # noqa: F821
    """Delete the persisted cluster definition called ``name``."""
    from geneva.cluster.mgr import ClusterConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._cluster_manager
    if manager is None:
        manager = self._cluster_manager = ClusterConfigManager(self)

    manager.delete(name)

define_manifest

define_manifest(
    name: str,
    manifest: GenevaManifest,
    uploader: Uploader = None,
) -> None
Source code in geneva/db.py
def define_manifest(
    self,
    name: str,
    manifest: "GenevaManifest",  # noqa: F821
    uploader: Uploader = None,
) -> None:
    """Upload the local environment and persist a manifest under ``name``.

    A deep copy of ``manifest`` is what gets persisted: the copy receives the
    name, the uploaded zips and the checksum computed from the original
    manifest. The passed ``manifest`` object itself is not assigned to.

    Parameters
    ----------
    name: str
        Name to persist the manifest under.
    manifest: GenevaManifest
        Manifest describing how the local environment is packaged.
    uploader: Uploader, optional
        Passed through to ``upload_local_env``.
    """
    from geneva.manifest.mgr import ManifestConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._manifest_manager
    if manager is None:
        manager = self._manifest_manager = ManifestConfigManager(self)

    # todo: implement excludes
    upload = upload_local_env(
        uploader=uploader,
        zip_output_dir=manifest.local_zip_output_dir,
        delete_local_zips=manifest.delete_local_zips,
        skip_site_packages=manifest.skip_site_packages,
    )
    with upload as zips:
        persisted = copy.deepcopy(manifest)
        persisted.name = name
        persisted.zips = zips
        persisted.checksum = manifest.compute_checksum()
        manager.upsert(persisted)

list_manifests

list_manifests() -> list[GenevaManifest]
Source code in geneva/db.py
def list_manifests(self) -> list["GenevaManifest"]:  # noqa: F821
    """Return the persisted manifest definitions."""
    from geneva.manifest.mgr import ManifestConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._manifest_manager
    if manager is None:
        manager = self._manifest_manager = ManifestConfigManager(self)
    return manager.list()

delete_manifest

delete_manifest(name: str) -> None
Source code in geneva/db.py
def delete_manifest(self, name: str) -> None:  # noqa: F821
    """Delete the persisted manifest definition called ``name``."""
    from geneva.manifest.mgr import ManifestConfigManager

    # Create the manager on first use and cache it on the connection.
    manager = self._manifest_manager
    if manager is None:
        manager = self._manifest_manager = ManifestConfigManager(self)

    manager.delete(name)

context

context(
    cluster: str | None = None,
    manifest: str | None = None,
    cluster_type=KUBE_RAY,
    on_exit=None,
) -> Generator[None, None, None]

Context manager for a Geneva Execution Environment. This will provision a cluster based on the cluster definition and the manifest provided. By default, the context manager will delete the cluster on exit - this can be configured with the on_exit parameter.

Parameters:

  • cluster (str | None, default: None ) –

    Name of the persisted cluster definition to use. This will raise an exception if the cluster definition was not defined via define_cluster().

  • manifest (str | None, default: None ) –

    Name of the persisted manifest to use. This will raise an exception if the manifest definition was not defined via define_manifest().

  • cluster_type

    Type of the cluster to use. By default, KUBE_RAY will be used. To start a local Ray cluster, use GenevaClusterType.LOCAL_RAY. The cluster parameter is not used for LOCAL_RAY.

  • on_exit

    Exit mode for the cluster. By default (ExitMode.DELETE), the cluster will be deleted when the context manager exits.

Source code in geneva/db.py
def context(
    self,
    cluster: str | None = None,
    manifest: str | None = None,
    cluster_type=GenevaClusterType.KUBE_RAY,
    on_exit=None,
) -> Generator[None, None, None]:
    """Context manager for a Geneva Execution Environment.

    Provisions a cluster from a persisted cluster definition and, when
    given, a persisted manifest. By default the cluster is deleted when the
    context manager exits; use ``on_exit`` to change that.

    Parameters
    ----------
    cluster: str
        Name of the persisted cluster definition to use. An exception is
        raised if no such definition was created via define_cluster().
    manifest: str
        Name of the persisted manifest to use. An exception is raised if
        no such manifest was created via define_manifest().
    cluster_type: GenevaClusterType, optional
        Type of the cluster to use. By default, KUBE_RAY will be used.
        To start a local Ray cluster, use GenevaClusterType.LOCAL_RAY.
        The cluster parameter is not used for LOCAL_RAY.
    on_exit: ExitMode, optional
        Exit mode for the cluster. By default (ExitMode.DELETE),
        the cluster will be deleted when the context manager exits.

    Raises
    ------
    ValueError
        If a manifest is given together with LOCAL_RAY, or if the loaded
        cluster definition is not of a supported cluster type.
    Exception
        If the named cluster or manifest definition cannot be found.
    """
    from geneva.cluster.mgr import ClusterConfigManager
    from geneva.manifest.mgr import ManifestConfigManager
    from geneva.runners.ray._mgr import ray_cluster
    from geneva.runners.ray.raycluster import ExitMode

    # Both managers are created on first use and cached on the connection.
    cluster_mgr = self._cluster_manager
    if cluster_mgr is None:
        cluster_mgr = self._cluster_manager = ClusterConfigManager(self)
    manifest_mgr = self._manifest_manager
    if manifest_mgr is None:
        manifest_mgr = self._manifest_manager = ManifestConfigManager(self)

    # A local Ray cluster needs no persisted definition at all.
    requested_type = GenevaClusterType(cluster_type)
    if requested_type == GenevaClusterType.LOCAL_RAY:
        if manifest is not None:
            raise ValueError("custom manifest not supported with LOCAL_RAY")
        return ray_cluster(local=True)

    cluster_def = cluster_mgr.load(cluster)
    if cluster_def is None:
        raise Exception(
            f"cluster definition '{cluster}' not found. "
            f"Create a new cluster with define_cluster()"
        )

    supported_types = {GenevaClusterType.KUBE_RAY}
    definition_type = GenevaClusterType(cluster_def.cluster_type)
    if definition_type not in supported_types:
        raise ValueError(
            f"cluster_type must be one of {supported_types} to use context()"
        )

    # Port forwarding is on unless the definition turns it off.
    kuberay_settings = cluster_def.as_dict()["kuberay"]
    use_portforwarding = kuberay_settings.get("use_portforwarding", True)

    ray_cluster_def = cluster_def.to_ray_cluster()
    ray_cluster_def.on_exit = on_exit if on_exit else ExitMode.DELETE

    if manifest is None:
        manifest_def = None
    else:
        manifest_def = manifest_mgr.load(manifest)
        if manifest_def is None:
            raise Exception(
                f"manifest definition '{manifest}' not found. "
                f"Create a new manifest with define_manifest()"
            )

    return ray_cluster(
        use_portforwarding=use_portforwarding,
        ray_cluster=ray_cluster_def,
        manifest=manifest_def,
    )

sql

sql(query: str) -> Table

Execute a raw SQL query.

It uses the Flight SQL engine to execute the query.

Parameters:

  • query (str) –

    SQL query to execute

Returns:

  • Table

    Result of the query in a pyarrow.Table

TODO
  • Support pagination
  • Support query parameters
Source code in geneva/db.py
def sql(self, query: str) -> pa.Table:
    """Run a raw SQL query through the Flight SQL engine.

    The query is executed on the Flight SQL client and the result is read
    in full from the first endpoint of the returned flight info.

    Parameters
    ----------
    query: str
        SQL query to execute

    Returns
    -------
    pyarrow.Table
        Result of the query in a `pyarrow.Table`

    TODO
    ----
    - Support pagination
    - Support query parameters
    """
    client = self.flight_client
    flight_info = client.execute(query)
    first_ticket = flight_info.endpoints[0].ticket
    reader = client.do_get(first_ticket)
    return reader.read_all()