- class lance.BlobColumn(blob_column: Array | ChunkedArray)¶
A utility to wrap a Pyarrow binary column and iterate over the rows as file-like objects.
This can be useful for working with medium-to-small binary objects that need to interface with APIs that expect file-like objects. For very large binary objects (4-8MB or more per value) you might be better off creating a blob column and using lance.Dataset.take_blobs() to access the blob data.
- class lance.BlobFile(inner: LanceBlobFile)¶
Represents a blob in a Lance dataset as a file-like object.
- close() None ¶
Flush and close the IO object.
This method has no effect if the file is already closed.
- property closed : bool¶
- readable() bool ¶
Return whether object was opened for reading.
If False, read() will raise OSError.
- readall() bytes ¶
Read until EOF, using multiple read() calls.
- seek(offset: int, whence: int = 0) int ¶
Change the stream position to the given byte offset.
- offset
The stream position, relative to ‘whence’.
- whence
The relative position to seek from.
The offset is interpreted relative to the position indicated by whence. Values for whence are:
os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive
os.SEEK_CUR or 1 – current stream position; offset may be negative
os.SEEK_END or 2 – end of stream; offset is usually negative
Return the new absolute position.
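The whence values can be tried out on any seekable file-like object. The sketch below uses the standard library's io.BytesIO as a stand-in for a BlobFile (an assumption made so the example runs without a dataset); the seek semantics shown are the ones described above.

```python
import io
import os

# io.BytesIO stands in for a BlobFile here; both are seekable file-like objects.
stream = io.BytesIO(b"0123456789")

pos_start = stream.seek(3, os.SEEK_SET)   # absolute offset from the start
chunk = stream.read(2)                    # reads bytes 3 and 4, position is now 5
pos_cur = stream.seek(-1, os.SEEK_CUR)    # one byte back from the current position
pos_end = stream.seek(-2, os.SEEK_END)    # two bytes before the end of the stream
tail = stream.read()                      # everything from the new position to EOF
```

Each seek() call returns the new absolute position, so the return value can be checked instead of calling tell() afterwards.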
- seekable() bool ¶
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
- size() int ¶
Returns the size of the blob in bytes.
- tell() int ¶
Return current stream position.
- class lance.DataStatistics(fields: FieldStatistics)¶
Statistics about the data in the dataset
- fields : FieldStatistics¶
Statistics about the fields in the dataset
- class lance.FieldStatistics(id: int, bytes_on_disk: int)¶
Statistics about a field in the dataset
- bytes_on_disk : int¶
(possibly compressed) bytes on disk used to store the field
- id : int¶
id of the field
- class lance.FragmentMetadata(id: int, files: list[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)¶
Metadata for a fragment.
- id¶
The ID of the fragment.
- Type:
int
- files¶
The data files of the fragment. Each data file must have the same number of rows. Each file stores a different subset of the columns.
- Type:
List[DataFile]
- physical_rows¶
The number of rows originally in this fragment. This is the number of rows in the data files before deletions.
- Type:
int
- deletion_file¶
The deletion file, if any.
- Type:
Optional[DeletionFile]
- row_id_meta¶
The row id metadata, if any.
- Type:
Optional[RowIdMeta]
- data_files() list[DataFile] ¶
- deletion_file : DeletionFile | None = None¶
- files : List[DataFile]¶
- static from_json(json_data: str) FragmentMetadata ¶
- id : int¶
- property num_deletions : int¶
The number of rows that have been deleted from this fragment.
- property num_rows : int¶
The number of rows in this fragment after deletions.
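The three row counters are related by simple arithmetic. The sketch below uses a hypothetical FragmentCounts class (not part of Lance) that holds only the two stored counters and derives num_rows from them, assuming num_rows is physical_rows minus num_deletions as the descriptions above imply.

```python
from dataclasses import dataclass


@dataclass
class FragmentCounts:
    """Hypothetical stand-in that keeps only the counters discussed above."""

    physical_rows: int   # rows in the data files before deletions
    num_deletions: int   # rows marked as deleted

    @property
    def num_rows(self) -> int:
        # Rows still visible after deletions are applied.
        return self.physical_rows - self.num_deletions


counts = FragmentCounts(physical_rows=1000, num_deletions=25)
live_rows = counts.num_rows
```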
- physical_rows : int¶
- row_id_meta : RowIdMeta | None = None¶
- to_json() dict ¶
Get this as a simple JSON-serializable dictionary.
- class lance.LanceDataset(uri: str | Path, version: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: dict[str, Any] | None = None)¶
A Lance Dataset in Lance format where the data is stored at the given uri.
- add_columns(transforms: dict[str, str] | BatchUDF | ReaderLike, read_columns: list[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)¶
Add new columns with defined values.
There are several ways to specify the new columns. First, you can provide SQL expressions for each new column. Second, you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.
Third, you can provide a RecordBatchReader which will read the new column values from some external source. This is often useful when the new column values have already been staged to files (often by some distributed process).
See the lance.add_columns_udf() decorator for more information on writing UDFs.
- Parameters:
- transforms : dict or AddColumnsUDF or ReaderLike¶
If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is an AddColumnsUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns.
- read_columns : list of str, optional¶
The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.
- reader_schema : pa.Schema, optional¶
Only valid if transforms is a ReaderLike object. This will be used to determine the schema of the reader.
- batch_size : int, optional¶
The number of rows to read at a time from the source dataset when applying the transform. This is ignored if the dataset is a v1 dataset.
Examples
>>> import lance
>>> import pandas as pd
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9
See also
LanceDataset.merge
Merge a pre-computed set of columns into the dataset.
- alter_columns(*alterations: Iterable[AlterColumn])¶
Alter column name, data type, and nullability.
Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, the index can be kept when the column is cast to another type. Other index types don’t support casting at this time, so if such a column is cast to a different type, its indices will be dropped.
Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast within the same general type: integers to integers, floats to floats, and strings to strings. In addition, string, binary, and list columns can be cast between their size variants. For example, string to large string, binary to large binary, and list to large list.
- Parameters:
- alterations : Iterable[Dict[str, Any]]¶
A sequence of dictionaries, each with the following keys:
- ”path”: str
The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.
- ”name”: str, optional
The new name of the column. If not specified, the column name is not changed.
- ”nullable”: bool, optional
Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.
- ”data_type”: pyarrow.DataType, optional
The new data type to cast the column to. If not specified, the column data type is not changed.
Examples
>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}, schema=schema)
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
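Because a downcast fails when any value cannot be represented in the new type, it can help to check the value range before calling alter_columns. The helper below is a hypothetical pure-Python pre-check for an int64 to int32 downcast; it illustrates the representability rule only and is not how Lance performs the cast.

```python
# Bounds of a signed 32-bit integer.
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def fits_int32(values):
    """Return True if every non-null value is representable as int32."""
    return all(v is None or INT32_MIN <= v <= INT32_MAX for v in values)


safe = fits_int32([1, 2, 3])        # all values are in range
unsafe = fits_int32([1, 2**31])     # 2**31 is one past INT32_MAX
```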
- checkout_version(version: int | str) LanceDataset ¶
Load the given version of the dataset.
Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.
- Parameters:
- version : int | str¶
The version to check out. A version number (int) or a tag (str) can be provided.
- Return type:
LanceDataset
- cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats ¶
Cleans up old versions of the dataset.
Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.
This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to check out or restore these older versions.
- Parameters:
- older_than : timedelta, optional¶
Only versions older than this will be removed. If not specified, this will default to two weeks.
- delete_unverified : bool, default False¶
Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.
This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.
- error_if_tagged_old_versions : bool, default True¶
Some versions may have tags associated with them. Tagged versions will not be cleaned up, regardless of how old they are. If this argument is set to True (the default), an exception will be raised if any tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up.
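The interaction between older_than and error_if_tagged_old_versions can be pictured with a small selection function. This is a simplified mental model written in plain Python, not Lance's implementation: versions_to_remove and its (version, created_at, is_tagged) tuples are hypothetical, and details such as delete_unverified are ignored.

```python
from datetime import datetime, timedelta


def versions_to_remove(versions, now, older_than=timedelta(weeks=2),
                       error_if_tagged_old_versions=True):
    """Pick version numbers to clean up from (version, created_at, is_tagged) tuples."""
    cutoff = now - older_than
    old = [v for v in versions if v[1] < cutoff]
    tagged_old = [v[0] for v in old if v[2]]
    if tagged_old and error_if_tagged_old_versions:
        # Mirrors the documented default: refuse instead of silently skipping tags.
        raise ValueError(f"tagged versions match the cleanup parameters: {tagged_old}")
    # Tagged versions are never removed, regardless of age.
    return [v[0] for v in old if not v[2]]


now = datetime(2024, 3, 1)
versions = [
    (1, datetime(2024, 1, 1), False),
    (2, datetime(2024, 1, 15), True),
    (3, datetime(2024, 2, 25), False),
]
removed = versions_to_remove(versions, now, error_if_tagged_old_versions=False)
```

With the default error_if_tagged_old_versions=True, the same call raises because version 2 is both old and tagged.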
- static commit(base_uri: str | Path | LanceDataset, operation: LanceOperation.BaseOperation | Transaction, blobs_op: LanceOperation.BaseOperation | None = None, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) LanceDataset ¶
Create a new version of dataset
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)
Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.
Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.
Warning
This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.
- Parameters:
- base_uri : str, Path, or LanceDataset¶
The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
- operation : BaseOperation¶
The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.
- read_version : int, optional¶
The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.
- commit_lock : CommitLock, optional¶
A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- enable_v2_manifest_paths : bool, optional¶
If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
- detached : bool, optional¶
If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
- max_retries : int¶
The maximum number of retries to perform when committing the dataset.
- Returns:
A new version of Lance Dataset.
- Return type:
LanceDataset
Examples
Creating a new dataset with the LanceOperation.Overwrite operation:
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- static commit_batch(dest: str | Path | LanceDataset, transactions: collections.abc.Sequence[Transaction], commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) BulkCommitResult ¶
Create a new version of dataset with multiple transactions.
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)
- Parameters:
- dest : str, Path, or LanceDataset¶
The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
- transactions : Iterable[Transaction]¶
The transactions to apply to the dataset. These will be merged into a single transaction and applied to the dataset. Note: Only append transactions are currently supported. Other transaction types will be supported in the future.
- commit_lock : CommitLock, optional¶
A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- enable_v2_manifest_paths : bool, optional¶
If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
- detached : bool, optional¶
If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
- max_retries : int¶
The maximum number of retries to perform when committing the dataset.
- Returns:
- dataset: LanceDataset
A new version of Lance Dataset.
- merged: Transaction
The merged transaction that was applied to the dataset.
- Return type:
dict with keys
- count_rows(filter: Expression | str | None = None, **kwargs) int ¶
Count rows matching the scanner filter.
- Parameters:
- **kwargs : dict, optional¶
See the scanner() method for a full description of the parameters.
- Returns:
count – The number of rows in the dataset that match the filter, or the total number of rows if no filter is given.
- Return type:
int
- create_index(column: str | list[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset ¶
Create index on column.
Experimental API
- Parameters:
- column : str¶
The column to be indexed.
- index_type : str¶
The type of the index. "IVF_PQ", "IVF_HNSW_PQ" and "IVF_HNSW_SQ" are supported now.
- name : str, optional¶
The index name. If not provided, it will be generated from the column name.
- metric : str¶
The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.
- replace : bool¶
Replace the existing index if it exists.
- num_partitions : int, optional¶
The number of partitions of IVF (Inverted File Index).
- ivf_centroids : optional¶
It can be either np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray. A num_partitions x dimension array of existing K-means centroids for IVF clustering. If not provided, a new KMeans model will be trained.
- pq_codebook : optional¶
It can be np.ndarray, pyarrow.FixedSizeListArray, or pyarrow.FixedShapeTensorArray. A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-means centroids for the PQ codebook.
Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.
- num_sub_vectors : int, optional¶
The number of sub-vectors for PQ (Product Quantization).
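The expected array shapes for ivf_centroids and pq_codebook follow directly from the formulas above. The helpers below are hypothetical (they are not part of Lance) and simply evaluate those formulas; the 128-dimensional vectors, 256 partitions, and 16 sub-vectors are assumed values chosen for illustration.

```python
def ivf_centroids_shape(num_partitions, dimension):
    # One centroid per IVF partition, each with the full vector dimension.
    return (num_partitions, dimension)


def pq_codebook_shape(num_sub_vectors, dimension, nbits=8):
    # 2 ** nbits centroids per sub-vector, each of length dimension // num_sub_vectors.
    return (num_sub_vectors, (2 ** nbits) * dimension // num_sub_vectors)


centroids = ivf_centroids_shape(num_partitions=256, dimension=128)
codebook = pq_codebook_shape(num_sub_vectors=16, dimension=128)
```

With these assumed values, each of the 16 sub-vectors has 128 // 16 = 8 dimensions and 256 centroids, which gives 2048 values per codebook row.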
- accelerator : str or torch.Device, optional¶
If set, use an accelerator to speed up the training process. Accepted accelerator: “cuda” (Nvidia GPU) and “mps” (Apple Silicon GPU). If not set, use the CPU.
- index_cache_size : int, optional¶
The size of the index cache in number of entries. Default value is 256.
- shuffle_partition_batches : int, optional¶
The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.
Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
- shuffle_partition_concurrency : int, optional¶
The number of shuffle partitions to process concurrently. Default value is 2.
By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
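The row counts behind the two shuffle parameters can be worked out with plain arithmetic. The function below is a hypothetical helper, and the in-flight estimate assumes that memory use scales with the number of partitions processed at once, which is a simplification rather than a documented guarantee.

```python
def rows_per_shuffle_partition(shuffle_partition_batches=10240, row_group_size=1024):
    # Each batch holds one row group, so a partition holds batches * row_group_size rows.
    return shuffle_partition_batches * row_group_size


default_rows = rows_per_shuffle_partition()
smaller_rows = rows_per_shuffle_partition(shuffle_partition_batches=1024)

# Rough estimate of rows being shuffled at once with the default concurrency of 2.
rows_in_flight = default_rows * 2
```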
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- filter_nan : bool¶
Defaults to True. False is UNSAFE, and will cause a crash if any null/nan values are present (and otherwise will not). Disables the null filter used for nullable columns. Obtains a small speed boost.
- one_pass_ivfpq : bool¶
Defaults to False. If enabled, index type must be “IVF_PQ”. Reduces disk IO.
- **kwargs¶
Parameters passed to the index building process.
SQ (Scalar Quantization) is available only for the IVF_HNSW_SQ index type. This quantization method reduces the memory usage of the index by mapping float vectors to integer vectors, where each integer has num_bits bits. Currently only 8 bits are supported.
- If index_type is “IVF_*”, then the following parameters are required: num_partitions
- If index_type contains “PQ”, then the following parameters are required: num_sub_vectors
Optional parameters for IVF_PQ:
- ivf_centroids
Existing K-mean centroids for IVF clustering.
- num_bits
The number of bits for PQ (Product Quantization). Default is 8. Only 4, 8 are supported.
- Optional parameters for IVF_HNSW_*:
- max_level
Int, the maximum number of levels in the graph.
- m
Int, the number of edges per node in the graph.
- ef_construction
Int, the number of nodes to examine during the construction.
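The rules for required parameters can be expressed as a small pre-flight check. Both functions below are hypothetical helpers written for illustration; they encode only the two rules stated above (IVF_* needs num_partitions, PQ needs num_sub_vectors) and do not reflect any validation Lance itself performs.

```python
def required_index_params(index_type):
    """List the parameters the documentation marks as required for an index type."""
    required = []
    if index_type.startswith("IVF_"):
        required.append("num_partitions")
    if "PQ" in index_type:
        required.append("num_sub_vectors")
    return required


def missing_index_params(index_type, **params):
    """Return the required parameters that were not supplied (or are None)."""
    return [name for name in required_index_params(index_type)
            if params.get(name) is None]


missing = missing_index_params("IVF_HNSW_PQ", num_partitions=256)
```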
Examples
import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)

import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)
Experimental Accelerator (GPU) support:
- accelerator: use GPU to train IVF partitions.
Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch to be installed.
import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)
References
- create_scalar_index(column: str, index_type: 'BTREE' | 'BITMAP' | 'LABEL_LIST' | 'INVERTED' | 'FTS' | 'NGRAM', name: str | None = None, *, replace: bool = True, **kwargs)¶
Create a scalar index on a column.
Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:
import lance
dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()
Vector search with pre-filters can also benefit from scalar indices. For example,
import lance
dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
        column="vector",
        q=[1, 2, 3, 4],
        k=10,
    ),
    filter="my_col != 7",
    prefilter=True,
).to_table()
There are 5 types of scalar indices available today.
- BTREE. The most common type is BTREE. This index is inspired by the btree data structure, although only the first few layers of the btree are cached in memory. It will perform well on columns with a large number of unique values and few rows per value.
- BITMAP. This index stores a bitmap for each unique value in the column. This index is useful for columns with a small number of unique values and many rows per value.
- LABEL_LIST. A special index that is used to index list columns whose values have small cardinality. For example, a column that contains lists of tags (e.g. ["tag1", "tag2", "tag3"]) can be indexed with a LABEL_LIST index. This index can only speed up queries with array_has_any or array_has_all filters.
- NGRAM. A special index that is used to index string columns. This index creates a bitmap for each ngram in the string. By default we use trigrams. This index can currently speed up queries using the contains function in filters.
- FTS/INVERTED. This index is used for document columns and supports full-text search. For example, it can find rows whose column contains any word of the query string “hello world”. The results will be ranked by BM25.
Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).
Experimental API
- Parameters:
- column : str¶
The column to be indexed. Must be a boolean, integer, float, or string column.
- index_type : str¶
The type of the index. One of "BTREE", "BITMAP", "LABEL_LIST", "NGRAM", "FTS" or "INVERTED".
- name : str, optional¶
The index name. If not provided, it will be generated from the column name.
- replace : bool, default True¶
Replace the existing index if it exists.
- with_position : bool, default True
This is for the INVERTED index. If True, the index will store the positions of the words in the document, so that you can conduct phrase queries. This will significantly increase the index size. It won’t impact the performance of non-phrase queries even if it is set to True.
- base_tokenizer : str, default "simple"
This is for the INVERTED index. The base tokenizer to use. The value can be:
- “simple”: splits tokens on whitespace and punctuation.
- “whitespace”: splits tokens on whitespace.
- “raw”: no tokenization.
- language : str, default "English"
This is for the INVERTED index. The language for stemming and stop words. This is only used when stem or remove_stop_words is true.
- max_token_length : Optional[int], default 40
This is for the INVERTED index. The maximum token length. Any token longer than this will be removed.
- lower_case : bool, default True
This is for the INVERTED index. If True, the index will convert all text to lowercase.
- stem : bool, default False
This is for the INVERTED index. If True, the index will stem the tokens.
- remove_stop_words : bool, default False
This is for the INVERTED index. If True, the index will remove stop words.
- ascii_folding : bool, default False
This is for the INVERTED index. If True, the index will convert non-ascii characters to ascii characters if possible. This would remove accents like “é” -> “e”.
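To get a feel for how the tokenizer options combine, the sketch below approximates the described behaviour in plain Python. It is an illustration only: the tokenize function is hypothetical, it ignores stemming and stop words, and the real INVERTED index tokenizer may differ in edge cases such as how punctuation is classified.

```python
import re
import unicodedata


def tokenize(text, base_tokenizer="simple", lower_case=True,
             max_token_length=40, ascii_folding=False):
    """Approximate the documented tokenizer options on a single string."""
    if lower_case:
        text = text.lower()
    if ascii_folding:
        # Decompose accented characters and drop the combining marks.
        decomposed = unicodedata.normalize("NFKD", text)
        text = "".join(c for c in decomposed if not unicodedata.combining(c))
    if base_tokenizer == "simple":
        tokens = re.findall(r"\w+", text)   # split on whitespace and punctuation
    elif base_tokenizer == "whitespace":
        tokens = text.split()               # split on whitespace only
    elif base_tokenizer == "raw":
        tokens = [text]                     # no tokenization
    else:
        raise ValueError(f"unknown base_tokenizer: {base_tokenizer}")
    if max_token_length is not None:
        # Tokens longer than the limit are removed, not truncated.
        tokens = [t for t in tokens if len(t) <= max_token_length]
    return tokens


simple_tokens = tokenize("Hello, World!")
whitespace_tokens = tokenize("Hello, World!", base_tokenizer="whitespace")
folded_tokens = tokenize("Crème Brûlée", ascii_folding=True)
```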
Examples
import lance
dataset = lance.dataset("/tmp/images.lance")
dataset.create_scalar_index(
    "category",
    "BTREE",
)
Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2)).
Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND’d or OR’d together (e.g. my_col < 0 AND other_col > 100).
Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.
To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.
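The filter rules above can be reasoned about with a toy model over a small expression tree. The function below is a simplified mental model and not Lance's planner: it assumes an AND can use an index if any branch can (the rest is applied afterwards as a regular filter), while an OR needs every branch to be indexed. The tuple-based expression format is invented for this sketch.

```python
def can_use_scalar_index(expr, indexed_columns):
    """expr is ("col", name), ("and", child, ...) or ("or", child, ...)."""
    kind = expr[0]
    if kind == "col":
        return expr[1] in indexed_columns
    children = [can_use_scalar_index(child, indexed_columns) for child in expr[1:]]
    if kind == "and":
        # One indexed branch is enough to narrow the scan.
        return any(children)
    if kind == "or":
        # A single non-indexed branch forces a scan of every row.
        return all(children)
    raise ValueError(f"unknown node type: {kind}")


indexed = {"my_col", "other_col"}
both_indexed = can_use_scalar_index(
    ("and", ("col", "my_col"), ("col", "other_col")), indexed)
or_with_unindexed = can_use_scalar_index(
    ("or", ("col", "my_col"), ("col", "not_indexed")), indexed)
```

The second case matches the my_col = 0 OR not_indexed = 1 example above. For real queries, explain_plan remains the way to confirm what the planner actually did.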
- property data_storage_version : str¶
The version of the data storage format this dataset is using
- delete(predicate: str | Expression)¶
Delete rows from the dataset.
This marks rows as deleted, but does not physically remove them from the files, so existing indexes remain valid.
- Parameters:
- predicate : str or pa.compute.Expression¶
The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
- static drop(base_uri: str | Path, storage_options: dict[str, str] | None = None) None ¶
- drop_columns(columns: list[str])¶
Drop one or more columns from the dataset
This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_old_versions to remove the old files.
- Parameters:
- columns : list of str¶
The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
- drop_index(name: str)¶
Drops an index from the dataset
Note: Indices are dropped by “index name”. This is not the same as the field name. If you did not specify a name when you created the index then a name was generated for you. You can use the list_indices method to get the names of the indices.
- get_fragment(fragment_id: int) LanceFragment | None ¶
Get the fragment with fragment id.
- get_fragments(filter: Expression | None = None) list[LanceFragment] ¶
Get all fragments from the dataset.
Note: filter is not supported yet.
- property has_index¶
- index_statistics(index_name: str) dict[str, Any] ¶
- insert(data: ReaderLike, *, mode='append', **kwargs)¶
Insert data into the dataset.
- Parameters:
- data : Reader-like
The data to be written. Acceptable types are:
- Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader
- Huggingface dataset
- mode : str, default 'append'¶
The mode to use when writing the data. Options are:
- create - create a new dataset (raises if uri already exists).
- overwrite - create a new snapshot version.
- append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).
- **kwargs : dict, optional¶
Additional keyword arguments to pass to write_dataset().
- join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)¶
Not implemented. This method only overrides the pyarrow dataset implementation to prevent a segfault.
- property lance_schema : LanceSchema¶
The LanceSchema for this dataset
- property latest_version : int¶
Returns the latest version of the dataset.
- list_indices() list[Index] ¶
- property max_field_id : int¶
The max_field_id in manifest
- merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)¶
Merge another dataset into this one.
Performs a left join, where the dataset is the left side and data_obj is the right side. Rows that exist in the dataset but have no match in data_obj will be filled with null values for the new columns, unless Lance doesn’t support null values for some types, in which case an error will be raised.
- Parameters:
- data_obj : Reader-like¶
The data to be merged. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader.
- left_on : str¶
The name of the column in the dataset to join on.
- right_on : str or None¶
The name of the column in data_obj to join on. If None, defaults to left_on.
Examples
>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f
See also
LanceDataset.add_columns
Add new columns by computing batch-by-batch.
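The left-join behaviour of merge, including null filling for unmatched rows, can be sketched with plain dictionaries. The left_join function below is a hypothetical stand-in that models only the join semantics described for merge; it does not use Lance and ignores types that cannot hold nulls.

```python
def left_join(left_rows, right_rows, left_on, right_on=None):
    """Join right_rows onto left_rows, keeping every left row."""
    right_on = right_on or left_on
    right_by_key = {row[right_on]: row for row in right_rows}
    # Columns contributed by the right side, in first-seen order.
    new_columns = list(dict.fromkeys(
        col for row in right_rows for col in row if col != right_on))
    merged = []
    for row in left_rows:
        match = right_by_key.get(row[left_on], {})
        # Unmatched left rows get None (null) for every new column.
        merged.append({**row, **{col: match.get(col) for col in new_columns}})
    return merged


left = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 3, "y": "c"}]
right = [{"x": 1, "z": "d"}, {"x": 3, "z": "f"}]
merged = left_join(left, right, "x")
```

Here the row with x = 2 has no partner on the right side, so its z value is None.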
- merge_insert(on: str | Iterable[str])¶
Returns a builder that can be used to create a “merge insert” operation
This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month=”january”)
The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.
- “Matched” records are records that exist in both the source table and the target table.
- “Not matched” records exist only in the source table (e.g. this is new data).
- “Not matched by source” records exist only in the target table (this is old data).
The builder returned by this method can be used to customize what should happen for each category of data.
Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.
- Parameters:
- on : Union[str, Iterable[str]]¶
A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.
Examples
Use when_matched_update_all() and when_not_matched_insert_all() to perform an “upsert” operation. This will update rows that already exist in the dataset and insert rows that do not exist.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> dataset.merge_insert("a") \
...     .when_matched_update_all() \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z
Use when_not_matched_insert_all() to perform an “insert if not exists” operation. This will only insert rows that do not already exist in the dataset.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a") \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z
You are not required to provide all the columns. If you only want to update a subset of columns, you can omit columns you don’t want to update. Omitted columns will keep their existing values if they are updated, or will be null if they are inserted.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "b"
>>> dataset.merge_insert("a") \
...     .when_matched_update_all() \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
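The three record categories and the upsert behaviour can be modelled in plain Python. The sketch below is an illustrative assumption about the semantics described above, not Lance's hash-join implementation. The helper names classify and upsert are hypothetical.

```python
def classify(target_rows, source_rows, on):
    """Split join keys into the three merge-insert categories (toy model)."""
    source_keys = {row[on] for row in source_rows}
    target_keys = {row[on] for row in target_rows}
    return {
        "matched": sorted(source_keys & target_keys),
        "not_matched": sorted(source_keys - target_keys),
        "not_matched_by_source": sorted(target_keys - source_keys),
    }


def upsert(target_rows, source_rows, on):
    """Toy model of when_matched_update_all + when_not_matched_insert_all."""
    source_by_key = {row[on]: row for row in source_rows}
    target_keys = {row[on] for row in target_rows}
    columns = list(target_rows[0]) if target_rows else []
    # Rows the source does not touch keep their place.
    untouched = [dict(r) for r in target_rows if r[on] not in source_by_key]
    # Matched rows are rewritten at the end; omitted columns keep old values.
    updated = [{**r, **source_by_key[r[on]]}
               for r in target_rows if r[on] in source_by_key]
    # New rows are appended; columns missing from the source become None.
    inserted = [{**dict.fromkeys(columns), **r}
                for r in source_rows if r[on] not in target_keys]
    stats = {
        "num_inserted_rows": len(inserted),
        "num_updated_rows": len(updated),
        "num_deleted_rows": 0,
    }
    return untouched + updated + inserted, stats


target = [
    {"a": 1, "b": "a", "c": "x"},
    {"a": 2, "b": "b", "c": "y"},
    {"a": 3, "b": "c", "c": "z"},
]
source = [{"a": 2, "b": "x"}, {"a": 3, "b": "y"}, {"a": 4, "b": "z"}]

categories = classify(target, source, "a")
result_rows, result_stats = upsert(target, source, "a")
```

In this model the updated rows move to the end of the result, which mirrors the reordering note above. The real operation makes no ordering guarantee for newly inserted rows.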
- migrate_manifest_paths_v2()¶
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without changing the state of the object store.
DANGER: do not run this while other concurrent operations are in progress, and let it run to completion before resuming other operations.
- property optimize : DatasetOptimizer¶
- property partition_expression¶
Not implemented (just override pyarrow dataset to prevent segfault)
- replace_field_metadata(field_name: str, new_metadata: dict[str, str])¶
Replace the metadata of a field in the schema
- replace_schema(schema: Schema)¶
Not implemented (just override pyarrow dataset to prevent segfault)
See replace_schema_metadata() or replace_field_metadata().
- replace_schema_metadata(new_metadata: dict[str, str])¶
Replace the schema metadata of the dataset
- Parameters:
- new_metadata : dict¶
The new metadata to set
- restore()¶
Restore the currently checked out version as the latest version of the dataset.
This creates a new commit.
-
sample(num_rows: int, columns: list[str] | dict[str, str] | None =
None
, randomize_order: bool =True
, **kwargs) Table ¶ Select a random sample of data
- Parameters:
- num_rows : int¶
number of rows to retrieve
- columns : list of str, or dict of str to str default None¶
List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
- **kwargs : dict, optional¶
see scanner() method for full parameter description.
- Returns:
table
- Return type:
Table
-
scanner(columns: list[str] | dict[str, str] | None =
None
, filter: Expression | str | None =None
, limit: int | None =None
, offset: int | None =None
, nearest: dict | None =None
, batch_size: int | None =None
, batch_readahead: int | None =None
, fragment_readahead: int | None =None
, scan_in_order: bool | None =None
, fragments: Iterable[LanceFragment] | None =None
, full_text_query: str | dict | None =None
, *, prefilter: bool | None =None
, with_row_id: bool | None =None
, with_row_address: bool | None =None
, use_stats: bool | None =None
, fast_search: bool | None =None
, io_buffer_size: int | None =None
, late_materialization: bool | list[str] | None =None
, use_scalar_index: bool | None =None
, include_deleted_rows: bool | None =None
) LanceScanner ¶ Return a Scanner that can support various pushdowns.
- Parameters:
- columns : list of str, or dict of str to str default None¶
List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
- filter : pa.compute.Expression or str¶
Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
- limit : int, default None¶
Fetch up to this many rows. All rows if None or unspecified.
- offset : int, default None¶
Fetch starting with this row. 0 if None or unspecified.
- nearest : dict, default None¶
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "nprobes": 1, "refine_factor": 1 }
- batch_size : int, default None¶
The target size of batches returned. In some cases batches can be up to twice this size (but never larger than twice this size). In some cases batches can be smaller than this size.
- io_buffer_size : int, default None¶
The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
- batch_readahead : int, optional¶
The number of batches to read ahead.
- fragment_readahead : int, optional¶
The number of fragments to read ahead.
- scan_in_order : bool, default True¶
Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
- fragments : iterable of LanceFragment, default None¶
If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.
- prefilter : bool, default False¶
If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.
If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.
- use_scalar_index : bool, default True¶
Lance will automatically use scalar indices to optimize a query. In some corner cases this can make query performance worse and this parameter can be used to disable scalar indices in these cases.
- late_materialization : bool or List[str], default None¶
Allows custom control over late materialization. Late materialization fetches non-query columns using a take operation after the filter. This is useful when there are few results or columns are very large.
Early materialization can be better when there are many results or the columns are very narrow.
If True, then all columns are late materialized. If False, then all columns are early materialized. If a list of strings, then only the columns in the list are late materialized.
The default uses a heuristic that assumes filters will select about 0.1% of the rows. If your filter is more selective (e.g. find by id) you may want to set this to True. If your filter is not very selective (e.g. matches 20% of the rows) you may want to set this to False.
- full_text_query : str or dict, optional¶
Query string to search for; the results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
- fast_search : bool, default False¶
If True, then the search will only be performed on the indexed data, which yields faster search time.
- include_deleted_rows : bool, default False¶
If True, then rows that have been deleted, but are still present in the fragment, will be returned. These rows will have the _rowid column set to null. All other columns will reflect the value stored on disk and may not be null.
Note: if this is a search operation, or a take operation (including scalar indexed scans) then deleted rows cannot be returned.
Note
For now, if BOTH filter and nearest are specified, then:
nearest is executed first.
The results are filtered afterwards, unless prefilter is set to True.
For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:
dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
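The trade-off between applying the filter before or after the vector search can be seen in a brute-force model. The following is a minimal sketch in plain Python and assumes an exact search by squared Euclidean distance. The helpers knn and search and the sample rows are hypothetical and do not reflect how Lance executes queries.

```python
def knn(rows, query, k):
    """Exact k nearest neighbours by squared Euclidean distance."""
    def distance(row):
        return sum((a - b) ** 2 for a, b in zip(row["vector"], query))
    return sorted(rows, key=distance)[:k]


def search(rows, query, k, predicate, prefilter):
    """Toy model of combining a filter with a nearest-neighbour query."""
    if prefilter:
        # Filter first, then search: always returns up to k matching rows.
        return knn([row for row in rows if predicate(row)], query, k)
    # Search first, then filter: may return fewer than k rows.
    return [row for row in knn(rows, query, k) if predicate(row)]


def is_dog(row):
    return row["label"] == "dog"


rows = [
    {"id": 0, "vector": [0.0, 0.0], "label": "cat"},
    {"id": 1, "vector": [0.1, 0.0], "label": "dog"},
    {"id": 2, "vector": [0.2, 0.0], "label": "cat"},
    {"id": 3, "vector": [5.0, 5.0], "label": "dog"},
]
query = [0.0, 0.0]

post_ids = [r["id"] for r in search(rows, query, 2, is_dog, prefilter=False)]
pre_ids = [r["id"] for r in search(rows, query, 2, is_dog, prefilter=True)]
```

With the filter applied afterwards, only one of the two nearest rows survives, so the result is shorter than k. With the filter applied first, the search widens to the more distant matching row.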
- session() _Session ¶
Return the dataset session, which holds the dataset’s state.
- property stats : LanceStats¶
Experimental API
- property tags : Tags¶
-
take(indices: list[int] | Array, columns: list[str] | dict[str, str] | None =
None
) Table ¶ Select rows of data by index.
- Parameters:
- Returns:
table
- Return type:
- take_blobs(row_ids: list[int] | Array, blob_column: str) list[BlobFile] ¶
Select blobs by row IDs.
Instead of loading large binary blob data into memory before processing it, this API allows you to open binary blob data as a regular Python file-like object. For more details, see lance.BlobFile.
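Because the returned objects are file-like, code written against Python's standard binary stream interface should be able to consume them. The sketch below uses io.BytesIO as a stand-in for a blob so that it runs without a dataset. The helper read_tail and the sample bytes are assumptions for illustration, and only the seek(), tell(), and read() calls from the standard stream interface are used.

```python
import io
import os


def read_tail(blob, num_bytes):
    """Read the last num_bytes of a seekable binary file-like object."""
    blob.seek(0, os.SEEK_END)      # jump to the end of the stream
    size = blob.tell()             # the position at the end is the total size
    blob.seek(max(size - num_bytes, 0), os.SEEK_SET)
    return blob.read()             # read from there to EOF


# Stand-in for one entry of the list that take_blobs() would return.
stand_in = io.BytesIO(b"header|payload|footer")
tail = read_tail(stand_in, 6)
```

Seeking to the region of interest avoids materializing the whole value, which is the motivation for file-like access to large blobs.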
-
to_batches(columns: list[str] | dict[str, str] | None =
None
, filter: Expression | str | None =None
, limit: int | None =None
, offset: int | None =None
, nearest: dict | None =None
, batch_size: int | None =None
, batch_readahead: int | None =None
, fragment_readahead: int | None =None
, scan_in_order: bool | None =None
, *, prefilter: bool | None =None
, with_row_id: bool | None =None
, with_row_address: bool | None =None
, use_stats: bool | None =None
, full_text_query: str | dict | None =None
, io_buffer_size: int | None =None
, late_materialization: bool | list[str] | None =None
, use_scalar_index: bool | None =None
, **kwargs) Iterator[RecordBatch] ¶ Read the dataset as materialized record batches.
- Parameters:
- Returns:
record_batches
- Return type:
Iterator of
RecordBatch
-
to_table(columns: list[str] | dict[str, str] | None =
None
, filter: Expression | str | None =None
, limit: int | None =None
, offset: int | None =None
, nearest: dict | None =None
, batch_size: int | None =None
, batch_readahead: int | None =None
, fragment_readahead: int | None =None
, scan_in_order: bool | None =None
, *, prefilter: bool | None =None
, with_row_id: bool | None =None
, with_row_address: bool | None =None
, use_stats: bool | None =None
, fast_search: bool | None =None
, full_text_query: str | dict | None =None
, io_buffer_size: int | None =None
, late_materialization: bool | list[str] | None =None
, use_scalar_index: bool | None =None
, include_deleted_rows: bool | None =None
) Table ¶ Read the data into memory as a
pyarrow.Table
- Parameters:
- columns : list of str, or dict of str to str default None¶
List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
- filter : pa.compute.Expression or str¶
Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
- limit : int, default None¶
Fetch up to this many rows. All rows if None or unspecified.
- offset : int, default None¶
Fetch starting with this row. 0 if None or unspecified.
- nearest : dict, default None¶
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "metric": "cosine", "nprobes": 1, "refine_factor": 1 }
- batch_size : int, optional¶
The number of rows to read at a time.
- io_buffer_size : int, default None¶
The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
- batch_readahead : int, optional¶
The number of batches to read ahead.
- fragment_readahead : int, optional¶
The number of fragments to read ahead.
- scan_in_order : bool, optional, default True¶
Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
- prefilter : bool, optional, default False¶
Run filter before the vector search.
- late_materialization : bool or List[str], default None¶
Allows custom control over late materialization. See ScannerBuilder.late_materialization for more information.
- use_scalar_index : bool, default True¶
Allows custom control over scalar index usage. See ScannerBuilder.use_scalar_index for more information.
- with_row_id : bool, optional, default False¶
Return row ID.
- with_row_address : bool, optional, default False¶
Return row address
- use_stats : bool, optional, default True¶
Use stats pushdown during filters.
- fast_search : bool, optional, default False¶
- full_text_query : str or dict, optional¶
Query string to search for; the results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
- include_deleted_rows : bool, optional, default False¶
If True, then rows that have been deleted, but are still present in the fragment, will be returned. These rows will have the _rowid column set to null. All other columns will reflect the value stored on disk and may not be null.
Note: if this is a search operation, or a take operation (including scalar indexed scans) then deleted rows cannot be returned.
Notes
If BOTH filter and nearest are specified, then:
nearest is executed first.
The results are filtered afterward, unless prefilter is set to True.
-
update(updates: dict[str, str], where: str | None =
None
) UpdateResult ¶ Update column values for rows matching where.
- Parameters:
- Returns:
updates – A dictionary containing the number of rows updated.
- Return type:
dict
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"]
2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
- property uri : str¶
The location of the data
- validate()¶
Validate the dataset.
This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.
- property version : int¶
Returns the currently checked out version of the dataset
- versions()¶
Return all versions in this dataset.
-
class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None =
None
)¶ - count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Count rows matching the scanner filter.
- Parameters:
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
count
- Return type:
int
-
static create(dataset_uri: str | Path, data: ReaderLike, fragment_id: int | None =
None
, schema: pa.Schema | None =None
, max_rows_per_group: int =1024
, progress: FragmentWriteProgress | None =None
, mode: str ='append'
, *, data_storage_version: str | None =None
, use_legacy_format: bool | None =None
, storage_options: dict[str, str] | None =None
) FragmentMetadata ¶ Create a FragmentMetadata from the given data.
This can be used if the dataset is not yet created.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
- dataset_uri : str¶
The URI of the dataset.
- fragment_id : int¶
The ID of the fragment.
- data : pa.Table or pa.RecordBatchReader¶
The data to be written to the fragment.
- schema : pa.Schema, optional¶
The schema of the data. If not specified, the schema will be inferred from the data.
- max_rows_per_group : int, default 1024¶
The maximum number of rows per group in the data file.
- progress : FragmentWriteProgress, optional¶
Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
- mode : str, default "append"¶
The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.
- data_storage_version : optional, str, default None¶
The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
- use_legacy_format : bool, default None¶
Deprecated parameter. Use data_storage_version instead.
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
See also
lance.dataset.LanceOperation.Overwrite
The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.
lance.dataset.LanceOperation.Append
The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.
- Return type:
- static create_from_file(filename: str, dataset: LanceDataset, fragment_id: int) FragmentMetadata ¶
Create a fragment from the given datafile uri.
This can be used if the datafile has been lost from the dataset.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
- filename : str¶
The filename of the datafile.
- dataset : LanceDataset¶
The dataset that the fragment belongs to.
- fragment_id : int¶
The ID of the fragment.
- data_files()¶
Return the data files of this fragment.
- delete(predicate: str) FragmentMetadata | None ¶
Delete rows from this Fragment.
This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
- predicate : str¶
A SQL predicate that specifies the rows to delete.
- Returns:
A new fragment containing the new deletion file, or None if no rows left.
- Return type:
FragmentMetadata or None
Examples
>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True
See also
lance.dataset.LanceOperation.Delete
The operation used to commit these changes to a dataset. See the doc page for an example of using this API.
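The relationship between the data files and the deletion file can be pictured with a small model in which the stored rows never change and deletions are tracked separately. This is a sketch under that assumption only. ToyFragment is a hypothetical class, its predicate is a Python callable rather than a SQL string, and its attributes merely echo the names physical_rows, num_deletions, and count_rows used in this reference.

```python
class ToyFragment:
    """Toy model: immutable data rows plus a set of deleted row offsets."""

    def __init__(self, rows, deleted=frozenset()):
        self._rows = tuple(rows)            # stands in for the data files
        self._deleted = frozenset(deleted)  # stands in for the deletion file

    @property
    def physical_rows(self):
        return len(self._rows)

    @property
    def num_deletions(self):
        return len(self._deleted)

    def count_rows(self):
        return self.physical_rows - self.num_deletions

    def delete(self, predicate):
        """Return a new fragment with more offsets marked deleted, or None."""
        newly_deleted = {
            offset for offset, row in enumerate(self._rows)
            if offset not in self._deleted and predicate(row)
        }
        deleted = self._deleted | newly_deleted
        if len(deleted) == len(self._rows):
            return None  # no rows left after the deletion
        return ToyFragment(self._rows, deleted)


frag = ToyFragment([{"a": 1, "b": 4}, {"a": 2, "b": 5}, {"a": 3, "b": 6}])
after_delete = frag.delete(lambda row: row["a"] > 1)
all_deleted = frag.delete(lambda row: row["a"] > 0)
```

The original object is left untouched in this model, so the result only takes effect once it is committed through a separate operation, in the same spirit as the See also entry above.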
- deletion_file()¶
Return the deletion file, if any
- property fragment_id¶
- head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Load the first N rows of the fragment.
- Parameters:
- num_rows : int
The number of rows to load.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
-
merge(data_obj: ReaderLike, left_on: str, right_on: str | None =
None
, schema=None
) tuple[FragmentMetadata, LanceSchema] ¶ Merge another dataset into this fragment.
Performs a left join, where the fragment is the left side and data_obj is the right side. Rows existing in the fragment but not in data_obj will be filled with null values, unless Lance doesn’t support null values for some types, in which case an error will be raised.
- Parameters:
- data_obj : Reader-like¶
The data to be merged. Acceptable types are: - Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader
- left_on : str¶
The name of the column in the dataset to join on.
- right_on : str or None¶
The name of the column in data_obj to join on. If None, defaults to left_on.
Examples
>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> fragments = dataset.get_fragments()
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> merged = []
>>> schema = None
>>> for f in fragments:
...     f, schema = f.merge(new_df, 'x')
...     merged.append(f)
>>> merge = lance.LanceOperation.Merge(merged, schema)
>>> dataset = lance.LanceDataset.commit("dataset", merge, read_version=1)
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f
See also
LanceDataset.merge_columns
Add columns to this Fragment.
- Returns:
A new fragment with the merged column(s) and the final schema.
- Return type:
Tuple[FragmentMetadata, LanceSchema]
-
merge_columns(value_func: dict[str, str] | BatchUDF | ReaderLike | collections.abc.Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None =
None
, batch_size: int | None =None
, reader_schema: pa.Schema | None =None
) tuple[FragmentMetadata, LanceSchema] ¶ Add columns to this Fragment.
Warning
Internal API. This method is not intended to be used by end users.
The parameters and their interpretation are the same as in the lance.dataset.LanceDataset.add_columns() operation.
The only difference is that, instead of modifying the dataset, a new fragment is created. The new schema of the fragment is returned as well. These can be used in a later operation to commit the changes to the dataset.
See also
lance.dataset.LanceOperation.Merge
The operation used to commit these changes to the dataset. See the doc page for an example of using this API.
- Returns:
A new fragment with the added column(s) and the final schema.
- Return type:
Tuple[FragmentMetadata, LanceSchema]
- property metadata : FragmentMetadata¶
Return the metadata of this fragment.
- Return type:
- property num_deletions : int¶
Return the number of deleted rows in this fragment.
- property partition_expression : Schema¶
An Expression which evaluates to true for all data viewed by this Fragment.
- property physical_rows : int¶
Return the number of rows originally in this fragment.
To get the number of rows after deletions, use count_rows() instead.
- property physical_schema : Schema¶
Return the physical schema of this Fragment. This schema can be different from the dataset read schema.
-
scanner(*, columns: list[str] | dict[str, str] | None =
None
, batch_size: int | None =None
, filter: str | pa.compute.Expression | None =None
, limit: int | None =None
, offset: int | None =None
, with_row_id: bool =False
, with_row_address: bool =False
, batch_readahead: int =16
) LanceScanner ¶ See Dataset::scanner for details
- take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Select rows of data by index.
- Parameters:
- indices : Array or array-like¶
The indices of the rows to select in the dataset.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
- to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Read the fragment as materialized record batches.
- Parameters:
- schema : Schema, optional
Concrete schema to use for scanning.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism is used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
record_batches
- Return type:
iterator of RecordBatch
- to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Convert this Fragment into a Table.
Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.
- Parameters:
- schema : Schema, optional
Concrete schema to use for scanning.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism is used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
table
- Return type:
Table
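The interaction between batch_size and filter described above can be modelled in a few lines of plain Python. This is only an illustrative sketch of the fallback path, where the filter is applied to each loaded batch. It is not how the scanner is implemented, and the name scan_in_batches, the use of dicts as rows, and the choice to skip empty batches are all assumptions made for the example.

```python
from typing import Callable, Iterable, Iterator, Optional


def scan_in_batches(
    rows: Iterable[dict],
    batch_size: int,
    row_filter: Optional[Callable[[dict], bool]] = None,
) -> Iterator[list]:
    """Yield filtered batches, loading at most ``batch_size`` rows at a time."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    def keep(loaded: list) -> list:
        # Apply the filter to an already loaded batch.
        if row_filter is None:
            return loaded
        return [row for row in loaded if row_filter(row)]

    loaded: list = []
    for row in rows:
        loaded.append(row)
        if len(loaded) == batch_size:
            kept = keep(loaded)
            if kept:  # assumption: this sketch skips batches left empty
                yield kept
            loaded = []
    if loaded:
        # Final, possibly short, batch.
        kept = keep(loaded)
        if kept:
            yield kept
```

Because the filter runs after a batch is loaded, batch_size acts as an upper bound: a yielded batch can contain fewer rows than batch_size.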
- class lance.LanceOperation¶
- class Append(fragments: Iterable[FragmentMetadata])¶
Append new rows to the dataset.
- fragments¶
The fragments that contain the new rows.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().
Examples
To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- fragments : Iterable[FragmentMetadata]¶
- class BaseOperation¶
Base class for operations that can be applied to a dataset.
See available operations under LanceOperation.
- class CreateIndex(uuid: str, name: str, fields: list[int], dataset_version: int, fragment_ids: set[int])¶
Operation that creates an index on the dataset.
- dataset_version : int¶
- fields : List[int]¶
- fragment_ids : Set[int]¶
- name : str¶
- uuid : str¶
- class DataReplacement(replacements: list[DataReplacementGroup])¶
Operation that replaces existing datafiles in the dataset.
- replacements : List[DataReplacementGroup]¶
- class DataReplacementGroup(fragment_id: int, new_file: DataFile)¶
Group of data replacements
- fragment_id : int¶
- new_file : DataFile¶
- class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)¶
Remove fragments or rows from the dataset.
- updated_fragments¶
The fragments that have been updated with new deletion vectors.
- Type:
list[FragmentMetadata]
- deleted_fragment_ids¶
The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.
- Type:
list[int]
- predicate¶
The original SQL predicate used to select the rows to delete.
- Type:
str
Warning
This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().
Examples
To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add it to the updated_fragments list. If it returns None, the whole fragment was deleted, so add the fragment id to the deleted_fragment_ids list. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
- deleted_fragment_ids : Iterable[int]¶
- predicate : str¶
- updated_fragments : Iterable[FragmentMetadata]¶
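The bookkeeping that feeds Delete (which fragments go into updated_fragments and which ids go into deleted_fragment_ids) can be illustrated without Lance at all. The following is a pure-Python model under stated assumptions: fragments are plain lists of dict rows keyed by a fragment id, the predicate is a Python callable rather than an SQL string, and the helper names delete_from_fragment and plan_delete are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = dict


def delete_from_fragment(
    rows: List[Row], predicate: Callable[[Row], bool]
) -> Optional[List[Row]]:
    """Return the surviving rows, or None if every row was deleted."""
    remaining = [row for row in rows if not predicate(row)]
    return remaining or None


def plan_delete(
    fragments: Dict[int, List[Row]], predicate: Callable[[Row], bool]
) -> Tuple[Dict[int, List[Row]], List[int]]:
    """Split fragments into updated ones and fully deleted ids."""
    updated_fragments: Dict[int, List[Row]] = {}
    deleted_fragment_ids: List[int] = []
    for fragment_id, rows in fragments.items():
        remaining = delete_from_fragment(rows, predicate)
        if remaining is None:
            # Nothing left: only the id needs to be recorded.
            deleted_fragment_ids.append(fragment_id)
        else:
            updated_fragments[fragment_id] = remaining
    return updated_fragments, deleted_fragment_ids
```

With two fragments holding a = 1, 2 and a = 3, 4, the predicate a >= 2 leaves one surviving row in the first fragment and removes the second fragment entirely, which mirrors the single remaining row in the doctest above.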
- class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)¶
Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.
- fragments¶
The fragments that make up the new dataset.
- Type:
iterable of FragmentMetadata
- schema¶
The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.
- Type:
LanceSchema or pyarrow.Schema
Warning
This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().
Examples
To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then use lance.fragment.LanceFragment.add_columns():
>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
- fragments : Iterable[FragmentMetadata]¶
- class Overwrite(new_schema: LanceSchema | Schema, fragments: Iterable[FragmentMetadata])¶
Overwrite or create a new dataset.
- new_schema¶
The schema of the new dataset.
- Type:
LanceSchema or pyarrow.Schema
- fragments¶
The fragments that make up the new dataset.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().
Examples
To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- fragments : Iterable[FragmentMetadata]¶
- class Project(schema: LanceSchema)¶
Operation that projects columns. Use this operation to drop, rename, or swap columns.
- schema¶
The lance schema of the new dataset.
- Type:
LanceSchema
Examples
Use the project operation to swap columns:
>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> from lance.schema import LanceSchema
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b  b1
0  1  a   c
1  2  b   d
>>>
>>> # Rename column `b` to `b0` and column `b1` to `b`
>>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]})
>>> lance_schema = LanceSchema.from_pyarrow(table.schema)
>>> operation = lance.LanceOperation.Project(lance_schema)
>>> dataset = lance.LanceDataset.commit("example", operation, read_version=1)
>>> dataset.to_table().to_pandas()
   a  b0  b
0  1   a  c
1  2   b  d
- schema : LanceSchema¶
- class Restore(version: int)¶
Operation that restores a previous version of the dataset.
- version : int¶
- class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])¶
Operation that rewrites one or more files and indices into one or more files and indices.
- groups¶
Groups of files that have been rewritten.
- Type:
list[RewriteGroup]
- rewritten_indices¶
Indices that have been rewritten.
- Type:
list[RewrittenIndex]
Warning
This is an advanced API not intended for general use.
- groups : Iterable[RewriteGroup]¶
- rewritten_indices : Iterable[RewrittenIndex]¶
- class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])¶
Collection of rewritten files
- new_fragments : Iterable[FragmentMetadata]¶
- old_fragments : Iterable[FragmentMetadata]¶
- class RewrittenIndex(old_id: str, new_id: str)¶
An index that has been rewritten
- new_id : str¶
- old_id : str¶
- class Update(removed_fragment_ids: list[int], updated_fragments: list[FragmentMetadata], new_fragments: list[FragmentMetadata])¶
Operation that updates rows in the dataset.
- removed_fragment_ids¶
The ids of the fragments that have been removed entirely.
- Type:
list[int]
- updated_fragments¶
The fragments that have been updated with new deletion vectors.
- Type:
list[FragmentMetadata]
- new_fragments¶
The fragments that contain the new rows.
- Type:
list[FragmentMetadata]
- new_fragments : List[FragmentMetadata]¶
- removed_fragment_ids : List[int]¶
- updated_fragments : List[FragmentMetadata]¶
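To make the relationship between operations and dataset versions more concrete, here is a small pure-Python model. It is a sketch under simplifying assumptions, not Lance's commit logic: fragments are represented by strings, ToyDataset and its methods are hypothetical, read_version and conflict handling are ignored, and only Append, Overwrite, and Restore are modelled. It also assumes that every commit, including a restore, produces a new version number.

```python
from dataclasses import dataclass, field
from typing import Optional, Tuple


@dataclass(frozen=True)
class Append:
    fragments: Tuple[str, ...]


@dataclass(frozen=True)
class Overwrite:
    fragments: Tuple[str, ...]


@dataclass(frozen=True)
class Restore:
    version: int


@dataclass
class ToyDataset:
    # _versions[i] holds the fragment list of version i + 1.
    _versions: list = field(default_factory=list)

    @property
    def version(self) -> int:
        return len(self._versions)

    def fragments(self, version: Optional[int] = None) -> Tuple[str, ...]:
        """Return the fragments of a version (latest by default)."""
        if version is None:
            version = self.version
        if not 1 <= version <= self.version:
            raise ValueError(f"unknown version {version}")
        return self._versions[version - 1]

    def commit(self, operation) -> int:
        """Apply an operation and return the new version number."""
        if isinstance(operation, Overwrite):
            new = tuple(operation.fragments)
        elif isinstance(operation, Append):
            new = self.fragments() + tuple(operation.fragments)
        elif isinstance(operation, Restore):
            new = self.fragments(operation.version)
        else:
            raise TypeError(f"unsupported operation: {operation!r}")
        self._versions.append(new)
        return self.version
```

In this model older versions stay readable after later commits, which is what makes a Restore possible.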
- class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)¶
- analyze_plan() str ¶
Execute the plan for this scanner and display with runtime metrics.
- Parameters:
- verbose : bool, default False
Use a verbose output format.
- Returns:
plan
- Return type:
str
- count_rows()¶
Count rows matching the scanner filter.
- Returns:
count
- Return type:
int
- explain_plan(verbose=False) str ¶
Return the execution plan for this scanner.
- Parameters:
- verbose : bool, default False¶
Use a verbose output format.
- Returns:
plan
- Return type:
str
- head(num_rows)¶
Load the first N rows of the dataset.
- Parameters:
- num_rows : int¶
The number of rows to load.
- Return type:
Table
- property projected_schema : Schema¶
The materialized schema of the data, accounting for projections.
This is the schema of any data returned from the scanner.
- scan_batches()¶
Consume a Scanner in record batches with corresponding fragments.
- Returns:
record_batches
- Return type:
iterator of TaggedRecordBatch
- class lance.MergeInsertBuilder(dataset, on)¶
- execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)¶
Executes the merge insert operation
This function updates the original dataset and returns a dictionary with information about merge statistics - i.e. the number of inserted, updated, and deleted rows.
- Parameters:
- data_obj : ReaderLike¶
The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.
- schema : Optional[pa.Schema]¶
The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.
- execute_uncommitted(data_obj: ReaderLike, *, schema: pa.Schema | None = None) tuple[Transaction, dict[str, Any]] ¶
Executes the merge insert operation without committing
This function does not commit the changes to the dataset. It returns the uncommitted transaction together with a dictionary with information about merge statistics, i.e. the number of inserted, updated, and deleted rows.
- Parameters:
- data_obj : ReaderLike¶
The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.
- schema : Optional[pa.Schema]¶
The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.
- when_matched_update_all(condition: str | None = None) MergeInsertBuilder ¶
Configure the operation to update matched rows
After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.
An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.
If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.
- when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder ¶
Configure the operation to delete target rows that are not matched by the source
After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.
- when_not_matched_insert_all() MergeInsertBuilder ¶
Configure the operation to insert not matched rows
After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.
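The three clauses above can be summarised with a small in-memory model. This is a sketch of the semantics only, not the builder's implementation. It assumes rows are dicts with unique values in the join column, uses Python callables in place of SQL conditions, and the function name merge_insert and the statistics keys are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = dict


def merge_insert(
    target: List[Row],
    source: List[Row],
    on: str,
    *,
    update_matched: bool = False,
    update_condition: Optional[Callable[[Row, Row], bool]] = None,
    insert_not_matched: bool = False,
    delete_not_matched_by_source: bool = False,
    delete_condition: Optional[Callable[[Row], bool]] = None,
) -> Tuple[List[Row], Dict[str, int]]:
    """Return the merged rows and counts of inserted/updated/deleted rows."""
    source_by_key = {row[on]: row for row in source}
    target_keys = {row[on] for row in target}
    stats = {"inserted": 0, "updated": 0, "deleted": 0}
    result: List[Row] = []

    for target_row in target:
        source_row = source_by_key.get(target_row[on])
        if source_row is not None:
            # Matched row: replace it only if updates are enabled and the
            # optional condition(source, target) holds. A failed condition
            # keeps the target row; the row still counts as matched.
            if update_matched and (
                update_condition is None
                or update_condition(source_row, target_row)
            ):
                result.append(source_row)
                stats["updated"] += 1
            else:
                result.append(target_row)
        elif delete_not_matched_by_source and (
            delete_condition is None or delete_condition(target_row)
        ):
            # Row exists only in the target and is selected for deletion.
            stats["deleted"] += 1
        else:
            result.append(target_row)

    if insert_not_matched:
        for source_row in source:
            if source_row[on] not in target_keys:
                # Row exists only in the source.
                result.append(source_row)
                stats["inserted"] += 1
    return result, stats
```

Enabling update_matched and insert_not_matched together gives the usual upsert. A matched row whose update condition fails is left untouched and is not re-inserted, in line with the note that failing the filter does not turn a "matched" row into a "not matched" one.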
- class lance.Transaction(read_version: 'int', operation: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)¶
- blobs_op : BaseOperation | None = None¶
- operation : BaseOperation¶
- read_version : int¶
- uuid : str¶
- lance.batch_udf(output_schema=None, checkpoint_file=None)¶
Create a user defined function (UDF) that adds columns to a dataset.
This function is used to add columns to a dataset. It takes a function that takes a single argument, a RecordBatch, and returns a RecordBatch. The function is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.
- Parameters:
- output_schema : Schema, optional¶
The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.
- checkpoint_file : str or Path, optional¶
If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file's worth of results on disk, which can be multiple gigabytes of data.
- Return type:
AddColumnsUDF
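The resume behaviour described for checkpoint_file can be pictured with a toy version that caches per-batch results on disk. This is a conceptual sketch only: the JSON layout, the run_with_checkpoint helper, and the use of Python lists as batches are all assumptions for illustration and say nothing about the actual checkpoint format.

```python
import json
import os
import tempfile


def run_with_checkpoint(batches, udf, checkpoint_path):
    """Apply ``udf`` to each batch, persisting results after every batch."""
    results = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            # JSON object keys are strings; convert back to batch indices.
            results = {int(k): v for k, v in json.load(f).items()}
    for index, batch in enumerate(batches):
        if index in results:
            continue  # already computed in an earlier, interrupted run
        results[index] = udf(batch)
        with open(checkpoint_path, "w") as f:
            json.dump(results, f)
    return [results[i] for i in range(len(batches))]


# Demo: the first run fails on the last batch, the second run resumes.
batches = [[1, 2], [3, 4], [5, 6]]
processed = []  # every batch the UDF was actually invoked on
crash = {"enabled": True}


def double(batch):
    processed.append(batch)
    if crash["enabled"] and batch == [5, 6]:
        raise RuntimeError("simulated failure")
    return [2 * x for x in batch]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "udf_checkpoint.json")
    try:
        run_with_checkpoint(batches, double, path)
    except RuntimeError:
        crash["enabled"] = False
    doubled = run_with_checkpoint(batches, double, path)
```

On the second call the first two batches are read back from the checkpoint, so the function is only invoked again for the batch that failed.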
- lance.bytes_read_counter()¶
- lance.dataset(uri: str | Path, version: int | str | None = None, asof: ts_types | None = None, block_size: int | None = None, commit_lock: CommitLock | None = None, index_cache_size: int | None = None, storage_options: dict[str, str] | None = None, default_scan_options: dict[str, str] | None = None) LanceDataset ¶
Opens the Lance dataset from the address specified.
- Parameters:
- uri : str¶
Address to the Lance dataset. It can be a local file path, e.g. /tmp/data.lance, or a cloud object store URI, e.g. s3://bucket/data.lance.
- version : optional, int | str¶
If specified, load a specific version of the Lance dataset. Else, loads the latest version. A version number (int) or a tag (str) can be provided.
- asof : optional, datetime or str¶
If specified, find the latest version created on or earlier than the given argument value. If a version is already specified, this arg is ignored.
- block_size : optional, int¶
Block size in bytes. Provide a hint for the size of the minimal I/O request.
- commit_lock : optional, lance.commit.CommitLock¶
A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
- index_cache_size : optional, int¶
Index cache size. The index cache is an LRU cache with TTL. This number specifies the number of index pages, for example, IVF partitions, to be cached in the host memory. Default value is 256.
Roughly, for an IVF_PQ partition with n rows, the size of each index page equals the combination of the pq code (nd.array([n,pq], dtype=uint8)) and the row ids (nd.array([n], dtype=uint64)). Approximately, n = Total Rows / number of IVF partitions and pq = number of PQ sub-vectors.
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- default_scan_options : optional, dict¶
Default scan options that are used when scanning the dataset. This accepts the same arguments described in lance.LanceDataset.scanner(). The arguments will be applied to any scan operation.
This can be useful to supply defaults for common parameters such as batch_size.
It can also be used to create a view of the dataset that includes meta fields such as _rowid or _rowaddr. If default_scan_options is provided then the schema returned by lance.LanceDataset.schema() will include these fields if the appropriate scan options are set.
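The sizing rule given for index_cache_size can be turned into a quick back-of-the-envelope calculation. The sketch below simply encodes the stated approximation (one byte per PQ code entry, eight bytes per row id). The function names and the sample numbers are hypothetical, and real memory use may differ.

```python
def ivf_pq_page_bytes(
    total_rows: int, num_partitions: int, num_sub_vectors: int
) -> int:
    """Approximate size in bytes of one IVF_PQ index page (one partition)."""
    rows_per_partition = total_rows // num_partitions  # n in the text
    pq_code_bytes = rows_per_partition * num_sub_vectors  # uint8: 1 byte each
    row_id_bytes = rows_per_partition * 8  # uint64: 8 bytes each
    return pq_code_bytes + row_id_bytes


def index_cache_bytes(page_bytes: int, index_cache_size: int = 256) -> int:
    """Approximate memory needed if the cache is filled with such pages."""
    return page_bytes * index_cache_size


# Hypothetical example: 1M rows, 1000 IVF partitions, 16 PQ sub-vectors.
page = ivf_pq_page_bytes(1_000_000, 1000, 16)
full_cache = index_cache_bytes(page)
```

For these sample numbers each page is about 24 KB, so a full cache of 256 pages stays in the single-digit megabyte range.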
- lance.iops_counter()¶
- lance.json_to_schema(schema_json: dict[str, Any]) Schema ¶
Converts a JSON payload (given as a dict) to a PyArrow schema.
- Parameters:
- schema_json : Dict[str, Any]¶
The JSON payload to convert to a PyArrow Schema.
- lance.set_logger(file_path='pylance.log', name='pylance', level=20, format_string=None, log_handler=None)¶
- lance.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False) LanceDataset ¶
Write a given data_obj to the given uri
- Parameters:
- data_obj : Reader-like¶
The data to be written. Acceptable types are:
- Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader
- Huggingface dataset
- uri : str, Path, or LanceDataset¶
Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused.
- schema : Schema, optional¶
If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.
- mode : str¶
create - create a new dataset (raises if uri already exists).
overwrite - create a new snapshot version.
append - create a new version that is the concat of the input and the latest version (raises if uri does not exist).
- max_rows_per_file : int, default 1024 * 1024¶
The max number of rows to write before starting a new file
- max_rows_per_group : int, default 1024¶
The max number of rows before starting a new group (in the same file)
- max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024¶
The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.
- commit_lock : CommitLock, optional¶
A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
- progress : FragmentWriteProgress, optional¶
Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
- storage_options : optional, dict¶
Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- data_storage_version : optional, str, default None¶
The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
- use_legacy_format : optional, bool, default None¶
Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.
- enable_v2_manifest_paths : bool, optional¶
If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the LanceDataset.migrate_manifest_paths_v2() method. Default is False.
- enable_move_stable_row_ids : bool, optional¶
Experimental parameter: if set to true, the writer will use move-stable row ids. These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids.
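The default limits in the signature can be sanity-checked with a little arithmetic. The sketch below estimates how many files and row groups a write would produce if only the row limits were binding. The estimate_layout helper is hypothetical, and it deliberately ignores max_bytes_per_file, which in practice can end a file earlier.

```python
import math

DEFAULT_MAX_ROWS_PER_FILE = 1024 * 1024
DEFAULT_MAX_ROWS_PER_GROUP = 1024
DEFAULT_MAX_BYTES_PER_FILE = 90 * 1024 * 1024 * 1024  # 90 GB soft limit


def estimate_layout(
    num_rows: int,
    max_rows_per_file: int = DEFAULT_MAX_ROWS_PER_FILE,
    max_rows_per_group: int = DEFAULT_MAX_ROWS_PER_GROUP,
) -> dict:
    """Estimate the file layout when only the row limits apply."""
    if num_rows <= 0:
        raise ValueError("num_rows must be positive")
    num_files = math.ceil(num_rows / max_rows_per_file)
    # Every file except the last one is filled to max_rows_per_file.
    rows_in_last_file = num_rows - (num_files - 1) * max_rows_per_file
    groups_per_full_file = math.ceil(max_rows_per_file / max_rows_per_group)
    return {
        "files": num_files,
        "rows_in_last_file": rows_in_last_file,
        "groups_per_full_file": groups_per_full_file,
    }
```

For example, estimate_layout(2_500_000) suggests three files with the default settings, the last of which is only partly filled.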