Read and Write Lance Dataset
Lance dataset APIs follow the PyArrow API conventions.
Writing Lance Dataset
Similar to Apache PyArrow, the simplest way to create a Lance dataset is to write a pyarrow.Table via lance.write_dataset().
import lance
import pyarrow as pa
table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
{"name": "Bob", "age": 30}])
lance.write_dataset(table, "./alice_and_bob.lance")
If the dataset is too large to fit in memory, lance.write_dataset() also supports writing it from an iterator of pyarrow.RecordBatch objects.
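import lance
import pyarrow as pa
schema = pa.schema([
    pa.field("name", pa.string()),
    pa.field("age", pa.int64()),
])
def producer():
    # Yield the data one RecordBatch at a time instead of materializing it all
    yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}], schema=schema)
    # "Blob" is a deliberate typo, corrected later with update()
    yield pa.RecordBatch.from_pylist([{"name": "Blob", "age": 30}], schema=schema)
# An iterator carries no schema of its own, so it must be passed explicitly.
# mode="overwrite" replaces the dataset written in the previous example.
lance.write_dataset(producer(), "./alice_and_bob.lance", schema=schema, mode="overwrite")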
lance.write_dataset() supports writing pyarrow.Table, pandas.DataFrame, pyarrow.dataset.Dataset, and Iterator[pyarrow.RecordBatch]. See its documentation for more details.
Deleting rows
Lance supports deleting rows from a dataset using a SQL filter. For example, to delete Bob’s row from the dataset above, one could use:
import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.delete("name = 'Bob'")
lance.LanceDataset.delete() supports the same filters as described in Filter push-down.
Rows are deleted by marking them as deleted in a separate deletion index. This is faster than rewriting the files and also avoids invalidating any indices that point to those files. Subsequent queries will not return the deleted rows.
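To make the idea concrete, here is a toy, pure-Python model of a deletion index. It is not Lance's actual on-disk format; the `Fragment` class and its methods are hypothetical. The point it shows is that the row data stays untouched, deleted row offsets are recorded separately, and scans skip those offsets.

```python
from dataclasses import dataclass, field


@dataclass
class Fragment:
    """Toy stand-in for a data file plus its separate deletion index."""
    rows: list                                   # immutable "data file" contents
    deleted: set = field(default_factory=set)    # row offsets marked as deleted

    def delete(self, predicate):
        # Only offsets are recorded; the row data is never rewritten, so any
        # index that points at (fragment, offset) addresses stays valid.
        for offset, row in enumerate(self.rows):
            if predicate(row):
                self.deleted.add(offset)

    def scan(self):
        # Subsequent reads skip every offset present in the deletion index.
        return [row for offset, row in enumerate(self.rows)
                if offset not in self.deleted]


frag = Fragment(rows=[{"name": "Alice", "age": 20}, {"name": "Bob", "age": 30}])
frag.delete(lambda row: row["name"] == "Bob")
print(frag.scan())    # [{'name': 'Alice', 'age': 20}]
print(frag.deleted)   # {1}
```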
Warning
Do not read datasets with deleted rows using Lance versions prior to 0.5.0, as they will return the deleted rows. This is fixed in 0.5.0 and later.
Updating rows
Lance supports updating rows based on SQL expressions with the lance.LanceDataset.update() method. For example, if we notice that Bob’s name in our dataset has sometimes been written as Blob, we can fix that with:
import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"name": "'Bob'"}, where="name = 'Blob'")
The update values are SQL expressions, which is why 'Bob' is wrapped in single quotes. This means we can use complex expressions that reference existing columns if we wish. For example, if two years have passed and we wish to update the ages of Alice and Bob in the same example, we could write:
import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"age": "age + 2"})
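Because update values (and `where` filters) are SQL text, a string coming from Python has to be rendered as a SQL string literal. A minimal sketch, assuming standard SQL quoting rules (wrap in single quotes and double any embedded single quote); the helper name `sql_str` and the name "O'Brien" are hypothetical:

```python
def sql_str(value: str) -> str:
    """Render a Python string as a standard SQL string literal."""
    # Standard SQL escapes a single quote inside a literal by doubling it.
    return "'" + value.replace("'", "''") + "'"


updates = {"name": sql_str("Bob")}
where = "name = " + sql_str("O'Brien")
print(updates)   # {'name': "'Bob'"}
print(where)     # name = 'O''Brien'
# These could then be passed as dataset.update(updates, where=where).
```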
If you are trying to update a set of individual rows with new values, it is often more efficient to use the merge insert operation described below.
import lance
import pyarrow as pa
# Change the ages of both Alice and Bob
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                  {"name": "Bob", "age": 20}])
# This works, but is inefficient (one update per row); see below for a better approach
dataset = lance.dataset("./alice_and_bob.lance")
for idx in range(new_table.num_rows):
    name = new_table["name"][idx].as_py()
    new_age = new_table["age"][idx].as_py()
    # Update values are SQL expressions, so the integer is passed as a string
    dataset.update({"age": str(new_age)}, where=f"name = '{name}'")
Merge Insert
Lance supports a merge insert operation. This can be used to add new data in bulk while also (potentially) matching against existing data. This operation can be used for a number of different use cases.
Bulk Update
The lance.LanceDataset.update() method is useful for updating rows based on a filter. However, if we want to replace existing rows with new rows, then a merge insert operation would be more efficient:
import lance
import pyarrow as pa
# Change the ages of both Alice and Bob
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                  {"name": "Bob", "age": 20}])
dataset = lance.dataset("./alice_and_bob.lance")
# This will use `name` as the key for matching rows. Merge insert
# uses a JOIN internally and so you typically want this column to
# be a unique key or id of some kind.
dataset.merge_insert("name") \
    .when_matched_update_all() \
    .execute(new_table)
Note that, similar to the update operation, rows that are modified will be removed and inserted back into the table, changing their position to the end. Also, the relative order of these rows could change because we are using a hash-join operation internally.
Insert if not Exists
Sometimes we only want to insert data if we haven’t already inserted it before. This can happen, for example, when we have a batch of data but we don’t know which rows we’ve added previously and we don’t want to create duplicate rows. We can use the merge insert operation to achieve this:
import lance
import pyarrow as pa
# Bob is already in the table, but Carla is new
new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},
                                  {"name": "Carla", "age": 37}])
dataset = lance.dataset("./alice_and_bob.lance")
# This will insert Carla but leave Bob unchanged
dataset.merge_insert("name") \
    .when_not_matched_insert_all() \
    .execute(new_table)
Update or Insert (Upsert)
Sometimes we want to combine both of the above behaviors. If a row already exists we want to update it. If the row does not exist we want to add it. This operation is sometimes called “upsert”. We can use the merge insert operation to do this as well:
import lance
import pyarrow as pa
# Change Carla's age and insert David
new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},
                                  {"name": "David", "age": 42}])
dataset = lance.dataset("./alice_and_bob.lance")
# This will update Carla and insert David
dataset.merge_insert("name") \
    .when_matched_update_all() \
    .when_not_matched_insert_all() \
    .execute(new_table)
Replace a Portion of Data
A less common, but still useful, behavior can be to replace some region of existing rows (defined by a filter) with new data. This is similar to performing both a delete and an insert in a single transaction. For example:
import lance
import pyarrow as pa
new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},
                                  {"name": "Francene", "age": 44}])
dataset = lance.dataset("./alice_and_bob.lance")
# This will remove anyone aged 40 or older who is not in new_table
# (e.g. David) and insert our new data
dataset.merge_insert("name") \
    .when_not_matched_insert_all() \
    .when_not_matched_by_source_delete("age >= 40") \
    .execute(new_table)
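The merge insert clauses used in this section can be summarized with a small pure-Python model. It is only a sketch of the semantics as described above (rows matched on a key column, matched rows replaced and moved to the end, unmatched source rows inserted, unmatched target rows optionally deleted by a filter). It is not Lance's hash-join implementation, it keeps target order where Lance may not, and `merge_insert` here is a hypothetical function whose keyword names only loosely mirror the builder methods.

```python
def merge_insert(target, source, key, *, update_matched=False,
                 insert_not_matched=False, delete_not_in_source=None):
    """Toy model of merge insert over lists of dict rows."""
    source_by_key = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}
    kept, moved = [], []
    for row in target:
        if row[key] in source_by_key:
            if update_matched:
                # Modified rows are removed and re-added at the end.
                moved.append(dict(source_by_key[row[key]]))
            else:
                kept.append(row)
        elif delete_not_in_source is not None and delete_not_in_source(row):
            continue  # target row absent from source and matching the filter
        else:
            kept.append(row)
    inserted = [dict(row) for row in source
                if insert_not_matched and row[key] not in target_keys]
    return kept + moved + inserted


people = [{"name": "Alice", "age": 20}, {"name": "Bob", "age": 30},
          {"name": "Carla", "age": 37}]
new = [{"name": "Carla", "age": 27}, {"name": "David", "age": 42}]

# Upsert: update Carla and insert David.
upsert = merge_insert(people, new, "name", update_matched=True,
                      insert_not_matched=True)
print([(r["name"], r["age"]) for r in upsert])
# [('Alice', 20), ('Bob', 30), ('Carla', 27), ('David', 42)]

# Replace a portion: drop anyone aged 40+ not in the source, insert Edgar.
replaced = merge_insert(upsert, [{"name": "Edgar", "age": 46}], "name",
                        insert_not_matched=True,
                        delete_not_in_source=lambda r: r["age"] >= 40)
print([r["name"] for r in replaced])   # ['Alice', 'Bob', 'Carla', 'Edgar']
```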
Evolving the schema
Lance supports schema evolution: adding, removing, and altering columns in a dataset. Most of these operations can be performed without rewriting the data files in the dataset, making them very efficient operations.
In general, schema changes will conflict with most other concurrent write operations. For example, if you change the schema of the dataset while someone else is appending data to it, either your schema change or the append will fail, depending on the order of the operations. Thus, it’s recommended to perform schema changes when no other writes are happening.
Renaming columns
Columns can be renamed using the lance.LanceDataset.alter_columns() method.
import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
dataset.alter_columns({"path": "id", "name": "new_id"})
dataset.to_table().to_pandas()
new_id
0 1
1 2
2 3
This works for nested columns as well. To address a nested column, use a dot (.) to separate the levels of nesting. For example:
data = pa.Table.from_pylist([
    {"meta": {"id": 1, "name": "Alice"}},
    {"meta": {"id": 2, "name": "Bob"}},
])
dataset = lance.write_dataset(data, "nested_rename")
dataset.alter_columns({"path": "meta.id", "name": "new_id"})
dataset.to_table().to_pandas()
meta
0 {"new_id": 1, "name": "Alice"}
1 {"new_id": 2, "name": "Bob"}
Casting column data types
In addition to changing column names, you can also change the data type of a column using the lance.LanceDataset.alter_columns() method. This requires rewriting that column to new data files, but does not require rewriting the other columns.
Note
If the column has an index, the index will be dropped if the column type is changed.
This method can be used to change the vector type of a column. For example, we can change a float32 embedding column into a float16 column to save disk space at the cost of lower precision:
import lance
import pyarrow as pa
import numpy as np
# 3 rows of 128-dimensional float32 vectors, stored as fixed_size_list<float, 128>
values = pa.array(np.random.rand(3 * 128).astype("float32"))
table = pa.table({
    "id": pa.array([1, 2, 3]),
    "embedding": pa.FixedSizeListArray.from_arrays(values, 128),
})
dataset = lance.write_dataset(table, "embeddings")
dataset.alter_columns({"path": "embedding",
                       "type": pa.list_(pa.float16(), 128)})
# `schema` is a property, not a method
dataset.schema
id: int64
embedding: fixed_size_list<item: float16, 128>
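To see the trade-off in numbers: a 128-dimensional float32 vector takes 128 × 4 = 512 bytes, while its float16 version takes 128 × 2 = 256 bytes, halving the column before any compression. The sketch below is independent of Lance and uses only the standard library (the `struct` format code `e` is IEEE 754 half precision) to show both the saving and the rounding error the narrower type introduces.

```python
import random
import struct

DIM = 128
random.seed(0)
vector = [random.random() for _ in range(DIM)]   # values in [0, 1)

# Pack the same values as float32 ('f') and float16 ('e'), little-endian.
as_f32 = struct.pack(f"<{DIM}f", *vector)
as_f16 = struct.pack(f"<{DIM}e", *vector)
print(len(as_f32), len(as_f16))  # 512 256

# Round-trip through float16 and measure the worst absolute error.
restored = struct.unpack(f"<{DIM}e", as_f16)
max_err = max(abs(a - b) for a, b in zip(vector, restored))
print(f"max abs error: {max_err:.2e}")
```

For values below 1, float16's 10-bit mantissa keeps the error under about 5e-4, which is often acceptable for similarity search but worth checking for your own embeddings.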
Adding new columns
New columns can be added and populated within a single operation using the lance.LanceDataset.add_columns() method. There are two ways to specify how to populate the new columns: first, by providing a SQL expression for each new column, or second, by providing a function to generate the new column data.
SQL expressions can either be independent expressions or reference existing columns. SQL literal values can be used to set a single value for all existing rows.
import lance
import pyarrow as pa
table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.add_columns({
"hash": "sha256(name)",
"status": "'active'",
})
dataset.to_table().to_pandas()
name hash... status
0 Alice 3bc51062973c... active
1 Bob cd9fb1e148cc... active
2 Carla ad8d83ffd82b... active
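Assuming the SQL `sha256` function hashes the UTF-8 bytes of each string, its hex digests can be reproduced with Python's `hashlib`, which is a quick way to sanity-check a derived column like `hash` above:

```python
import hashlib

names = ["Alice", "Bob", "Carla"]
# Hex-encode SHA-256 over each name's UTF-8 bytes, mirroring sha256(name).
hashes = {n: hashlib.sha256(n.encode("utf-8")).hexdigest() for n in names}
for name, digest in hashes.items():
    print(name, digest[:12] + "...")
```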
You can also provide a Python function to generate the new column data. This can be used, for example, to compute a new embedding column. This function should take a PyArrow RecordBatch and return either a PyArrow RecordBatch or a Pandas DataFrame. The function will be called once for each batch in the dataset.
If the function is expensive to compute and can fail, it is recommended to set a checkpoint file in the UDF. This checkpoint file saves the state of the UDF after each invocation, so that if the UDF fails, it can be restarted from the last checkpoint. Note that this file can get quite large, since it needs to store unsaved results for up to an entire data file.
import lance
import pyarrow as pa
import pandas as pd
import numpy as np
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
@lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite")
def add_random_vector(batch):
    # One random 128-dimensional float32 vector per row in this batch
    embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
    # pandas columns must be 1-D, so store one array per row
    return pd.DataFrame({"embedding": list(embeddings)})
dataset.add_columns(add_random_vector)
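The checkpoint principle can be sketched with the standard library alone. This toy is not how `lance.batch_udf` stores its checkpoint; `run_with_checkpoint` and the `results` table layout are hypothetical. Each batch's result is saved in SQLite under a batch id, so rerunning after a failure recomputes only the batches that never finished.

```python
import json
import sqlite3


def run_with_checkpoint(batches, udf, conn):
    """Apply udf to each (batch_id, batch) pair, reusing saved results."""
    conn.execute("CREATE TABLE IF NOT EXISTS results "
                 "(batch_id INTEGER PRIMARY KEY, payload TEXT)")
    outputs = []
    for batch_id, batch in batches:
        row = conn.execute("SELECT payload FROM results WHERE batch_id = ?",
                           (batch_id,)).fetchone()
        if row is not None:
            outputs.append(json.loads(row[0]))   # checkpoint hit: skip work
            continue
        result = udf(batch)                      # expensive, may raise
        conn.execute("INSERT INTO results VALUES (?, ?)",
                     (batch_id, json.dumps(result)))
        conn.commit()                            # persist after every batch
        outputs.append(result)
    return outputs


calls = []

def square(batch):
    calls.append(batch)
    return [x * x for x in batch]


conn = sqlite3.connect(":memory:")
batches = [(0, [1, 2]), (1, [3, 4])]
first = run_with_checkpoint(batches, square, conn)
second = run_with_checkpoint(batches, square, conn)  # served from checkpoint
print(first, second, len(calls))  # [[1, 4], [9, 16]] [[1, 4], [9, 16]] 2
```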
Adding new columns using merge
If you have pre-computed one or more new columns, you can add them to an existing dataset using the lance.LanceDataset.merge() method. This allows filling in additional columns without having to rewrite the whole dataset.
To use the merge method, provide a new dataset that includes the columns you want to add, and a column name to use for joining the new data to the existing dataset.
For example, imagine we have a dataset of embeddings and ids:
import lance
import pyarrow as pa
import numpy as np
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings")
Now if we want to add a column of labels we have generated, we can do so by merging a new table:
new_data = pa.table({
"id": pa.array([1, 2, 3]),
"label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
dataset.to_table().to_pandas()
id embedding label
0 1 [1, 2, 3] horse
1 2 [4, 5, 6] rabbit
2 3 [7, 8, 9] cat
Dropping columns
Finally, you can drop columns from a dataset using the lance.LanceDataset.drop_columns() method. This is a metadata-only operation and does not delete the data on disk, which makes it very quick.
import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3]),
"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.drop_columns(["name"])
dataset.schema
id: int64
To actually remove the data from disk, the files must be rewritten to remove the columns and then the old files must be deleted. This can be done using lance.dataset.DatasetOptimizer.compact_files() followed by lance.LanceDataset.cleanup_old_versions().
Reading Lance Dataset
To open a Lance dataset, use the lance.dataset() function:
import lance
ds = lance.dataset("s3://bucket/path/imagenet.lance")
# Or local path
ds = lance.dataset("./imagenet.lance")
Note
Lance currently supports the local file system, AWS S3 (s3), Azure Blob Storage (az), and Google Cloud Storage (gs) as storage backends. Read more in Object Store Configuration.
The most straightforward way to read a Lance dataset is to use the lance.LanceDataset.to_table() method to load the entire dataset into memory.
table = ds.to_table()
Because Lance is a high-performance columnar format, it can efficiently read subsets of the dataset through column (projection) push-down and filter (predicate) push-down.
table = ds.to_table(
columns=["image", "label"],
filter="label = 2 AND text IS NOT NULL",
limit=1000,
offset=3000)
Lance understands the cost of reading heavy columns such as image. Consequently, it employs an optimized query plan to execute the operation efficiently.
Iterative Read
If the dataset is too large to fit in memory, you can read it in batches using the lance.LanceDataset.to_batches() method:
for batch in ds.to_batches(columns=["image"], filter="label = 10"):
    # do something with batch; compute_on_batch stands in for your own code
    compute_on_batch(batch)
Unsurprisingly, to_batches() takes the same parameters as the to_table() method.
Filter push-down
Lance uses standard SQL expressions as predicates for dataset filtering. Pushing the SQL predicates down to the storage layer significantly reduces the overall I/O load during a scan.
Currently, Lance supports a growing list of expressions:
- >, >=, <, <=, =
- AND, OR, NOT
- IS NULL, IS NOT NULL
- IS TRUE, IS NOT TRUE, IS FALSE, IS NOT FALSE
- IN
- LIKE, NOT LIKE
- regexp_match(column, pattern)
- CAST
For example, the following filter string is acceptable:
((label IN [10, 20]) AND (note['email'] IS NOT NULL))
OR NOT note['created']
Nested fields can be accessed using subscripts. Struct fields can be subscripted using field names, while list fields can be subscripted using indices.
If your column name contains special characters or is a SQL keyword, you can use backticks (`) to escape it. For nested fields, each segment of the path must be wrapped in backticks.
`CUBE` = 10 AND `column name with space` IS NOT NULL
AND `nested with space`.`inner with space` < 2
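When filter strings are built programmatically, the backtick rule can be applied per path segment. A minimal sketch: `quote_path` is a hypothetical helper, and it rejects names containing backticks or periods because this page does not describe how backticks could be escaped and states that field names with periods are unsupported.

```python
def quote_path(*segments: str) -> str:
    """Quote each segment of a (possibly nested) column path with backticks."""
    for seg in segments:
        if "`" in seg or "." in seg:
            # No escaping rule is documented here for these characters.
            raise ValueError(f"unsupported field name: {seg!r}")
    return ".".join(f"`{seg}`" for seg in segments)


flt = (f"{quote_path('CUBE')} = 10 AND "
       f"{quote_path('column name with space')} IS NOT NULL AND "
       f"{quote_path('nested with space', 'inner with space')} < 2")
print(flt)
```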
Warning
Field names containing periods (.) are not supported.
Literals for dates, timestamps, and decimals can be written by placing the string value after the type name. For example:
date_col = date '2021-01-01'
and timestamp_col = timestamp '2021-01-01 00:00:00'
and decimal_col = decimal(8,3) '1.000'
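A small helper can render Python values in the three literal forms shown above. This is a sketch: `sql_literal` is hypothetical, it handles naive datetimes only, and its default decimal precision of 38 is an arbitrary choice rather than anything Lance prescribes.

```python
import datetime as dt
from decimal import Decimal


def sql_literal(value, precision: int = 38) -> str:
    """Render a date, naive datetime, or Decimal as a typed SQL literal."""
    if isinstance(value, dt.datetime):        # check first: datetime is a date
        return f"timestamp '{value.isoformat(sep=' ')}'"
    if isinstance(value, dt.date):
        return f"date '{value.isoformat()}'"
    if isinstance(value, Decimal):
        scale = max(0, -value.as_tuple().exponent)   # digits after the point
        return f"decimal({precision},{scale}) '{value}'"
    raise TypeError(f"unsupported type: {type(value).__name__}")


print(sql_literal(dt.date(2021, 1, 1)))            # date '2021-01-01'
print(sql_literal(dt.datetime(2021, 1, 1)))        # timestamp '2021-01-01 00:00:00'
print(sql_literal(Decimal("1.000"), precision=8))  # decimal(8,3) '1.000'
```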
For timestamp columns, the precision can be specified as a number in the type parameter. Microsecond precision (6) is the default.
SQL | Time unit |
---|---|
timestamp(0) | Seconds |
timestamp(3) | Milliseconds |
timestamp(6) | Microseconds |
timestamp(9) | Nanoseconds |
Lance internally stores data in Arrow format. The mapping from SQL types to Arrow is:
SQL type |
Arrow type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
See precision mapping in previous table.
Random read
One distinctive feature of Lance, as a columnar format, is that it allows you to read random samples quickly.
# Access the 2nd, 101st and 501st rows
data = ds.take([1, 100, 500], columns=["image", "label"])
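As the comment above shows, `take` addresses rows by 0-based position, so the 2nd row is index 1. For random sampling, a minimal sketch is to draw distinct positions with the standard library and pass them to `take`. Here `sample_positions` is a hypothetical helper, `num_rows` stands in for `ds.count_rows()`, and the positions are sorted only so the sample follows row order.

```python
import random


def sample_positions(num_rows: int, k: int, seed: int = 42) -> list:
    """Draw k distinct 0-based row positions, sorted in row order."""
    rng = random.Random(seed)            # seeded for reproducible samples
    return sorted(rng.sample(range(num_rows), k))


num_rows = 1_000                         # stand-in for ds.count_rows()
positions = sample_positions(num_rows, 5)
print(positions)
# data = ds.take(positions, columns=["image", "label"])
```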
The ability to achieve fast random access to individual rows plays a crucial role in facilitating various workflows such as random sampling and shuffling in ML training. Additionally, it empowers users to construct secondary indices, enabling swift execution of queries for enhanced performance.
Table Maintenance
Some operations will, over time, cause a Lance dataset to have a poor layout. For example, many small appends will lead to a large number of small fragments, and deleting many rows will slow down queries because the deleted rows must be filtered out.
To address this, Lance provides methods for optimizing dataset layout.
Compact data files
Data files can be rewritten so there are fewer files. When passing a target_rows_per_fragment to lance.dataset.DatasetOptimizer.compact_files(), Lance will skip any fragments that are already above that row count and rewrite the others. Fragments will be merged according to their fragment ids, so the inherent ordering of the data will be preserved.
Note
Compaction creates a new version of the table. It does not delete the old version of the table and the files referenced by it.
import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)
During compaction, Lance can also remove deleted rows. Rewritten fragments will not have deletion files. This can improve scan performance since the soft deleted rows don’t have to be skipped during the scan.
When files are rewritten, the original row addresses are invalidated. This means the affected files are no longer part of any ANN index if they were before. Because of this, it’s recommended to rewrite files before re-building indices.
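The fragment selection rule described above can be modeled as a small planner. This is a simplified sketch, not Lance's actual compaction planner: `plan_compaction` and the fragment ids and row counts are made up for illustration. It walks fragments in id order, leaves alone any fragment that already has at least `target_rows_per_fragment` rows, and greedily groups consecutive smaller fragments until a group reaches the target. A lone small fragment is left as-is here, a further simplification.

```python
def plan_compaction(fragments, target_rows_per_fragment):
    """Group consecutive small fragments (in id order) into rewrite tasks."""
    tasks, current, current_rows = [], [], 0
    for frag_id, num_rows in sorted(fragments.items()):
        if num_rows >= target_rows_per_fragment:
            # Large enough already: skip it and close the open group, so
            # only adjacent fragments are merged and row order is preserved.
            if len(current) > 1:
                tasks.append(current)
            current, current_rows = [], 0
            continue
        current.append(frag_id)
        current_rows += num_rows
        if current_rows >= target_rows_per_fragment:
            tasks.append(current)
            current, current_rows = [], 0
    if len(current) > 1:
        tasks.append(current)
    return tasks


# fragment id -> row count
fragments = {0: 100, 1: 200, 2: 5000, 3: 300, 4: 400, 5: 700, 6: 50}
print(plan_compaction(fragments, 1000))   # [[0, 1], [3, 4, 5]]
```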
Object Store Configuration
Lance supports object stores such as AWS S3 (and compatible stores), Azure Blob Store, and Google Cloud Storage. Which object store to use is determined by the URI scheme of the dataset path. For example, s3://bucket/path will use S3, az://bucket/path will use Azure, and gs://bucket/path will use GCS.
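The scheme-based dispatch can be sketched with `urllib.parse`. The mapping covers only the schemes named on this page (including `s3+ddb`, described under the DynamoDB section); treating a path without a scheme as the local file system is an assumption of this sketch, and `backend_for` is a hypothetical name.

```python
from urllib.parse import urlparse

# Schemes mentioned on this page; anything else is rejected by the sketch.
BACKENDS = {
    "s3": "AWS S3 (or S3-compatible)",
    "s3+ddb": "AWS S3 with DynamoDB commits",
    "az": "Azure Blob Storage",
    "gs": "Google Cloud Storage",
    "": "local file system",    # assumption: no scheme means a local path
}


def backend_for(uri: str) -> str:
    scheme = urlparse(uri).scheme
    if scheme not in BACKENDS:
        raise ValueError(f"unsupported scheme: {scheme!r}")
    return BACKENDS[scheme]


for uri in ["s3://bucket/path", "az://bucket/path", "gs://bucket/path",
            "./imagenet.lance"]:
    print(uri, "->", backend_for(uri))
```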
New in version 0.10.7: Passing options directly to storage options.
These object stores take additional configuration options. There are two ways to specify these configurations: by setting environment variables or by passing them to the storage_options parameter of lance.dataset() and lance.write_dataset(). For example, to globally set a higher timeout, you would run in your shell:
export TIMEOUT=60s
If you only want to set the timeout for a single dataset, you can pass it as a storage option:
import lance
ds = lance.dataset("s3://path", storage_options={"timeout": "60s"})
General Configuration
These options apply to all object stores.
Key |
Description |
---|---|
|
Allow non-TLS, i.e. non-HTTPS connections. Default, |
|
Number of times to retry a download. Default, |
|
Skip certificate validation on https connections. Default, |
|
Timeout for only the connect phase of a Client. Default, |
|
Timeout for the entire request, from connection until the response body
has finished. Default, |
|
User agent string to use in requests. |
|
URL of a proxy server to use for requests. Default, |
|
PEM-formatted CA certificate for proxy connections |
|
List of hosts that bypass proxy. This is a comma separated list of domains
and IP masks. Any subdomain of the provided domain will be bypassed. For
example, |
|
Number of times for a s3 client to retry the request. Default, |
|
Timeout for a s3 client to retry the request in seconds. Default, |
S3 Configuration
S3 (and S3-compatible stores) have additional configuration options that configure authorization and S3-specific features (such as server-side encryption).
AWS credentials can be set in the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. Alternatively, they can be passed as parameters to the storage_options parameter:
import lance
ds = lance.dataset(
"s3://bucket/path",
storage_options={
"access_key_id": "my-access-key",
"secret_access_key": "my-secret-key",
"session_token": "my-session-token",
}
)
If you are using AWS SSO, you can specify the AWS_PROFILE environment variable. It cannot be specified in the storage_options parameter.
The following keys can be used either as environment variables or as keys in the storage_options parameter:
Key |
Description |
---|---|
|
The AWS region the bucket is in. This can be automatically detected when using AWS S3, but must be specified for S3-compatible stores. |
|
The AWS access key ID to use. |
|
The AWS secret access key to use. |
|
The AWS session token to use. |
|
The endpoint to use for S3-compatible stores. |
|
Whether to use virtual hosted-style requests, where bucket name is part
of the endpoint. Meant to be used with |
|
Whether to use S3 Express One Zone endpoints. Default, |
|
The server-side encryption algorithm to use. Must be one of |
|
The KMS key ID to use for server-side encryption. If set,
|
|
Whether to use bucket keys for server-side encryption. |
S3-compatible stores
Lance can also connect to S3-compatible stores, such as MinIO. To do so, you must specify both region and endpoint:
import lance
ds = lance.dataset(
"s3://bucket/path",
storage_options={
"region": "us-east-1",
"endpoint": "http://minio:9000",
}
)
This can also be done with the AWS_ENDPOINT and AWS_DEFAULT_REGION environment variables.
S3 Express
New in version 0.9.7.
Lance supports S3 Express One Zone endpoints, but requires additional configuration. Also, S3 Express endpoints only support connecting from an EC2 instance within the same region.
To configure Lance to use an S3 Express endpoint, you must set the storage option s3_express. The bucket name in your table URI should include the S3 Express suffix (for example, --use1-az4--x-s3).
import lance
ds = lance.dataset(
"s3://my-bucket--use1-az4--x-s3/path/imagenet.lance",
storage_options={
"region": "us-east-1",
"s3_express": "true",
}
)
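S3 Express One Zone directory bucket names end with `--<zone-id>--x-s3`, as in `my-bucket--use1-az4--x-s3` above. A small check like the following can catch a URI that is missing the suffix before `s3_express` is enabled; `express_zone` is a hypothetical helper, not something Lance provides.

```python
import re
from urllib.parse import urlparse

# <base-name>--<zone-id>--x-s3, e.g. my-bucket--use1-az4--x-s3
_EXPRESS_BUCKET = re.compile(r"^(?P<base>.+?)--(?P<zone>[a-z0-9-]+)--x-s3$")


def express_zone(uri: str) -> str:
    """Return the zone id encoded in an S3 Express bucket URI."""
    bucket = urlparse(uri).netloc
    match = _EXPRESS_BUCKET.match(bucket)
    if match is None:
        raise ValueError(f"{bucket!r} lacks the --<zone-id>--x-s3 suffix")
    return match.group("zone")


print(express_zone("s3://my-bucket--use1-az4--x-s3/path/imagenet.lance"))
# use1-az4
```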
Committing mechanisms for S3
Most supported storage systems (e.g. local file system, Google Cloud Storage, Azure Blob Store) natively support atomic commits, which prevent concurrent writers from corrupting the dataset. However, S3 does not support this natively. To work around this, you may provide a locking mechanism that Lance can use to lock the table while performing a write. To do so, implement a context manager that acquires and releases a lock, and pass it to the commit_lock parameter of lance.write_dataset().
Note
In order for the locking mechanism to work, all writers must use the exact same mechanism. Otherwise, Lance will not be able to detect conflicts.
On entering, the context manager should acquire the lock on the table. The table version being committed is passed in as an argument, which may be used if the locking service wishes to keep track of the current version of the table, but this is not required. If the table is already locked by another transaction, it should wait until it is unlocked, since the other transaction may fail. Once unlocked, it should either lock the table or, if the lock keeps track of the current version of the table, raise a CommitConflictError if the requested version has already been committed.
To prevent poisoned locks, it’s recommended to set a timeout on the locks. That way, if a process crashes while holding the lock, the lock will be released eventually. The timeout should be no less than 30 seconds.
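import threading
from contextlib import contextmanager
import lance
# Placeholder: every writer must share the same lock, so in practice this
# would be a distributed lock with a timeout, not a local threading.Lock
my_lock = threading.Lock()
@contextmanager
def commit_lock(version: int):
    # Acquire the lock (version may be used to track the latest table version)
    my_lock.acquire()
    try:
        yield
    finally:
        # Always release, even if the commit raised an exception
        my_lock.release()
# data is the table (or other supported input) you want to write
lance.write_dataset(data, "s3://bucket/path/", commit_lock=commit_lock)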
When the context manager is exited, it will raise an exception if the commit failed. This might be because of a network error or if the version has already been written. Either way, the context manager should release the lock. Use a try/finally block to ensure that the lock is released.
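The recommendations above (wait while another writer holds the lock, expire stale locks, optionally reject versions that were already committed) can be illustrated with an in-process toy. It is a sketch only: `LeaseLock` and `VersionAlreadyCommitted` are hypothetical names, a real deployment needs a lock service shared by every writer, and with Lance you would raise `CommitConflictError` where this sketch raises its own exception. The 30-second default mirrors the minimum timeout recommended above.

```python
import threading
import time
from contextlib import contextmanager


class VersionAlreadyCommitted(Exception):
    """Stand-in for the conflict error a real lock would raise."""


class LeaseLock:
    """Toy lock whose lease expires, so a crashed holder cannot poison it."""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._cond = threading.Condition()
        self._expires_at = 0.0          # 0.0 means "not held"
        self.committed = set()          # versions known to be written

    @contextmanager
    def __call__(self, version: int):
        with self._cond:
            # Wait while a live (unexpired) lease is held by someone else.
            while self._expires_at > time.monotonic():
                self._cond.wait(timeout=self._expires_at - time.monotonic())
            if version in self.committed:
                raise VersionAlreadyCommitted(version)
            self._expires_at = time.monotonic() + self.ttl
        try:
            yield
            self.committed.add(version)   # reached only if the commit succeeded
        finally:
            with self._cond:              # release even when the commit failed
                self._expires_at = 0.0
                self._cond.notify_all()


lock = LeaseLock(ttl_seconds=30)
with lock(1):
    pass                                  # the commit of version 1 happens here
try:
    with lock(1):                         # version 1 was already committed
        pass
except VersionAlreadyCommitted as exc:
    print("conflict on version", exc)     # conflict on version 1
# lance.write_dataset(data, "s3://bucket/path/", commit_lock=lock)
```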
Concurrent Writer on S3 using DynamoDB
Warning
This feature is experimental at the moment.
Lance has native support for concurrent writers on S3 using DynamoDB instead of locking. Users may pass in a DynamoDB table name along with the S3 URI to their dataset to enable this feature.
import lance
# The s3+ddb:// URL scheme lets Lance know that you want to
# use DynamoDB for writing to S3 concurrently
ds = lance.dataset("s3+ddb://my-bucket/mydataset?ddbTableName=mytable")
The DynamoDB table is expected to have a primary hash key of base_uri and a range key of version. The key base_uri should be of string type, and the key version should be of number type.
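As a sketch, such a table could be created with boto3's `create_table` call. The request below follows the key layout stated above (string hash key `base_uri`, numeric range key `version`); the on-demand `BillingMode` is an assumption, the helper names are hypothetical, and the client is passed in so the request can be inspected without AWS access.

```python
def dynamodb_table_request(table_name: str) -> dict:
    """Build create_table arguments for the external manifest table."""
    return {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "base_uri", "KeyType": "HASH"},    # partition key
            {"AttributeName": "version", "KeyType": "RANGE"},    # sort key
        ],
        "AttributeDefinitions": [
            {"AttributeName": "base_uri", "AttributeType": "S"},  # string
            {"AttributeName": "version", "AttributeType": "N"},   # number
        ],
        "BillingMode": "PAY_PER_REQUEST",   # assumption: on-demand capacity
    }


def create_manifest_table(client, table_name: str):
    # `client` is expected to be boto3.client("dynamodb").
    return client.create_table(**dynamodb_table_request(table_name))


# import boto3
# create_manifest_table(boto3.client("dynamodb"), "mytable")
```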
For details on how this feature works, please see External Manifest Store.
Google Cloud Storage Configuration
GCS credentials are configured by setting the GOOGLE_SERVICE_ACCOUNT environment variable to the path of a JSON file containing the service account credentials. Alternatively, you can pass the path to the JSON file in the storage_options parameter:
import lance
ds = lance.dataset(
"gs://my-bucket/my-dataset",
storage_options={
"service_account": "path/to/service-account.json",
}
)
Note
By default, GCS uses HTTP/1 for communication, as opposed to HTTP/2. This improves maximum throughput significantly. However, if you wish to use HTTP/2 for some reason, you can set the environment variable HTTP1_ONLY to false.
The following keys can be used either as environment variables or as keys in the storage_options parameter:
Key |
Description |
---|---|
|
Path to the service account JSON file. |
|
The serialized service account key. |
|
Path to the application credentials. |
Azure Blob Storage Configuration
Azure Blob Storage credentials can be configured by setting the AZURE_STORAGE_ACCOUNT_NAME and AZURE_STORAGE_ACCOUNT_KEY environment variables. Alternatively, you can pass the account name and key in the storage_options parameter:
import lance
ds = lance.dataset(
"az://my-container/my-dataset",
storage_options={
"account_name": "some-account",
"account_key": "some-key",
}
)
These keys can be used either as environment variables or as keys in the storage_options parameter:
Key |
Description |
---|---|
|
The name of the azure storage account. |
|
The serialized service account key. |
|
Service principal client id for authorizing requests. |
|
Service principal client secret for authorizing requests. |
|
Tenant id used in oauth flows. |
|
Shared access signature. The signature is expected to be percent-encoded, much like they are provided in the azure storage explorer or azure portal. |
|
Bearer token. |
|
Use object store with azurite storage emulator. |
|
Override the endpoint used to communicate with blob storage. |
|
Use object store with url scheme account.dfs.fabric.microsoft.com. |
|
Endpoint to request an IMDS managed identity token. |
|
Object id for use with managed identity authentication. |
|
Msi resource id for use with managed identity authentication. |
|
File containing token for Azure AD workload identity federation. |
|
Use azure cli for acquiring access token. |
|
Disables tagging objects. This can be desirable if not supported by the backing store. |