diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/CountThenEstimate.java b/src/main/java/com/clearspring/analytics/stream/cardinality/CountThenEstimate.java index 32ff84c3e..e1dd7d179 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/CountThenEstimate.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/CountThenEstimate.java @@ -183,7 +183,7 @@ public boolean tipped() } @Override - public byte[] getBytes() throws IOException + public byte[] getBytes() { return ExternalizableUtil.toBytes(this); } diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java b/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java index 588aad749..8f17d9a5e 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java @@ -16,17 +16,13 @@ package com.clearspring.analytics.stream.cardinality; +import java.io.Serializable; +import java.nio.ByteBuffer; + import com.clearspring.analytics.hash.MurmurHash; import com.clearspring.analytics.util.Bits; import com.clearspring.analytics.util.IBuilder; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.Serializable; - /** * Java implementation of HyperLogLog (HLL) algorithm from this paper: *

@@ -215,20 +211,17 @@ public int sizeof() return registerSet.size * 4; } - @Override - public byte[] getBytes() throws IOException - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - dos.writeInt(log2m); - dos.writeInt(registerSet.size * 4); + @Override + public byte[] getBytes() { + ByteBuffer bb = ByteBuffer.allocate(Bits.INT_BYTES * (1+1+registerSet.size)); + bb.putInt(log2m); + bb.putInt(registerSet.size * Bits.INT_BYTES); for (int x : registerSet.bits()) { - dos.writeInt(x); + bb.putInt(x); } - - return baos.toByteArray(); + return bb.array(); } @Override @@ -283,15 +276,13 @@ public int sizeof() return RegisterSet.getBits(k) * 4; } - public static HyperLogLog build(byte[] bytes) throws IOException + public static HyperLogLog build(byte[] bytes) { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - DataInputStream oi = new DataInputStream(bais); - int log2m = oi.readInt(); - int size = oi.readInt(); - byte[] longArrayBytes = new byte[size]; - oi.readFully(longArrayBytes); - return new HyperLogLog(log2m, new RegisterSet((int) Math.pow(2, log2m), Bits.getBits(longArrayBytes))); + ByteBuffer bb = ByteBuffer.wrap(bytes); + int log2m = bb.getInt(); + int size = bb.getInt(); + int [] regSetDump = Bits.getBits(bb, size); + return new HyperLogLog(log2m, new RegisterSet((int) Math.pow(2, log2m), regSetDump)); } } diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java b/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java index 51de1a6de..fbbf21f12 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java @@ -22,6 +22,7 @@ import com.clearspring.analytics.util.Varint; import java.io.*; +import java.nio.ByteBuffer; import java.util.*; @@ -820,36 +821,47 @@ public int sizeof() } @Override - public byte[] getBytes() throws IOException + public byte[] getBytes() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - - dos.writeInt(p); - dos.writeInt(sp); + ByteBuffer bb = null; + switch (format) { case NORMAL: - dos.writeInt(0); - dos.writeInt(registerSet.size * 4); + int bbSize = Bits.INT_BYTES * (1+1+1+1+registerSet.size); + bb = ByteBuffer.allocate(bbSize); + bb.putInt(p); + bb.putInt(sp); + bb.putInt(0); + bb.putInt((Integer.SIZE/8) * registerSet.size); for (int x : registerSet.bits()) { - dos.writeInt(x); + bb.putInt(x); } break; case SPARSE: - dos.writeInt(1); mergeTempList(); + + int sparseByteCount = 0; + for (byte[] bytes: sparseSet) + { + sparseByteCount += Bits.INT_BYTES * 1 + bytes.length; + } + + bb = ByteBuffer.allocate(Bits.INT_BYTES * (1+1+1+1) + sparseByteCount); + bb.putInt(p); + bb.putInt(sp); + bb.putInt(1); for (byte[] bytes : sparseSet) { - dos.writeInt(bytes.length); - dos.write(bytes); + bb.putInt(bytes.length); + bb.put(bytes); } - dos.writeInt(-1); + bb.putInt(-1); break; } - return baos.toByteArray(); + return bb.array(); } /** @@ -1009,19 +1021,18 @@ public int sizeof() return RegisterSet.getBits(k) * 5; } - public static HyperLogLogPlus build(byte[] bytes) throws IOException + public static HyperLogLogPlus build(byte[] bytes) { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - DataInputStream oi = new DataInputStream(bais); - int p = oi.readInt(); - int sp = oi.readInt(); - int formatType = oi.readInt(); + ByteBuffer bb = ByteBuffer.wrap(bytes); + + int p = bb.getInt(); + int sp = bb.getInt(); + int formatType = bb.getInt(); if (formatType == 0) { - int size = oi.readInt(); - byte[] longArrayBytes = new byte[size]; - oi.readFully(longArrayBytes); - HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(p, sp, new RegisterSet((int) Math.pow(2, p), Bits.getBits(longArrayBytes))); + int size = bb.getInt(); + int [] regSetDump = Bits.getBits(bb, size); + HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(p, sp, new RegisterSet((int) Math.pow(2, p), regSetDump)); hyperLogLogPlus.format = Format.NORMAL; return hyperLogLogPlus; } @@ -1029,10 +1040,10 @@ public static HyperLogLogPlus build(byte[] bytes) throws IOException { int l; List rehydratedSet = new ArrayList(); - while ((l = oi.readInt()) > 0) + while ((l = bb.getInt()) > 0) { byte[] longArrayBytes = new byte[l]; - oi.read(longArrayBytes, 0, l); + bb.get(longArrayBytes, 0, l); rehydratedSet.add(longArrayBytes); } HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(p, sp, rehydratedSet); diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java index 111c4a91f..5cf5ff43b 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java @@ -57,7 +57,7 @@ public interface ICardinality * @return * @throws IOException */ - byte[] getBytes() throws IOException; + byte[] getBytes(); /** * Merges estimators to produce an estimator for the combined streams diff --git a/src/main/java/com/clearspring/analytics/util/Bits.java b/src/main/java/com/clearspring/analytics/util/Bits.java index 10c029cbe..14cdc1d7c 100644 --- a/src/main/java/com/clearspring/analytics/util/Bits.java +++ b/src/main/java/com/clearspring/analytics/util/Bits.java @@ -16,21 +16,19 @@ package com.clearspring.analytics.util; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; +import java.nio.ByteBuffer; public class Bits { - public static int[] getBits(byte[] mBytes) throws IOException + public static final int INT_BYTES = Integer.SIZE / 8; + + public static int [] getBits(ByteBuffer bb, int size) { - int bitSize = mBytes.length / 4; - int[] bits = new int[bitSize]; - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(mBytes)); - for (int i = 0; i < bitSize; i++) - { - bits[i] = dis.readInt(); + int count = size / (Integer.SIZE/8); + int [] bits = new int[count]; + for (int i = 0; i < count; i++) { + bits[i] = bb.getInt(); } return bits; } diff --git a/src/main/java/com/clearspring/analytics/util/ExternalizableUtil.java b/src/main/java/com/clearspring/analytics/util/ExternalizableUtil.java index 33077ab4b..236b8d179 100644 --- a/src/main/java/com/clearspring/analytics/util/ExternalizableUtil.java +++ b/src/main/java/com/clearspring/analytics/util/ExternalizableUtil.java @@ -7,12 +7,17 @@ public class ExternalizableUtil { - public static byte[] toBytes(Externalizable o) throws IOException + public static byte[] toBytes(Externalizable o) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(baos); - o.writeExternal(out); - out.flush(); - return baos.toByteArray(); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + o.writeExternal(out); + out.flush(); + return baos.toByteArray(); + } catch (IOException e) { + // ByteArrayOutputStream does not throw IOException + throw new IllegalStateException(e); + } } }