Skip to content

Commit

Permalink
Generalized BlockSource to build upon any SeekableReadableChannelSour…
Browse files Browse the repository at this point in the history
…ce (SRCS) rather than just java.nio.Path.

Added initial read tracking for SRCSs.
  • Loading branch information
Aklakan committed Sep 11, 2024
1 parent 3ef7007 commit 6c23e55
Show file tree
Hide file tree
Showing 19 changed files with 578 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ protected synchronized void initBackendRequest() throws IOException {
// disposable = (Disposable)iterator;

// TODO Init the reader
Range<Long> range = requestLimit == 0 || requestLimit == Long.MAX_VALUE
? Range.atLeast(requestOffset)
: Range.closedOpen(requestOffset, LongMath.saturatedAdd(requestOffset, requestLimit));
Range<Long> range = requestLimit == 0 || requestLimit == Long.MAX_VALUE
? Range.atLeast(requestOffset)
: Range.closedOpen(requestOffset, LongMath.saturatedAdd(requestOffset, requestLimit));
dataStream = cacheSystem.getDataSource().newReadableChannel(range);

} else {
Expand All @@ -241,23 +241,22 @@ protected synchronized void initBackendRequest() throws IOException {

@Override
public void run() {
// if (offset == 4516) {
// System.out.println("debug point");
// }

try {
checkpoint();
// If the checkpoint offset was not advanced then we reached end of data
if (offset != nextCheckpointOffset) {
runCore();
}

// if (offset == 62997688) {
// System.out.println("debug point");
// }
logger.debug("RangeRequestWorker normal termination at offset " + offset);
} catch (Exception e) {
logger.error("RangeRequestWorker exceptional termination at offset " + offset, e);
if (logger.isDebugEnabled()) {
logger.debug("RangeRequestWorker normal termination at offset " + offset);
}
} catch (Exception | Error e) {
// Catch also e.g. StackOverflowErrors
// FIXME Deadlock if a worker encounters an exception when opening a channel because
// consumers are not notified!
if (logger.isErrorEnabled()) {
logger.error("RangeRequestWorker exceptional termination at offset " + offset, e);
}
throw new RuntimeException(e);
} finally {
close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.aksw.commons.io.input;

import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;

/**
 * Collects statistics about reads, keyed by the position range each read covered.
 *
 * <p>{@link #submitReadStats(long, long, int, long)} is synchronized. The map returned by
 * {@link #getTrackedReads()} is the live internal map and is handed out without any
 * synchronization, so callers that inspect it while reads are still being submitted
 * have to coordinate externally.
 */
public class ChannelMonitor {

    /** Aggregated statistics of all reads that touched one tracked range. */
    public class RangeTracker {
        protected long totalDurationNanos;
        // protected Set<Integer> readLengths;
        protected int minReadLength;
        protected int maxReadLength;
        // protected long totalReadlength;
        protected long readCount;

        /**
         * Creates a tracker describing a single read.
         *
         * @param readLength the length of that read; used as both minimum and maximum
         * @param totalDurationNanos the duration of that read in nanoseconds
         */
        public RangeTracker(int readLength, long totalDurationNanos) {
            this(readLength, readLength, totalDurationNanos, 1);
        }

        /**
         * Creates a tracker from already aggregated values.
         *
         * @param minReadLength the smallest read length seen; {@code -1} is treated by
         *        {@link #add(RangeTracker)} as "not yet set"
         * @param maxReadLength the largest read length seen
         * @param totalDurationNanos the summed duration of all reads in nanoseconds
         * @param readCount the number of reads aggregated in this tracker
         */
        public RangeTracker(int minReadLength, int maxReadLength, long totalDurationNanos, long readCount) {
            super();
            this.totalDurationNanos = totalDurationNanos;
            this.minReadLength = minReadLength;
            this.maxReadLength = maxReadLength;
            this.readCount = readCount;
        }

        public long getTotalDurationNanos() {
            return totalDurationNanos;
        }

        public int getMinReadLength() {
            return minReadLength;
        }

        public int getMaxReadLength() {
            return maxReadLength;
        }

        public long getReadCount() {
            return readCount;
        }

        /**
         * Merges the statistics of {@code contrib} into this tracker.
         *
         * @param contrib the statistics to add; it is not modified
         */
        public void add(RangeTracker contrib) {
            this.totalDurationNanos += contrib.totalDurationNanos;
            this.maxReadLength = Math.max(this.maxReadLength, contrib.maxReadLength);
            this.minReadLength = this.minReadLength == -1
                    ? contrib.minReadLength
                    : Math.min(this.minReadLength, contrib.minReadLength);
            // Add the contribution's own count: it may itself aggregate several reads.
            this.readCount += contrib.readCount;
        }

        /** Returns an independent copy of this tracker. */
        @Override
        protected RangeTracker clone() {
            return new RangeTracker(minReadLength, maxReadLength, totalDurationNanos, readCount);
        }
    }

    protected RangeMap<Long, RangeTracker> trackedReads;

    public ChannelMonitor() {
        this.trackedReads = TreeRangeMap.create();
    }

    /**
     * Returns the live map of tracked ranges to their statistics.
     *
     * @return the internal map (not a copy)
     */
    public RangeMap<Long, RangeTracker> getTrackedReads() {
        return trackedReads;
    }

    /**
     * Records one read.
     *
     * <p>The read is attributed to the half-open position range
     * {@code [readStartPos, readEndPos)}. Parts of that range that are already tracked get
     * the read merged into their statistics; parts that are not yet tracked get a new entry.
     * An existing entry that is only partially covered is split, so that the portion outside
     * of the read keeps its previous statistics.
     *
     * <p>Reads with a non-positive length, or whose end position is not after the start
     * position, are ignored.
     *
     * @param readStartPos the position before the read
     * @param readEndPos the position after the read
     * @param readLength the number of items read, as reported by the read call
     * @param durationNanos the duration of the read in nanoseconds
     */
    public synchronized void submitReadStats(long readStartPos, long readEndPos, int readLength, long durationNanos) {
        // Nothing to attribute for EOF / empty reads. Also never let the monitoring hook
        // fail the read it observes because of an inconsistent position pair.
        if (readLength <= 0 || readEndPos <= readStartPos) {
            return;
        }

        RangeTracker contribution = new RangeTracker(readLength, readLength, durationNanos, 1);

        // A read that starts at p and advances to q touched the positions p .. q-1.
        Range<Long> span = Range.closedOpen(readStartPos, readEndPos);

        // Snapshot the overlapping portions first: subRangeMap is a live view and
        // trackedReads is modified below.
        RangeMap<Long, RangeTracker> overlaps = TreeRangeMap.create();
        overlaps.putAll(trackedReads.subRangeMap(span));

        // Ranges of the span that have not been tracked so far.
        RangeSet<Long> covered = TreeRangeSet.create(overlaps.asMapOfRanges().keySet());
        RangeSet<Long> gaps = covered.complement().subRangeSet(span);

        // Replace each overlapping portion with an updated copy. put() splits any entry that
        // extends beyond the span, leaving the remainder mapped to the untouched original.
        overlaps.asMapOfRanges().forEach((range, tracker) -> {
            RangeTracker updated = tracker.clone();
            updated.add(contribution);
            trackedReads.put(range, updated);
        });

        // Every gap gets its own tracker instance.
        gaps.asRanges().forEach(range -> trackedReads.put(range, contribution.clone()));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.aksw.commons.io.input;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.util.Objects;



/**
 * An {@link InputStream} view over a {@link ReadableByteChannel}.
 *
 * <p>This is {@code sun.nio.ch.ChannelInputStream} without relying on the channel's size() method.
 *
 * <p>NOTE(review): a channel read that returns 0 bytes is passed through by
 * {@link #read(byte[], int, int)} and is reported as end of stream ({@code -1}) by
 * {@link #read()} - confirm that the wrapped channels never return 0 for a non-empty buffer.
 */
public class InputStreamOverChannel
    extends InputStream
{
    /**
     * Reads from {@code ch} into {@code bb}.
     *
     * <p>If the channel is a {@link SelectableChannel}, the read happens while holding the
     * channel's blocking lock. A channel that is in non-blocking mode is rejected. If the
     * requested mode differs from the channel's current one, the mode is switched for the
     * duration of the read and restored afterwards.
     *
     * @param ch the channel to read from
     * @param bb the buffer to read into
     * @param block the blocking mode to use for the read
     * @return the value returned by the channel's {@code read} method
     * @throws IllegalBlockingModeException if the channel is selectable and non-blocking
     * @throws IOException if the channel read or the mode switch fails
     */
    public static int read(ReadableByteChannel ch, ByteBuffer bb,
                           boolean block)
        throws IOException
    {
        if (ch instanceof SelectableChannel) {
            SelectableChannel sc = (SelectableChannel)ch;
            synchronized (sc.blockingLock()) {
                boolean bm = sc.isBlocking();
                if (!bm)
                    throw new IllegalBlockingModeException();
                if (bm != block)
                    sc.configureBlocking(block);
                int n = ch.read(bb);
                if (bm != block)
                    sc.configureBlocking(bm);
                return n;
            }
        } else {
            return ch.read(bb);
        }
    }

    protected final ReadableByteChannel ch;
    private ByteBuffer bb = null;
    private byte[] bs = null;           // Invoker's previous array
    private byte[] b1 = null;

    /**
     * Creates a stream that reads from the given channel.
     *
     * @param ch the channel to read from; must not be null
     * @throws NullPointerException if {@code ch} is null
     */
    public InputStreamOverChannel(ReadableByteChannel ch) {
        // Fail at construction time rather than on the first read.
        this.ch = Objects.requireNonNull(ch, "ch");
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as a value in the range 0..255, or {@code -1} if exactly one byte
     *         could not be read
     */
    @Override
    public synchronized int read() throws IOException {
        if (b1 == null)
            b1 = new byte[1];
        int n = this.read(b1);
        if (n == 1)
            return b1[0] & 0xff;
        return -1;
    }

    /**
     * Reads up to {@code len} bytes into {@code bs} starting at index {@code off}.
     *
     * @return 0 if {@code len} is 0, otherwise the value returned by the channel read
     * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not denote a
     *         valid sub-range of {@code bs}
     */
    @Override
    public synchronized int read(byte[] bs, int off, int len)
        throws IOException
    {
        Objects.checkFromIndexSize(off, len, bs.length);
        if (len == 0)
            return 0;

        // Reuse the wrapping buffer when the caller passes the same array again.
        ByteBuffer bb = ((this.bs == bs)
                         ? this.bb
                         : ByteBuffer.wrap(bs));
        bb.limit(Math.min(off + len, bb.capacity()));
        bb.position(off);
        this.bb = bb;
        this.bs = bs;
        return read(bb);
    }

    /**
     * Reads from the underlying channel into {@code bb} in blocking mode.
     *
     * @param bb the buffer to read into
     * @return the value returned by {@link #read(ReadableByteChannel, ByteBuffer, boolean)}
     */
    protected int read(ByteBuffer bb)
        throws IOException
    {
        return read(ch, bb, true);
    }

    /** Closes the underlying channel. */
    @Override
    public void close() throws IOException {
        ch.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.aksw.commons.io.input;

/**
 * Fluent builder for readable channels.
 *
 * <p>Each setter returns the builder type {@code B}, so calls can be chained.
 *
 * @param <A> the array type of the channel being built
 * @param <X> the type of channel produced by {@link #build()}
 * @param <B> the concrete builder type returned by the setters
 */
public interface ReadableChannelBuilder<
        A,
        X extends ReadableChannel<A>,
        B extends ReadableChannelBuilder<A, X, B>> {

    /**
     * Sets the start of the channel.
     * NOTE(review): presumably the position of the first item to read - confirm with implementations.
     */
    B setStart(long start);

    /**
     * Sets the end of the channel.
     * NOTE(review): whether this bound is inclusive or exclusive is not visible here - confirm.
     */
    B setEnd(long end);

    /** Turns the advertising of blocks off ({@code false}) or on ({@code true}). */
    B setAdvertiseBlocks(boolean offOrOn);

    /** Creates the channel from the current settings. */
    X build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.aksw.commons.io.input;

import java.io.IOException;
import java.util.Objects;

/**
 * Channel decorator that reports every read to a {@link ChannelMonitor}.
 *
 * @param <A> the array type of the channel
 * @param <X> the type of the decorated channel
 */
public class SeekableReadableChannelWithMonitor<A, X extends SeekableReadableChannel<A>>
    extends SeekableReadableChannelDecoratorBase<A, X>
{
    protected ChannelMonitor monitor;

    /**
     * @param delegate the channel to decorate
     * @param monitor the receiver of the read statistics; must not be null
     */
    public SeekableReadableChannelWithMonitor(X delegate, ChannelMonitor monitor) {
        super(delegate);
        Objects.requireNonNull(monitor);
        this.monitor = monitor;
    }

    /**
     * Performs the read via the superclass and submits the positions before and after it,
     * the read result and the elapsed time to the monitor.
     */
    @Override
    public int read(A array, int position, int length) throws IOException {
        // The clock is started before the first position lookup and stopped after the
        // second one, so slow positioning shows up in the measured duration as well.
        final long clockStart = System.nanoTime();
        final long posBefore = super.position();
        final int count = super.read(array, position, length);
        final long posAfter = super.position();
        final long elapsedNanos = System.nanoTime() - clockStart;

        monitor.submitReadStats(posBefore, posAfter, count, elapsedNanos);
        return count;
    }

    /** Forwards to the superclass without recording anything. */
    @Override
    public void position(long pos) throws IOException {
        super.position(pos);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.aksw.commons.io.input;

import java.io.IOException;
import java.util.Objects;

import org.aksw.commons.io.buffer.array.ArrayOps;

/**
 * A channel source whose channels report their reads to a shared {@link ChannelMonitor}.
 *
 * <p>Size and array operations are forwarded to the delegate. Every channel obtained from
 * the delegate is wrapped in a {@link SeekableReadableChannelWithMonitor} that uses this
 * source's monitor.
 *
 * @param <A> the array type of the channels
 */
public class SeekableReadableSourceWrapperWithMonitor<A>
    implements SeekableReadableChannelSource<A>
{
    protected SeekableReadableChannelSource<A> delegate;
    protected ChannelMonitor channelMonitor;

    /**
     * Wraps {@code delegate} with a newly created monitor, which can be obtained via
     * {@link #getChannelMonitor()}.
     *
     * @param delegate the source to wrap; must not be null
     */
    public SeekableReadableSourceWrapperWithMonitor(SeekableReadableChannelSource<A> delegate) {
        this(delegate, new ChannelMonitor());
    }

    /**
     * @param delegate the source to wrap; must not be null
     * @param channelMonitor the monitor passed to every channel created by this source;
     *        must not be null
     */
    public SeekableReadableSourceWrapperWithMonitor(SeekableReadableChannelSource<A> delegate,
            ChannelMonitor channelMonitor) {
        super();
        this.delegate = Objects.requireNonNull(delegate);
        this.channelMonitor = Objects.requireNonNull(channelMonitor);
    }

    public SeekableReadableChannelSource<A> getDelegate() {
        return delegate;
    }

    /**
     * Returns the monitor that receives the read statistics of this source's channels.
     * Without this accessor the monitor created by the one-argument constructor would not
     * be reachable through the public API.
     */
    public ChannelMonitor getChannelMonitor() {
        return channelMonitor;
    }

    @Override
    public long size() throws IOException {
        return getDelegate().size();
    }

    @Override
    public ArrayOps<A> getArrayOps() {
        return getDelegate().getArrayOps();
    }

    @Override
    public SeekableReadableChannel<A> newReadableChannel() throws IOException {
        return wrap(getDelegate().newReadableChannel());
    }

    @Override
    public SeekableReadableChannel<A> newReadableChannel(long start) throws IOException {
        return wrap(getDelegate().newReadableChannel(start));
    }

//    @Override
//    public SeekableReadableChannel<A> newReadableChannel(long start, long end) throws IOException {
//        return wrap(getDelegate().newReadableChannel(start, end));
//    }

    /** Decorates a channel of the delegate so that its reads are reported to the monitor. */
    protected SeekableReadableChannel<A> wrap(SeekableReadableChannel<A> delegate) {
        return new SeekableReadableChannelWithMonitor<>(delegate, channelMonitor);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.Objects;

import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.input.GetPosition;
import org.aksw.commons.io.input.InputStreamOverChannel;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannelBase;
import org.aksw.commons.io.input.SeekableReadableChannels;
Expand Down Expand Up @@ -42,9 +42,9 @@ public static <T extends ReadableByteChannel> SeekableInputStream create(
GetPositionFn<? super T> getPosition,
SetPositionFn<? super T> setPosition
) {

return create(
Channels.newInputStream(channel),
// Channels.newInputStream(channel), relies on size() which is not always implemented
new InputStreamOverChannel(channel),
() -> getPosition.apply(channel),
position -> setPosition.apply(channel, position));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.aksw.commons.io.binseach.BinarySearcher;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelSupplier;
import org.aksw.commons.io.input.ReadableChannelWithLimitByDelimiter;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.apache.hadoop.io.compress.BZip2Codec;

import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -137,7 +131,7 @@ public static void main2(String[] args) throws IOException {
// pos = 0;
for (int x = 0; x < 0; ++x) {

BlockSourceChannelAdapter channel = blockSource.newReadableChannel(pos, true);
BlockSourceChannel channel = blockSource.newReadableChannel(pos, true);
try (InputStream in = ReadableChannels.newInputStream(channel)) {
long startBlockId = channel.getStartingBlockId();
// channel.adjustToNextBlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void main(String[] args) throws IOException {

Path path = Paths.get("/media/raven/T9/raven/datasets/wikidata/2024-08-24_wikidata-truthy.sorted.nt.bz2");

BlockSource blockSource = BlockSource.of(path, new BZip2Codec());
BlockSource blockSource = BlockSource.of(SeekableReadableChannelSources.of(path), new BZip2Codec());

byte[] data = new byte[10];
// e -> 44580
Expand Down
Loading

0 comments on commit 6c23e55

Please sign in to comment.