Skip to content

Connection

geneva.db.Connection

Bases: DBConnection

Geneva Connection.

Source code in geneva/db.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
class Connection(DBConnection):
    """Geneva Connection.

    Wraps a LanceDB connection (created lazily with ``lancedb.connect``) and adds
    Geneva features on top of it: views, materialized views, persisted cluster and
    manifest definitions, execution contexts, and raw SQL through Flight SQL.

    Parameters
    ----------
    uri: str
        Database URI, forwarded to ``lancedb.connect``.
    region: str, default "us-east-1"
        Region forwarded to ``lancedb.connect``.
    api_key: Credential, optional
        API key forwarded to ``lancedb.connect``.
    host_override: str, optional
        Host override forwarded to ``lancedb.connect``. Its hostname is also used
        to reach the Flight SQL endpoint (see ``flight_client``).
    storage_options: dict[str, str], optional
        Storage options forwarded to ``lancedb.connect``; they also act as the
        defaults for tables opened through this connection.
    checkpoint_store: CheckpointStore, optional
        Checkpoint store kept on the connection.
    packager: UDFPackager, optional
        UDF packager; a ``DockerUDFPackager`` is used when omitted.
    **kwargs
        Extra keyword arguments forwarded to ``lancedb.connect``.
    """

    def __init__(
        self,
        uri: str,
        *,
        region: str = "us-east-1",
        api_key: Credential | None = None,
        host_override: str | None = None,
        storage_options: dict[str, str] | None = None,
        checkpoint_store: CheckpointStore | None = None,
        packager: UDFPackager | None = None,
        **kwargs,
    ) -> None:
        super().__init__()

        self._uri = uri
        self._region = region
        self._api_key = api_key
        self._host_override = host_override
        self._storage_options = storage_options
        # Lazily-created resources; populated on first use.
        self._ldb: DBConnection | None = None
        self._checkpoint_store = checkpoint_store
        self._packager = packager or DockerUDFPackager()
        self._jobs_manager: JobStateManager | None = None
        self._cluster_manager: ClusterConfigManager | None = None
        self._manifest_manager: ManifestConfigManager | None = None
        self._flight_client: FlightSQLClient | None = None
        self._kwargs = kwargs

    def __repr__(self) -> str:
        return f"<Geneva uri={self.uri}>"

    def __getstate__(self) -> dict:
        # Only these connection parameters are pickled. checkpoint_store,
        # packager, extra kwargs and any live clients are not, so an unpickled
        # Connection falls back to the default packager and reconnects lazily.
        return {
            "uri": self._uri,
            "api_key": self._api_key,
            "host_override": self._host_override,
            "storage_options": self._storage_options,
            "region": self._region,
        }

    def __setstate__(self, state) -> None:
        self.__init__(state.pop("uri"), **state)

    def __enter__(self) -> "Connection":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()
        return None  # Don't suppress exceptions

    def close(self) -> None:
        """Close the connection.

        Closes the Flight SQL client, if one was created. When the underlying
        LanceDB connection exposes an ``_conn`` attribute, that is closed eagerly
        too. Calling ``close`` repeatedly never closes the same Flight SQL client
        twice.
        """
        if self._flight_client is not None:
            self._flight_client.close()
            # Forget the closed client, including the value cached by the
            # ``flight_client`` cached_property, so a later ``sql()`` call builds
            # a fresh client instead of reusing a closed one.
            self._flight_client = None
            self.__dict__.pop("flight_client", None)
        if self._ldb is not None and hasattr(self._ldb, "_conn"):
            # go to the async client and eagerly close the connection
            self._ldb._conn.close()  # type: ignore[attr-defined]

    @cached_property
    def _connect(self) -> DBConnection:
        """Returns the underlying lancedb connection."""
        if self._ldb is None:
            self._ldb = lancedb.connect(
                self.uri,
                region=self._region,
                api_key=self._api_key,
                host_override=self._host_override,
                storage_options=self._storage_options,
                **self._kwargs,
            )
        return self._ldb

    @cached_property
    def _history(self) -> "JobStateManager":  # noqa: F821
        """Returns a JobStateManager that persists job executions and statuses"""
        from geneva.jobs import JobStateManager

        if self._jobs_manager is None:
            self._jobs_manager = JobStateManager(self)
        return self._jobs_manager

    @cached_property
    def flight_client(self) -> "FlightSQLClient":
        """Flight SQL client used by ``sql()``.

        Connects to the hostname of ``host_override`` on port 10025 without TLS.
        The connection URI is sent as the ``database`` metadata entry.
        """
        from flightsql import FlightSQLClient

        if self._flight_client is not None:
            return self._flight_client
        # NOTE(review): when host_override is None, urlparse yields an empty
        # result and hostname is None here -- confirm how FlightSQLClient reacts.
        url = urlparse(self._host_override)
        hostname = url.hostname
        client = FlightSQLClient(
            host=hostname,
            port=10025,
            token="DATABASE_TOKEN",  # Dummy auth, not plugged in yet
            metadata={"database": self.uri},  # Name of the project-id
            features={"metadata-reflection": "true"},
            insecure=True,  # or False, up to you
        )
        self._flight_client = client
        return client

    @override
    def table_names(
        self, page_token: str | None = None, limit: int | None = None, *args, **kwargs
    ) -> Iterable[str]:
        """List all available tables and views.

        When ``limit`` is not given (or is 0), a limit of 10 is requested.
        """
        return self._connect.table_names(
            *args, page_token=page_token, limit=limit or 10, **kwargs
        )

    @override
    def open_table(
        self,
        name: str,
        storage_options: dict[str, str] | None = None,
        index_cache_size: int | None = None,
        version: int | None = None,
        *args,
        **kwargs,
    ) -> "Table":
        """Open a Lance Table.

        Parameters
        ----------
        name: str
            Name of the table.
        storage_options: dict[str, str], optional
            Additional options for the storage backend.
            Options already set on the connection will be inherited by the table,
            but can be overridden here. See available options at
            [https://lancedb.github.io/lancedb/guides/storage/](https://lancedb.github.io/lancedb/guides/storage/)
        index_cache_size: int, optional
            Forwarded to the ``Table`` constructor.
        version: int, optional
            Forwarded to the ``Table`` constructor.

        Extra positional and keyword arguments are accepted for interface
        compatibility and are ignored.
        """
        from .table import Table

        # Connection-level options are the defaults; per-call options win on
        # conflicting keys, as documented above.
        if storage_options is None:
            merged_options = self._storage_options
        else:
            merged_options = {**(self._storage_options or {}), **storage_options}

        return Table(
            self,
            name,
            index_cache_size=index_cache_size,
            storage_options=merged_options,
            version=version,
        )

    @override
    def create_table(  # type: ignore
        self,
        name: str,
        data: DATA | None = None,
        schema: pa.Schema | LanceModel | None = None,
        mode: str = "create",
        exist_ok: bool = False,
        on_bad_vectors: str = "error",
        fill_value: float = 0.0,
        storage_options: dict[str, str] | None = None,
        *args,
        **kwargs,
    ) -> "Table":  # type: ignore
        """Create a Table in the lake

        Parameters
        ----------
        name: str
            The name of the table
        data: The data to initialize the table, *optional*
            User must provide at least one of `data` or `schema`.
            Acceptable types are:

            - list-of-dict
            - pandas.DataFrame
            - pyarrow.Table or pyarrow.RecordBatch
        schema: The schema of the table, *optional*
            Acceptable types are:

            - pyarrow.Schema
            - [LanceModel][lancedb.pydantic.LanceModel]
        mode: str; default "create"
            The mode to use when creating the table.
            Can be either "create" or "overwrite".
            By default, if the table already exists, an exception is raised.
            If you want to overwrite the table, use mode="overwrite".
        exist_ok: bool, default False
            If a table by the same name already exists, then raise an exception
            if exist_ok=False. If exist_ok=True, then open the existing table;
            it will not add the provided data but will validate against any
            schema that's specified.
        on_bad_vectors: str, default "error"
            What to do if any of the vectors are not the same size or contain NaNs.
            One of "error", "drop", "fill".
        fill_value: float, default 0.0
            Forwarded to the underlying LanceDB ``create_table``.
        storage_options: dict[str, str], optional
            Forwarded to the underlying LanceDB ``create_table`` and to the
            returned ``Table``.

        Any extra positional and keyword arguments are passed through to the
        underlying LanceDB ``create_table``.
        """
        from .table import Table

        self._connect.create_table(
            name,
            data,
            schema,
            mode,
            *args,
            exist_ok=exist_ok,
            on_bad_vectors=on_bad_vectors,
            fill_value=fill_value,
            storage_options=storage_options,
            **kwargs,
        )
        return Table(self, name, storage_options=storage_options)

    def create_view(
        self,
        name: str,
        query: str,
        materialized: bool = False,
    ) -> "Table":
        """Create a View from a Query.

        Parameters
        ----------
        name: str
            Name of the view.
        query: str
            SQL query to create the view.
        materialized: bool, optional
            Materialized views from SQL queries are not supported yet; use
            ``create_materialized_view`` instead.

        Raises
        ------
        NotImplementedError
            If ``materialized`` is True.
        """
        if materialized:
            # idea, rename the provided name, and use it as the basis for the
            # materialized view.
            # - how do we add the udfs to the final materialized view table?
            # Fail loudly instead of silently creating a non-materialized view.
            raise NotImplementedError(
                "creating materialized view via sql query is not supported yet."
            )

        # TODO add test coverage here
        # NOTE(review): name and query are interpolated into SQL unescaped;
        # only pass trusted identifiers/queries.
        self.sql(f"CREATE VIEW {name} AS ({query})")
        return self.open_table(name)

    def create_materialized_view(
        self,
        name: str,
        query: "GenevaQueryBuilder",
        with_no_data: bool = True,
    ) -> "Table":
        """
        Create a materialized view

        Parameters
        ----------
        name: str
            Name of the materialized view.
        query: GenevaQueryBuilder
            Query to create the view.
        with_no_data: bool, optional
            If True (default), the view is created without data and is ready
            for a later refresh. If False, the view is populated immediately
            (when the created table provides ``refresh_view``).

        Raises
        ------
        ValueError
            If ``query`` is not a ``GenevaQueryBuilder``.
        """
        from geneva.query import GenevaQueryBuilder

        if not isinstance(query, GenevaQueryBuilder):
            raise ValueError(
                "Materialized views only support plain queries (where, select)"
            )

        tbl = query.create_materialized_view(self, name)
        if not with_no_data and hasattr(tbl, "refresh_view"):
            tbl.refresh_view(name)  # type: ignore[attr-defined]

        return tbl

    def drop_view(self, name: str) -> pa.Table:
        """Drop a view."""
        # NOTE(review): name is interpolated into SQL unescaped.
        return self.sql(f"DROP VIEW {name}")

    @override
    def drop_table(self, name: str, *args, **kwargs) -> None:
        """Drop a table."""
        self._connect.drop_table(name, *args, **kwargs)

    def define_cluster(self, name: str, cluster: "GenevaCluster") -> None:  # noqa: F821
        """
        Define a persistent Geneva cluster. This will upsert the cluster definition by
        name. The cluster can then be provisioned using `context(cluster=name)`.

        Parameters
        ----------
        name: str
            Name of the cluster. This will be used as the key when upserting and
            provisioning the cluster. The cluster name must comply with RFC 1123.
        cluster: GenevaCluster
            The cluster definition to store. Note that ``cluster.name`` is set to
            ``name`` on the object passed in before it is validated and stored.
        """
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)

        cluster.name = name
        cluster.validate()
        self._cluster_manager.upsert(cluster)

    def list_clusters(self) -> list["GenevaCluster"]:  # noqa: F821
        """
        List the cluster definitions. These can be defined using `define_cluster()`.

        Returns
        -------
        Iterable of GenevaCluster
            List of Geneva cluster definitions
        """
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)
        return self._cluster_manager.list()

    def delete_cluster(self, name: str) -> None:  # noqa: F821
        """
        Delete a Geneva cluster definition.

        Parameters
        ----------
        name: str
            Name of the cluster to delete.
        """
        from geneva.cluster.mgr import ClusterConfigManager

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)

        self._cluster_manager.delete(name)

    def define_manifest(
        self,
        name: str,
        manifest: "GenevaManifest",  # noqa: F821
        uploader: Uploader | None = None,
    ) -> None:
        """
        Define a persistent Geneva Manifest that represents the files and dependencies
        used in the execution environment. This will upsert the manifest definition by
        name and upload the required artifacts. The manifest can then be used with
        `context(manifest=name)`.

        Parameters
        ----------
        name: str
            Name of the manifest. This will be used as the key when upserting and
            loading the manifest.
        manifest: GenevaManifest
            The manifest definition to use. It is deep-copied; the object passed
            in is not modified.
        uploader: Uploader, optional
            An optional, custom Uploader to use. If not provided, the uploader will be
            auto-detected based on the
            environment configuration.
        """
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        with upload_local_env(
            # todo: implement excludes
            uploader=uploader,
            zip_output_dir=manifest.local_zip_output_dir,
            delete_local_zips=manifest.delete_local_zips,
            skip_site_packages=manifest.skip_site_packages,
        ) as zips:
            m = copy.deepcopy(manifest)
            m.name = name
            m.zips = zips
            m.checksum = manifest.compute_checksum()
            self._manifest_manager.upsert(m)

    def list_manifests(self) -> list["GenevaManifest"]:  # noqa: F821
        """
        List the manifest definitions. These can be defined using `define_manifest()`.

        Returns
        -------
        Iterable of GenevaManifest
            List of Geneva manifest definitions
        """
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)
        return self._manifest_manager.list()

    def delete_manifest(self, name: str) -> None:  # noqa: F821
        """
        Delete a Geneva manifest definition.

        Parameters
        ----------
        name: str
            Name of the manifest to delete.
        """
        from geneva.manifest.mgr import ManifestConfigManager

        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        self._manifest_manager.delete(name)

    def context(
        self,
        cluster: str | None = None,
        manifest: str | None = None,
        cluster_type=GenevaClusterType.KUBE_RAY,
        on_exit=None,
    ) -> contextlib.AbstractContextManager[None]:
        """Context manager for a Geneva Execution Environment.

        This will provision a cluster based on the cluster definition and the
        manifest provided. By default, the context manager will delete the
        cluster on exit. This can be configured with the on_exit parameter.

        Parameters
        ----------
        cluster: str
            Name of the persisted cluster definition to use. This will
            raise an exception if the cluster definition was not
            defined via `define_cluster()`. This parameter is ignored
            if `cluster_type` is `GenevaClusterType.LOCAL_RAY`.
        manifest: str
            Name of the persisted manifest to use. This will
            raise an exception if the manifest definition was not
            defined via `define_manifest()`.
        cluster_type: GenevaClusterType, optional, default GenevaClusterType.KUBE_RAY
            Type of the cluster to use. By default, KUBE_RAY will be used.
            To start a local Ray cluster, use `GenevaClusterType.LOCAL_RAY`.
        on_exit: ExitMode, optional, default ExitMode.DELETE
            Exit mode for the cluster. By default, the cluster will be deleted when the
            context manager exits.
            To retain the cluster on errors, use `ExitMode.DELETE_ON_SUCCESS`.
            To always retain the cluster, use `ExitMode.RETAIN`.

        Raises
        ------
        ValueError
            If a manifest is given with ``LOCAL_RAY``, if ``cluster`` is missing
            for other cluster types, or if the stored cluster definition is not
            of a supported type (``KUBE_RAY``).
        Exception
            If the named cluster or manifest definition cannot be found.
        """
        from geneva.cluster.mgr import ClusterConfigManager
        from geneva.manifest.mgr import ManifestConfigManager
        from geneva.runners.ray._mgr import ray_cluster
        from geneva.runners.ray.raycluster import ExitMode

        if self._cluster_manager is None:
            self._cluster_manager = ClusterConfigManager(self)
        if self._manifest_manager is None:
            self._manifest_manager = ManifestConfigManager(self)

        if GenevaClusterType(cluster_type) == GenevaClusterType.LOCAL_RAY:
            if manifest is not None:
                raise ValueError("custom manifest not supported with LOCAL_RAY")
            return ray_cluster(local=True)

        if cluster is None:
            raise ValueError("cluster name is required for non-LOCAL_RAY cluster types")
        cluster_def = self._cluster_manager.load(cluster)
        if cluster_def is None:
            raise Exception(
                f"cluster definition '{cluster}' not found. "
                f"Create a new cluster with define_cluster()"
            )

        supported_types = {GenevaClusterType.KUBE_RAY}
        if GenevaClusterType(cluster_def.cluster_type) not in supported_types:
            raise ValueError(
                f"cluster_type must be one of {supported_types} to use context()"
            )
        c = cluster_def.as_dict()
        use_portforwarding = c["kuberay"].get("use_portforwarding", True)
        rc = cluster_def.to_ray_cluster()
        rc.on_exit = on_exit or ExitMode.DELETE

        manifest_def = None
        if manifest is not None:
            manifest_def = self._manifest_manager.load(manifest)
            if manifest_def is None:
                raise Exception(
                    f"manifest definition '{manifest}' not found. "
                    f"Create a new manifest with define_manifest()"
                )

        return ray_cluster(
            use_portforwarding=use_portforwarding, ray_cluster=rc, manifest=manifest_def
        )

    def sql(self, query: str) -> pa.Table:
        """Execute a raw SQL query.

        It uses the Flight SQL engine to execute the query. Results from every
        endpoint returned by the server are read and concatenated.

        Parameters
        ----------
        query: str
            SQL query to execute

        Returns
        -------
        pyarrow.Table
            Result of the query in a `pyarrow.Table`. If the server returns no
            endpoints, an empty table with the result schema is returned.

        TODO
        ----
        - Support pagination
        - Support query parameters
        """
        client = self.flight_client
        info = client.execute(query)
        # A Flight SQL result may be split across several endpoints; reading
        # only the first one would silently drop rows.
        tables = [client.do_get(endpoint.ticket).read_all() for endpoint in info.endpoints]
        if not tables:
            return info.schema.empty_table()
        if len(tables) == 1:
            return tables[0]
        return pa.concat_tables(tables)

flight_client

flight_client: FlightSQLClient

close

close() -> None

Close the connection.

Source code in geneva/db.py
def close(self) -> None:
    """Close the connection.

    Closes the Flight SQL client, if one was created. When the underlying
    LanceDB connection exposes an ``_conn`` attribute, that is closed eagerly
    too. Calling ``close`` repeatedly never closes the same Flight SQL client
    twice.
    """
    if self._flight_client is not None:
        self._flight_client.close()
        # Forget the closed client, including the value cached by the
        # ``flight_client`` cached_property, so a later ``sql()`` call builds a
        # fresh client instead of reusing a closed one.
        self._flight_client = None
        self.__dict__.pop("flight_client", None)
    if self._ldb is not None and hasattr(self._ldb, "_conn"):
        # go to the async client and eagerly close the connection
        self._ldb._conn.close()  # type: ignore[attr-defined]

table_names

table_names(
    page_token: str | None = None,
    limit: int | None = None,
    *args,
    **kwargs,
) -> Iterable[str]

List all available tables and views.

Source code in geneva/db.py
@override
def table_names(
    self, page_token: str | None = None, limit: int | None = None, *args, **kwargs
) -> Iterable[str]:
    """Return the names of the tables and views in this database.

    The call is delegated to the underlying LanceDB connection. A missing
    (or zero) ``limit`` is replaced by 10 before delegating.
    """
    effective_limit = limit if limit else 10
    return self._connect.table_names(
        *args, page_token=page_token, limit=effective_limit, **kwargs
    )

open_table

open_table(
    name: str,
    storage_options: dict[str, str] | None = None,
    index_cache_size: int | None = None,
    version: int | None = None,
    *args,
    **kwargs,
) -> Table

Open a Lance Table.

Parameters:

  • name (str) –

    Name of the table.

  • storage_options (dict[str, str] | None, default: None ) –

    Additional options for the storage backend. Options already set on the connection will be inherited by the table, but can be overridden here. See available options at https://lancedb.github.io/lancedb/guides/storage/

Source code in geneva/db.py
@override
def open_table(
    self,
    name: str,
    storage_options: dict[str, str] | None = None,
    index_cache_size: int | None = None,
    version: int | None = None,
    *args,
    **kwargs,
) -> "Table":
    """Open a Lance Table.

    Parameters
    ----------
    name: str
        Name of the table.
    storage_options: dict[str, str], optional
        Additional options for the storage backend.
        Options already set on the connection will be inherited by the table,
        but can be overridden here. See available options at
        [https://lancedb.github.io/lancedb/guides/storage/](https://lancedb.github.io/lancedb/guides/storage/)
    index_cache_size: int, optional
        Forwarded to the ``Table`` constructor.
    version: int, optional
        Forwarded to the ``Table`` constructor.

    Extra positional and keyword arguments are accepted for interface
    compatibility and are ignored.
    """
    from .table import Table

    # Connection-level options are the defaults; per-call options win on
    # conflicting keys, as documented above.
    if storage_options is None:
        merged_options = self._storage_options
    else:
        merged_options = {**(self._storage_options or {}), **storage_options}

    return Table(
        self,
        name,
        index_cache_size=index_cache_size,
        storage_options=merged_options,
        version=version,
    )

create_table

create_table(
    name: str,
    data: DATA | None = None,
    schema: Schema | LanceModel | None = None,
    mode: str = "create",
    exist_ok: bool = False,
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
    storage_options: dict[str, str] | None = None,
    *args,
    **kwargs,
) -> Table

Create a Table in the lake

Parameters:

  • name (str) –

    The name of the table

  • data (DATA | None, default: None ) –

    User must provide at least one of data or schema. Acceptable types are:

    • list-of-dict
    • pandas.DataFrame
    • pyarrow.Table or pyarrow.RecordBatch
  • schema (Schema | LanceModel | None, default: None ) –

    Acceptable types are:

    • pyarrow.Schema
    • LanceModel (lancedb.pydantic.LanceModel)
  • mode (str, default: 'create' ) –

    The mode to use when creating the table. Can be either "create" or "overwrite". By default, if the table already exists, an exception is raised. If you want to overwrite the table, use mode="overwrite".

  • exist_ok (bool, default: False ) –

    If a table by the same name already exists, then raise an exception if exist_ok=False. If exist_ok=True, then open the existing table; it will not add the provided data but will validate against any schema that's specified.

  • on_bad_vectors (str, default: 'error' ) –

    What to do if any of the vectors are not the same size or contain NaNs. One of "error", "drop", "fill".

Source code in geneva/db.py
@override
def create_table(  # type: ignore
    self,
    name: str,
    data: DATA | None = None,
    schema: pa.Schema | LanceModel | None = None,
    mode: str = "create",
    exist_ok: bool = False,
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
    storage_options: dict[str, str] | None = None,
    *args,
    **kwargs,
) -> "Table":  # type: ignore
    """Create a table in the lake and return a Geneva ``Table`` for it.

    Parameters
    ----------
    name: str
        The name of the table.
    data: The data to initialize the table, *optional*
        At least one of `data` or `schema` must be provided.
        Accepted types:

        - list-of-dict
        - pandas.DataFrame
        - pyarrow.Table or pyarrow.RecordBatch
    schema: The schema of the table, *optional*
        Accepted types:

        - pyarrow.Schema
        - [LanceModel][lancedb.pydantic.LanceModel]
    mode: str; default "create"
        Either "create" or "overwrite". With "create", an existing table of
        the same name causes an exception; pass mode="overwrite" to replace it.
    exist_ok: bool, default False
        When a table of the same name exists: raise if False; if True, open
        the existing table without adding the provided data, validating it
        against any schema that was given.
    on_bad_vectors: str, default "error"
        Policy for vectors of inconsistent size or containing NaNs:
        one of "error", "drop", "fill".
    fill_value: float, default 0.0
        Forwarded to the underlying LanceDB ``create_table``.
    storage_options: dict[str, str], optional
        Forwarded to the underlying LanceDB ``create_table`` and to the
        returned ``Table``.

    Any extra positional and keyword arguments are passed through to the
    underlying LanceDB ``create_table``.
    """
    from .table import Table

    lance_conn = self._connect
    lance_conn.create_table(
        name,
        data,
        schema,
        mode,
        *args,
        exist_ok=exist_ok,
        on_bad_vectors=on_bad_vectors,
        fill_value=fill_value,
        storage_options=storage_options,
        **kwargs,
    )
    created = Table(self, name, storage_options=storage_options)
    return created

create_view

create_view(
    name: str, query: str, materialized: bool = False
) -> Table

Create a View from a Query.

Parameters:

  • name (str) –

    Name of the view.

  • query (str) –

    SQL query to create the view.

  • materialized (bool, default: False ) –

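    If True, the view is materialized. Materialized views created from a SQL query are not supported yet; use create_materialized_view() instead.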

Source code in geneva/db.py
def create_view(
    self,
    name: str,
    query: str,
    materialized: bool = False,
) -> "Table":
    """Create a View from a Query.

    Parameters
    ----------
    name: str
        Name of the view.
    query: str
        SQL query to create the view.
    materialized: bool, optional
        Materialized views from SQL queries are not supported yet; use
        ``create_materialized_view`` instead.

    Raises
    ------
    NotImplementedError
        If ``materialized`` is True.
    """
    if materialized:
        # idea, rename the provided name, and use it as the basis for the
        # materialized view.
        # - how do we add the udfs to the final materialized view table?
        # Fail loudly instead of silently creating a non-materialized view.
        raise NotImplementedError(
            "creating materialized view via sql query is not supported yet."
        )

    # TODO add test coverage here
    # NOTE(review): name and query are interpolated into SQL unescaped;
    # only pass trusted identifiers/queries.
    self.sql(f"CREATE VIEW {name} AS ({query})")
    return self.open_table(name)

create_materialized_view

create_materialized_view(
    name: str,
    query: GenevaQueryBuilder,
    with_no_data: bool = True,
) -> Table

Create a materialized view

Parameters:

  • name (str) –

    Name of the materialized view.

  • query (GenevaQueryBuilder) –

    Query to create the view.

  • with_no_data (bool, default: True ) –

    If True (default), the view is created without data and is ready for a later refresh. If False, the view is populated immediately.

Source code in geneva/db.py
def create_materialized_view(
    self,
    name: str,
    query: "GenevaQueryBuilder",
    with_no_data: bool = True,
) -> "Table":
    """
    Create a materialized view

    Parameters
    ----------
    name: str
        Name of the materialized view.
    query: GenevaQueryBuilder
        Query to create the view.
    with_no_data: bool, optional
        If True (default), the view is created without data and is ready for
        a later refresh. If False, the view is populated immediately (when the
        created table provides ``refresh_view``).

    Raises
    ------
    ValueError
        If ``query`` is not a ``GenevaQueryBuilder``.
    """
    from geneva.query import GenevaQueryBuilder

    if not isinstance(query, GenevaQueryBuilder):
        raise ValueError(
            "Materialized views only support plain queries (where, select)"
        )

    tbl = query.create_materialized_view(self, name)
    if not with_no_data and hasattr(tbl, "refresh_view"):
        tbl.refresh_view(name)  # type: ignore[attr-defined]

    return tbl

drop_view

drop_view(name: str) -> Table

Drop a view.

Source code in geneva/db.py
def drop_view(self, name: str) -> pa.Table:
    """Drop the view called ``name`` by running a ``DROP VIEW`` statement.

    Returns whatever ``sql()`` returns for that statement.
    """
    # NOTE(review): name is interpolated into SQL unescaped.
    statement = f"DROP VIEW {name}"
    return self.sql(statement)

drop_table

drop_table(name: str, *args, **kwargs) -> None

Drop a table.

Source code in geneva/db.py
@override
def drop_table(self, name: str, *args, **kwargs) -> None:
    """Drop a table by delegating to the underlying LanceDB connection.

    Extra positional and keyword arguments are passed through unchanged.
    """
    lance_conn = self._connect
    lance_conn.drop_table(name, *args, **kwargs)

define_cluster

define_cluster(name: str, cluster: GenevaCluster) -> None

Define a persistent Geneva cluster. This will upsert the cluster definition by name. The cluster can then be provisioned using context(cluster=name).

Parameters:

  • name (str) –

    Name of the cluster. This will be used as the key when upserting and provisioning the cluster. The cluster name must comply with RFC 1123.

  • cluster (GenevaCluster) –

    The cluster definition to store.

Source code in geneva/db.py
def define_cluster(self, name: str, cluster: "GenevaCluster") -> None:  # noqa: F821
    """
    Define a persistent Geneva cluster. This will upsert the cluster definition by
    name. The cluster can then be provisioned using `context(cluster=name)`.

    Parameters
    ----------
    name: str
        Name of the cluster. This will be used as the key when upserting and
        provisioning the cluster. The cluster name must comply with RFC 1123.
    cluster: GenevaCluster
        The cluster definition to store. Note that ``cluster.name`` is set to
        ``name`` on the object passed in before it is validated and stored.
    """
    from geneva.cluster.mgr import ClusterConfigManager

    if self._cluster_manager is None:
        self._cluster_manager = ClusterConfigManager(self)

    cluster.name = name
    cluster.validate()
    self._cluster_manager.upsert(cluster)

list_clusters

list_clusters() -> list[GenevaCluster]

List the cluster definitions. These can be defined using define_cluster().

Returns:

  • Iterable of GenevaCluster

    List of Geneva cluster definitions

Source code in geneva/db.py
def list_clusters(self) -> list["GenevaCluster"]:  # noqa: F821
    """
    Return every stored cluster definition (see `define_cluster()`).

    The cluster config manager is created lazily on first use.

    Returns
    -------
    Iterable of GenevaCluster
        The stored Geneva cluster definitions.
    """
    from geneva.cluster.mgr import ClusterConfigManager

    manager = self._cluster_manager
    if manager is None:
        manager = self._cluster_manager = ClusterConfigManager(self)
    return manager.list()

delete_cluster

delete_cluster(name: str) -> None

Delete a Geneva cluster definition.

Parameters:

  • name (str) –

    Name of the cluster to delete.

Source code in geneva/db.py
def delete_cluster(self, name: str) -> None:  # noqa: F821
    """
    Remove the stored Geneva cluster definition called ``name``.

    The cluster config manager is created lazily on first use.

    Parameters
    ----------
    name: str
        Name of the cluster definition to delete.
    """
    from geneva.cluster.mgr import ClusterConfigManager

    manager = self._cluster_manager
    if manager is None:
        manager = self._cluster_manager = ClusterConfigManager(self)

    manager.delete(name)

define_manifest

define_manifest(
    name: str,
    manifest: GenevaManifest,
    uploader: Uploader | None = None,
) -> None

Define a persistent Geneva Manifest that represents the files and dependencies used in the execution environment. This will upsert the manifest definition by name and upload the required artifacts. The manifest can then be used with context(manifest=name).

Parameters:

  • name (str) –

    Name of the manifest. This will be used as the key when upserting and loading the manifest.

  • manifest (GenevaManifest) –

    The manifest definition to use.

  • uploader (Uploader | None, default: None ) –

    An optional, custom Uploader to use. If not provided, the uploader will be auto-detected based on the environment configuration.

Source code in geneva/db.py
def define_manifest(
    self,
    name: str,
    manifest: "GenevaManifest",  # noqa: F821
    uploader: Uploader | None = None,
) -> None:
    """
    Persist a Geneva Manifest describing the execution environment.

    The local environment is packaged and uploaded, and a copy of ``manifest``
    (renamed to ``name`` and pointing at the uploaded zips) is upserted. The
    stored manifest can then be referenced with `context(manifest=name)`.

    Parameters
    ----------
    name: str
        Name of the manifest. This is the key used when upserting and later
        loading the manifest.
    manifest: GenevaManifest
        The manifest definition to use. It is not mutated; a deep copy is stored.
    uploader: Uploader, optional
        An optional, custom Uploader to use. If not provided, the uploader will
        be auto-detected based on the environment configuration.
    """
    from geneva.manifest.mgr import ManifestConfigManager

    # Lazily create and cache the manifest config manager on first use.
    mgr = self._manifest_manager
    if mgr is None:
        mgr = ManifestConfigManager(self)
        self._manifest_manager = mgr

    # todo: implement excludes
    upload_ctx = upload_local_env(
        uploader=uploader,
        zip_output_dir=manifest.local_zip_output_dir,
        delete_local_zips=manifest.delete_local_zips,
        skip_site_packages=manifest.skip_site_packages,
    )
    # The upsert happens while the upload context is still open.
    with upload_ctx as uploaded_zips:
        stored = copy.deepcopy(manifest)
        stored.name = name
        stored.zips = uploaded_zips
        # NOTE(review): the checksum is computed from the caller's manifest, not
        # from `stored` (which has name/zips set) — confirm this is intended.
        stored.checksum = manifest.compute_checksum()
        mgr.upsert(stored)

list_manifests

list_manifests() -> list[GenevaManifest]

List the manifest definitions. These can be defined using define_manifest().

Returns:

  • list[GenevaManifest]

    List of Geneva manifest definitions

Source code in geneva/db.py
def list_manifests(self) -> list["GenevaManifest"]:  # noqa: F821
    """
    Return every manifest definition persisted for this connection.

    Manifest definitions are created with `define_manifest()`.

    Returns
    -------
    list of GenevaManifest
        The stored Geneva manifest definitions, as returned by the manifest
        config manager's ``list()``.
    """
    from geneva.manifest.mgr import ManifestConfigManager

    # The manager is created lazily on first use and cached on the connection.
    mgr = self._manifest_manager
    if mgr is None:
        mgr = ManifestConfigManager(self)
        self._manifest_manager = mgr
    return mgr.list()

delete_manifest

delete_manifest(name: str) -> None

Delete a Geneva manifest definition.

Parameters:

  • name (str) –

    Name of the manifest to delete.

Source code in geneva/db.py
def delete_manifest(self, name: str) -> None:
    """
    Remove the Geneva manifest definition stored under ``name``.

    Parameters
    ----------
    name: str
        Name of the manifest definition to delete.
    """
    from geneva.manifest.mgr import ManifestConfigManager

    # Lazily create and cache the manifest config manager on first use.
    manager = self._manifest_manager
    if manager is None:
        manager = self._manifest_manager = ManifestConfigManager(self)

    manager.delete(name)

context

context(
    cluster: str | None = None,
    manifest: str | None = None,
    cluster_type=KUBE_RAY,
    on_exit=None,
) -> AbstractContextManager[None]

Context manager for a Geneva Execution Environment. This will provision a cluster based on the cluster definition and the manifest provided. By default, the context manager will delete the cluster on exit. This can be configured with the on_exit parameter.

Parameters:

  • cluster (str | None, default: None ) –

    Name of the persisted cluster definition to use. This will raise an exception if the cluster definition was not defined via define_cluster(). This parameter is ignored if cluster_type is GenevaClusterType.LOCAL_RAY.

  • manifest (str | None, default: None ) –

    Name of the persisted manifest to use. This will raise an exception if the manifest definition was not defined via define_manifest().

  • cluster_type (GenevaClusterType, default: GenevaClusterType.KUBE_RAY ) –

    Type of the cluster to use. By default, KUBE_RAY will be used. To start a local Ray cluster, use GenevaClusterType.LOCAL_RAY.

  • on_exit (ExitMode | None, default: None, meaning ExitMode.DELETE ) –

    Exit mode for the cluster. By default, the cluster will be deleted when the context manager exits. To retain the cluster on errors, use ExitMode.DELETE_ON_SUCCESS. To always retain the cluster, use ExitMode.RETAIN.

Source code in geneva/db.py
def context(
    self,
    cluster: str | None = None,
    manifest: str | None = None,
    cluster_type=GenevaClusterType.KUBE_RAY,
    on_exit=None,
) -> contextlib.AbstractContextManager[None]:
    """Context manager for a Geneva Execution Environment.

    This will provision a cluster based on the cluster definition and the
    manifest provided. By default, the context manager will delete the cluster
    on exit. This can be configured with the ``on_exit`` parameter.

    Parameters
    ----------
    cluster: str, optional
        Name of the persisted cluster definition to use. This will raise an
        exception if the cluster definition was not defined via
        `define_cluster()`. Required unless `cluster_type` is
        `GenevaClusterType.LOCAL_RAY`, in which case it is ignored.
    manifest: str, optional
        Name of the persisted manifest to use. This will raise an exception if
        the manifest definition was not defined via `define_manifest()`.
        Not supported with `GenevaClusterType.LOCAL_RAY`.
    cluster_type: GenevaClusterType, optional, default GenevaClusterType.KUBE_RAY
        Type of the cluster to use. By default, KUBE_RAY will be used.
        To start a local Ray cluster, use `GenevaClusterType.LOCAL_RAY`.
    on_exit: ExitMode, optional, default ExitMode.DELETE
        Exit mode for the cluster. By default, the cluster will be deleted when
        the context manager exits.
        To retain the cluster on errors, use `ExitMode.DELETE_ON_SUCCESS`.
        To always retain the cluster, use `ExitMode.RETAIN`.

    Returns
    -------
    contextlib.AbstractContextManager[None]
        The context manager produced by ``ray_cluster(...)``.

    Raises
    ------
    ValueError
        If a manifest is given with LOCAL_RAY, if `cluster` is missing for a
        non-LOCAL_RAY type, if the named cluster or manifest definition does
        not exist, or if the stored cluster definition has an unsupported type.
    """
    from geneva.cluster.mgr import ClusterConfigManager
    from geneva.manifest.mgr import ManifestConfigManager
    from geneva.runners.ray._mgr import ray_cluster
    from geneva.runners.ray.raycluster import ExitMode

    if self._cluster_manager is None:
        self._cluster_manager = ClusterConfigManager(self)
    if self._manifest_manager is None:
        self._manifest_manager = ManifestConfigManager(self)

    # Local Ray needs no persisted cluster definition.
    if GenevaClusterType(cluster_type) == GenevaClusterType.LOCAL_RAY:
        if manifest is not None:
            raise ValueError("custom manifest not supported with LOCAL_RAY")
        return ray_cluster(local=True)

    if cluster is None:
        raise ValueError("cluster name is required for non-LOCAL_RAY cluster types")
    cluster_def = self._cluster_manager.load(cluster)
    if cluster_def is None:
        # ValueError subclasses Exception, so existing `except Exception`
        # handlers keep working.
        raise ValueError(
            f"cluster definition '{cluster}' not found. "
            "Create a new cluster with define_cluster()"
        )

    supported_types = {GenevaClusterType.KUBE_RAY}
    if GenevaClusterType(cluster_def.cluster_type) not in supported_types:
        raise ValueError(
            f"cluster_type must be one of {supported_types} to use context()"
        )
    c = cluster_def.as_dict()
    use_portforwarding = c["kuberay"].get("use_portforwarding", True)
    rc = cluster_def.to_ray_cluster()
    # Only fall back to the default when the caller did not pass a mode;
    # `or` would also discard any falsy-but-valid value.
    rc.on_exit = ExitMode.DELETE if on_exit is None else on_exit

    manifest_def = None
    if manifest is not None:
        manifest_def = self._manifest_manager.load(manifest)
        if manifest_def is None:
            raise ValueError(
                f"manifest definition '{manifest}' not found. "
                "Create a new manifest with define_manifest()"
            )

    return ray_cluster(
        use_portforwarding=use_portforwarding, ray_cluster=rc, manifest=manifest_def
    )

sql

sql(query: str) -> Table

Execute a raw SQL query.

It uses the Flight SQL engine to execute the query.

Parameters:

  • query (str) –

    SQL query to execute

Returns:

  • Table

    Result of the query in a pyarrow.Table

TODO
  • Support pagination
  • Support query parameters
Source code in geneva/db.py
def sql(self, query: str) -> pa.Table:
    """Execute a raw SQL query.

    It uses the Flight SQL engine to execute the query. A Flight result may be
    split across several endpoints; every endpoint is fetched and the partial
    results are concatenated in endpoint order.

    Parameters
    ----------
    query: str
        SQL query to execute

    Returns
    -------
    pyarrow.Table
        Result of the query in a `pyarrow.Table`. If the server reports no
        endpoints, an empty table with the result schema is returned.

    TODO
    ----
    - Support pagination
    - Support query parameters
    """
    info = self.flight_client.execute(query)
    # Reading only endpoints[0] would silently drop rows when the server
    # partitions the result, and would raise IndexError when there are none.
    # NOTE(review): endpoint locations are ignored and every ticket is redeemed
    # through this client, as the previous implementation did — confirm the
    # server does not hand out tickets for other locations.
    tables = [
        self.flight_client.do_get(endpoint.ticket).read_all()
        for endpoint in info.endpoints
    ]
    if not tables:
        return info.schema.empty_table()
    if len(tables) == 1:
        return tables[0]
    return pa.concat_tables(tables)