Skip to content

Commit

Permalink
Merge pull request #435 from apache/fix_theta_compressed_stream
Browse files Browse the repository at this point in the history
fixed compressed theta stream serialization
  • Loading branch information
AlexanderSaydakov authored Jul 26, 2024
2 parents c5283ad + 6be246d commit dba8394
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
1 change: 1 addition & 0 deletions theta/include/theta_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const
previous = entries_[i];
offset = pack_bits(delta, entry_bits, ptr, offset);
}
if (offset > 0) ++ptr;
write(os, buffer.data(), ptr - buffer.data());
}
}
Expand Down
2 changes: 1 addition & 1 deletion theta/test/theta_sketch_serialize_for_java.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TEST_CASE("theta sketch generate compressed", "[serialize_for_java]") {
REQUIRE_FALSE(sketch.is_empty());
REQUIRE(sketch.get_estimate() == Approx(n).margin(n * 0.03));
std::ofstream os("theta_compressed_n" + std::to_string(n) + "_cpp.sk", std::ios::binary);
sketch.compact().serialize(os);
sketch.compact().serialize_compressed(os);
}
}

Expand Down
41 changes: 41 additions & 0 deletions theta/test/theta_sketch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,47 @@ TEST_CASE("theta sketch: wrap compact v2 estimation from java", "[theta_sketch]"
}
}

// Round-trips a ten-entry compact theta sketch through the compressed
// serialization, checking three read paths: deserializing from a byte buffer,
// wrapping the byte buffer, and deserializing from a stream.
TEST_CASE("theta sketch: serialize deserialize small compressed", "[theta_sketch]") {
  auto source = update_theta_sketch::builder().build();
  for (int value = 0; value < 10; value++) {
    source.update(value);
  }
  auto expected = source.compact();

  // Byte-buffer form: the buffer length must agree with the size the sketch reports.
  auto image = expected.serialize_compressed();
  REQUIRE(image.size() == expected.get_serialized_size_bytes(true));

  {
    // Path 1: heap-deserialize from the byte buffer.
    auto restored = compact_theta_sketch::deserialize(image.data(), image.size());
    REQUIRE(restored.get_num_retained() == expected.get_num_retained());
    REQUIRE(restored.get_theta() == expected.get_theta());
    // Walk both sketches in lockstep; the retained counts were checked equal above.
    auto cursor = restored.begin();
    for (const auto entry: expected) {
      REQUIRE(*cursor == entry);
      ++cursor;
    }
  }

  {
    // Path 2: wrap the same byte buffer.
    auto wrapped = wrapped_compact_theta_sketch::wrap(image.data(), image.size());
    REQUIRE(wrapped.get_num_retained() == expected.get_num_retained());
    REQUIRE(wrapped.get_theta() == expected.get_theta());
    auto cursor = wrapped.begin();
    for (const auto entry: expected) {
      REQUIRE(*cursor == entry);
      ++cursor;
    }
  }

  // Path 3: stream form. The number of bytes written must also agree with the
  // reported serialized size before the stream is read back.
  std::stringstream stream(std::ios::in | std::ios::out | std::ios::binary);
  expected.serialize_compressed(stream);
  REQUIRE(static_cast<size_t>(stream.tellp()) == expected.get_serialized_size_bytes(true));
  auto restored = compact_theta_sketch::deserialize(stream);
  REQUIRE(restored.get_num_retained() == expected.get_num_retained());
  REQUIRE(restored.get_theta() == expected.get_theta());
  auto cursor = restored.begin();
  for (const auto entry: expected) {
    REQUIRE(*cursor == entry);
    ++cursor;
  }
}

TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
auto update_sketch = update_theta_sketch::builder().build();
for (int i = 0; i < 10000; i++) update_sketch.update(i);
Expand Down

0 comments on commit dba8394

Please sign in to comment.