diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 56612adc1ef80..2431e61c91cab 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -107,7 +107,7 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader()); this.driverName = conf.getManagedLedgerOffloadDriver(); - this.storageBasePath = configuration.get("hadoop.tmp.dir"); + this.storageBasePath = configuration.get("fs.defaultFS"); this.scheduler = scheduler; this.fileSystem = FileSystem.get(configuration); this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder() diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java new file mode 100644 index 0000000000000..14734b3faca99 --- /dev/null +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.filesystem.impl; + +import static org.testng.Assert.assertEquals; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.testng.annotations.Test; + +public class FileSystemOffloaderLocalFileTest { + private OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); + private LedgerOffloaderStats offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60); + + + private String getResourceFilePath(String name) { + return getClass().getClassLoader().getResource(name).getPath(); + } + + @Test + public void testReadWriteWithLocalFileUsingFileSystemURI() throws Exception { + // prepare the offload policies + final String basePath = "/tmp"; + OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl(); + offloadPolicies.setFileSystemURI("file://" + basePath); + offloadPolicies.setManagedLedgerOffloadDriver("filesystem"); + offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml")); + + // initialize the offloader with the offload policies + var offloader = FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats); + + int numberOfEntries = 100; + + // prepare the data in bookkeeper + BookKeeper bk = new PulsarMockBookKeeper(scheduler); + LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "".getBytes()); + for (int i = 0; i < numberOfEntries; i++) { + byte[] entry = ("foobar"+i).getBytes(); + lh.addEntry(entry); + } + lh.close(); + + ReadHandle read = bk.newOpenLedgerOp() + .withLedgerId(lh.getId()) + .withDigestType(DigestType.CRC32) + .withPassword("".getBytes()).execute().get(); + + final String mlName = TopicName.get("testWriteLocalFIle").getPersistenceNamingEncoding(); + Map offloadDriverMetadata = new HashMap<>(); + offloadDriverMetadata.put("ManagedLedgerName", mlName); + + UUID uuid = UUID.randomUUID(); + offloader.offload(read, uuid, offloadDriverMetadata).get(); + ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, offloadDriverMetadata).get(); + assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed()); + LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1); + LedgerEntries toWriteEntries = read.read(0,numberOfEntries - 1); + Iterator toTestIter = toTestEntries.iterator(); + Iterator toWriteIter = toWriteEntries.iterator(); + while(toTestIter.hasNext()) { + LedgerEntry toWriteEntry = toWriteIter.next(); + LedgerEntry toTestEntry = toTestIter.next(); + + assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + toTestEntries = toTest.read(1, numberOfEntries - 1); + toWriteEntries = read.read(1,numberOfEntries - 1); + toTestIter = toTestEntries.iterator(); + toWriteIter = toWriteEntries.iterator(); + while(toTestIter.hasNext()) { + LedgerEntry toWriteEntry = toWriteIter.next(); + LedgerEntry toTestEntry = toTestIter.next(); + + assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + + // check the file located in the local file system + Path offloadedFilePath = Paths.get(basePath, mlName); + assertEquals(Files.exists(offloadedFilePath), true); + } +} diff --git a/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml new file mode 100644 index 0000000000000..d26cec2cc60f0 --- /dev/null +++ b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml @@ -0,0 +1,48 @@ + + + + + fs.defaultFS + + + + hadoop.tmp.dir + pulsar + + + io.file.buffer.size + 4096 + + + io.seqfile.compress.blocksize + 1000000 + + + io.seqfile.compression.type + BLOCK + + + io.map.index.interval + 128 + + +