Writing and Reading a Dataset Using Spark¶
Attention
The Spark connector is currently an experimental feature undergoing rapid iteration.
In this example, we will read a local iris.csv
file and write it as a Lance dataset using Apache Spark, then demonstrate how to query the dataset.
Preparing the Environment and Raw Dataset¶
Download the Spark binary package from the official website. We recommend Spark 3.5+ built for Scala 2.12, as the Spark connector currently supports only Scala 2.12.
You can directly download Spark 3.5.1 using this link.
Prepare the dataset by downloading iris.csv to your local machine.
Create a Scala file named iris_to_lance_via_spark_shell.scala
and open it.
Reading the Raw Dataset and Writing to a Lance Dataset¶
Add necessary imports and create a Spark session:
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.lancedb.lance.spark.{LanceConfig, LanceDataSource}
// The config line registers the Lance catalog implementation with Spark SQL
val spark = SparkSession.builder()
.appName("Iris CSV to Lance Converter")
.config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
.getOrCreate()
Specify your input and output paths:
val irisPath = "/path/to/your/input/iris.csv"
val outputPath = "/path/to/your/output/iris.lance"
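Optionally, before reading, you can fail fast if the input file is missing. This is a small sanity-check sketch using only the Java standard library; it is not required by the connector:
import java.nio.file.{Files, Paths}
// Abort early with a clear message if the CSV path is wrong
require(Files.exists(Paths.get(irisPath)), s"Input file not found: $irisPath")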
Read the iris.csv file with the following snippet:
val rawDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(irisPath)
rawDF.printSchema()
Prepare the Lance schema and write the Lance dataset:
val lanceSchema = new StructType()
.add(StructField("sepal_length", DoubleType))
.add(StructField("sepal_width", DoubleType))
.add(StructField("petal_length", DoubleType))
.add(StructField("petal_width", DoubleType))
.add(StructField("species", StringType))
val lanceDF = spark.createDataFrame(rawDF.rdd, lanceSchema)
lanceDF.write
.format(LanceDataSource.name)
.option(LanceConfig.CONFIG_DATASET_URI, outputPath)
.save()
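To confirm that the write produced output on disk, you can list the output location. This is an optional sketch; a Lance dataset is written as a directory, and its exact internal layout is not important here:
// List the top-level entries of the newly written Lance dataset directory
new java.io.File(outputPath).listFiles().foreach(f => println(f.getName))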
Reading a Lance Dataset¶
After writing the dataset, we can read it back and examine its properties:
val lanceDF = spark.read
.format("lance")
.option(LanceConfig.CONFIG_DATASET_URI, outputPath)
.load()
println(s"The total count: ${lanceDF.count()}")
lanceDF.printSchema()
println("\n The top 5 data:")
lanceDF.show(5, truncate = false)
println("\n Species distribution statistics:")
lanceDF.groupBy("species").count().show()
First, we open the dataset and count the total rows. Then we print the dataset schema. Finally, we analyze the species distribution statistics.
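Beyond these basics, the returned DataFrame supports the usual Spark operations. As an illustrative sketch reusing the column names defined above, you can combine a filter with an aggregation:
import org.apache.spark.sql.functions.{avg, col}
// Average petal length per species, considering only rows with sepal_length > 5.0
lanceDF.filter(col("sepal_length") > 5.0)
.groupBy("species")
.agg(avg("petal_length").alias("avg_petal_length"))
.show()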
Running the Spark Application¶
To execute the application, download these dependencies:
lance-core JAR: core Rust binding exposing Lance features to Java (available here)
lance-spark JAR: Spark connector for reading/writing Lance format (available here)
jar-jni JAR: Load JNI dependencies embedded within a JAR file (available here)
arrow-c-data JAR: Java implementation of C Data Interface (available here)
arrow-dataset JAR: Java implementation of Arrow Dataset API/Framework (available here)
Place these JARs in the ${SPARK_HOME}/jars
directory, then run:
./bin/spark-shell --jars ./jars/lance-core-0.23.0.jar,./jars/lance-spark-0.23.0.jar,./jars/jar-jni-1.1.1.jar,./jars/arrow-c-data-12.0.1.jar,./jars/arrow-dataset-12.0.1.jar -i ./iris_to_lance_via_spark_shell.scala
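If the shell starts but the Lance classes cannot be found, a quick sanity check from inside spark-shell is to load the connector classes by name (a small sketch; the class names match the imports used earlier):
// Throws ClassNotFoundException if the Lance JARs were not picked up
Class.forName("com.lancedb.lance.spark.LanceDataSource")
Class.forName("com.lancedb.lance.spark.LanceCatalog")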
It should work! Have fun!