Skip to content

Commit

Permalink
chore(java): remove some supported TODO and add allow_http for storag…
Browse files Browse the repository at this point in the history
…e_option (#3288)

In this pr, I did:
1. remove some supported TODOs
2. add allow_http for storage_option
3. remove some unused code
  • Loading branch information
SaintBacchus authored Dec 24, 2024
1 parent 877b018 commit c6fcb31
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
13 changes: 11 additions & 2 deletions java/spark/src/main/java/com/lancedb/lance/spark/SparkOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
public class SparkOptions {
private static final String ak = "access_key_id";
private static final String sk = "secret_access_key";
private static final String endpoint = "aws_region";
private static final String region = "aws_endpoint";
private static final String endpoint = "aws_endpoint";
private static final String region = "aws_region";
private static final String virtual_hosted_style = "virtual_hosted_style_request";
private static final String allow_http = "allow_http";

private static final String block_size = "block_size";
private static final String version = "version";
private static final String index_cache_size = "index_cache_size";
Expand Down Expand Up @@ -82,9 +84,16 @@ private static Map<String, String> genStorageOptions(LanceConfig config) {
storageOptions.put(ak, maps.get(ak));
storageOptions.put(sk, maps.get(sk));
storageOptions.put(endpoint, maps.get(endpoint));
}
if (maps.containsKey(region)) {
storageOptions.put(region, maps.get(region));
}
if (maps.containsKey(virtual_hosted_style)) {
storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style));
}
if (maps.containsKey(allow_http)) {
storageOptions.put(allow_http, maps.get(allow_http));
}
return storageOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,10 +32,6 @@
public class LanceArrowWriter extends ArrowReader {
private final Schema schema;
private final int batchSize;
private final Object monitor = new Object();

@GuardedBy("monitor")
private final Queue<InternalRow> rowQueue = new ConcurrentLinkedQueue<>();

@GuardedBy("monitor")
private volatile boolean finished;
Expand All @@ -53,7 +47,6 @@ public LanceArrowWriter(BufferAllocator allocator, Schema schema, int batchSize)
Preconditions.checkNotNull(schema);
Preconditions.checkArgument(batchSize > 0);
this.schema = schema;
// TODO(lu) batch size as config?
this.batchSize = batchSize;
this.writeToken = new Semaphore(0);
this.loadToken = new Semaphore(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ private LanceDataWriter(
LanceArrowWriter arrowWriter,
FutureTask<List<FragmentMetadata>> fragmentCreationTask,
Thread fragmentCreationThread) {
// TODO support write to multiple fragments
this.arrowWriter = arrowWriter;
this.fragmentCreationThread = fragmentCreationThread;
this.fragmentCreationTask = fragmentCreationTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ public void filterSelect() {
.collect(Collectors.toList()));
}

// TODO(lu) support spark.read().format("lance")
// .load(dbPath.resolve(datasetName).toString());
@Test
public void supportDataSourceLoadPath() {
Dataset<Row> df =
spark
.read()
.format("lance")
.load(LanceConfig.getDatasetUri(dbPath, TestUtils.TestTable1Config.datasetName));
validateData(df, TestUtils.TestTable1Config.expectedValues);
}
}

0 comments on commit c6fcb31

Please sign in to comment.