diff --git a/java/spark/src/main/java/com/lancedb/lance/spark/SparkOptions.java b/java/spark/src/main/java/com/lancedb/lance/spark/SparkOptions.java index 590e584573..2f54e0a53e 100644 --- a/java/spark/src/main/java/com/lancedb/lance/spark/SparkOptions.java +++ b/java/spark/src/main/java/com/lancedb/lance/spark/SparkOptions.java @@ -23,9 +23,11 @@ public class SparkOptions { private static final String ak = "access_key_id"; private static final String sk = "secret_access_key"; - private static final String endpoint = "aws_region"; - private static final String region = "aws_endpoint"; + private static final String endpoint = "aws_endpoint"; + private static final String region = "aws_region"; private static final String virtual_hosted_style = "virtual_hosted_style_request"; + private static final String allow_http = "allow_http"; + private static final String block_size = "block_size"; private static final String version = "version"; private static final String index_cache_size = "index_cache_size"; @@ -82,9 +84,16 @@ private static Map genStorageOptions(LanceConfig config) { storageOptions.put(ak, maps.get(ak)); storageOptions.put(sk, maps.get(sk)); storageOptions.put(endpoint, maps.get(endpoint)); + } + if (maps.containsKey(region)) { storageOptions.put(region, maps.get(region)); + } + if (maps.containsKey(virtual_hosted_style)) { storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style)); } + if (maps.containsKey(allow_http)) { + storageOptions.put(allow_http, maps.get(allow_http)); + } return storageOptions; } diff --git a/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceArrowWriter.java b/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceArrowWriter.java index e272b36f1d..9ddda82e7d 100644 --- a/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceArrowWriter.java +++ b/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceArrowWriter.java @@ -24,8 +24,6 @@ import javax.annotation.concurrent.GuardedBy; import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,10 +32,6 @@ public class LanceArrowWriter extends ArrowReader { private final Schema schema; private final int batchSize; - private final Object monitor = new Object(); - - @GuardedBy("monitor") - private final Queue rowQueue = new ConcurrentLinkedQueue<>(); @GuardedBy("monitor") private volatile boolean finished; @@ -53,7 +47,6 @@ public LanceArrowWriter(BufferAllocator allocator, Schema schema, int batchSize) Preconditions.checkNotNull(schema); Preconditions.checkArgument(batchSize > 0); this.schema = schema; - // TODO(lu) batch size as config? this.batchSize = batchSize; this.writeToken = new Semaphore(0); this.loadToken = new Semaphore(0); diff --git a/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceDataWriter.java b/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceDataWriter.java index 4e73599676..28a71d39f5 100644 --- a/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceDataWriter.java +++ b/java/spark/src/main/java/com/lancedb/lance/spark/write/LanceDataWriter.java @@ -41,7 +41,6 @@ private LanceDataWriter( LanceArrowWriter arrowWriter, FutureTask> fragmentCreationTask, Thread fragmentCreationThread) { - // TODO support write to multiple fragments this.arrowWriter = arrowWriter; this.fragmentCreationThread = fragmentCreationThread; this.fragmentCreationTask = fragmentCreationTask; diff --git a/java/spark/src/test/java/com/lancedb/lance/spark/read/SparkConnectorReadTest.java b/java/spark/src/test/java/com/lancedb/lance/spark/read/SparkConnectorReadTest.java index 1d85049f92..1b3bbd372c 100644 --- a/java/spark/src/test/java/com/lancedb/lance/spark/read/SparkConnectorReadTest.java +++ b/java/spark/src/test/java/com/lancedb/lance/spark/read/SparkConnectorReadTest.java @@ -162,6 +162,13 @@ public void filterSelect() { .collect(Collectors.toList())); } - // TODO(lu) support spark.read().format("lance") - // .load(dbPath.resolve(datasetName).toString()); + @Test + public void supportDataSourceLoadPath() { + Dataset df = + spark + .read() + .format("lance") + .load(LanceConfig.getDatasetUri(dbPath, TestUtils.TestTable1Config.datasetName)); + validateData(df, TestUtils.TestTable1Config.expectedValues); + } }