Skip to content

Commit

Permalink
Implement start-stop iterator test
Browse files Browse the repository at this point in the history
  • Loading branch information
ben221199 committed Sep 21, 2024
1 parent 047df3a commit dd15a01
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/lbry/database/BasePrefixDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void unsafeCommit() throws RocksDBException {
this.applyStash();
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
if(this.operationStack.length()!=0){
if(this.operationStack.length()==0){
return;
}
WriteBatch batch = new WriteBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -54,6 +53,7 @@ public void stashOperations(RevertibleOperation[] operations){
}

public void validateAndApplyStashedOperations(){
// System.err.println("STASH = "+this.stash);
if(this.stash.isEmpty()){
return;
}
Expand All @@ -63,14 +63,10 @@ public void validateAndApplyStashedOperations(){

while(!this.stash.isEmpty()){
RevertibleOperation operation = this.stash.pollFirst();
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}

RevertibleOperation[] operationArr = MapHelper.getValue(this.items,operation.key);
if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
this.items.replace(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
continue;
}
if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){
Expand All @@ -82,50 +78,39 @@ public void validateAndApplyStashedOperations(){
}

Map<byte[],byte[]> existing = new HashMap<>();
if(this.enforceIntegrity && !uniqueKeys.isEmpty()){
List<byte[]> uniqueKeysList = new ArrayList<>(uniqueKeys);
for(int idx=0;idx<uniqueKeys.size();idx+=10000){
List<byte[]> batch = uniqueKeysList.subList(idx,Math.min(uniqueKeysList.size(),idx+10000));
Iterator<Optional<byte[]>> iterator = this.multiGet.apply(batch).iterator();
for(byte[] k : batch){
byte[] v = iterator.next().get();
existing.put(k,v);
}

}
}
// if(this.enforceIntegrity && !uniqueKeys.isEmpty()){
// List<byte[]> uniqueKeysList = new ArrayList<>(uniqueKeys);
// for(int idx=0;idx<uniqueKeys.size();idx+=10000){
// List<byte[]> batch = uniqueKeysList.subList(idx,Math.min(uniqueKeysList.size(),idx+10000));
// Iterator<Optional<byte[]>> iterator = this.multiGet.apply(batch).iterator();
// for(byte[] k : batch){
// byte[] v = iterator.next().get();
// System.err.println(new RevertiblePut(k,v));
// existing.put(k,v);
// }
//
// }
// }

for(RevertibleOperation operation : needAppend){
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}

RevertibleOperation[] operationArr = MapHelper.getValue(this.items,operation.key);
// System.err.println("@ "+operation+ " vs "+Arrays.toString(operationArr));
if(operationArr!=null && operationArr.length>=1 && operationArr[operationArr.length-1].equals(operation)){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
RevertibleOperation[] operationArr2 = MapHelper.getValue(this.items,operation.getKey());
List<RevertibleOperation> operationList = Arrays.asList(operationArr2!=null?operationArr2:new RevertibleOperation[0]);
if(!operationList.isEmpty()){
operationList.remove(operationList.size()-1);
}
if(operationArrX==null || operationArrX.length==0){
this.items.remove(operation.getKey());
if(operationList.isEmpty()){
MapHelper.remove(this.items,operation.getKey());
continue;
}
}
if(!this.enforceIntegrity){
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}

RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?1:operationArrX.length+1];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
RevertibleOperation[] operationArr2 = MapHelper.getValue(this.items,operation.getKey());
List<RevertibleOperation> operationList = Arrays.asList(operationArr2!=null?operationArr2:new RevertibleOperation[0]);
operationList.add(operation);
this.items.put(operationList.get(0).getKey(),operationList.toArray(new RevertibleOperation[0]));
}

RevertibleOperation[] operationArrX = null;
Expand All @@ -135,7 +120,8 @@ public void validateAndApplyStashedOperations(){
}
}

byte[] storedValue = existing.get(operation.getKey());
byte[] storedValue = MapHelper.getValue(existing,operation.getKey());
// System.err.println(operation);
boolean hasStoredValue = storedValue!=null;
RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue):null;
boolean deleteStoredOperationInOperationList = false;
Expand Down
85 changes: 83 additions & 2 deletions src/test/java/com/lbry/database/tests/RevertablePrefixDBTest.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package com.lbry.database.tests;

import com.lbry.database.PrefixDB;
import com.lbry.database.keys.ActiveAmountKey;
import com.lbry.database.keys.ClaimTakeoverKey;
import com.lbry.database.revert.RevertibleOperation;
import com.lbry.database.revert.RevertiblePut;
import com.lbry.database.util.ArrayHelper;
import com.lbry.database.values.ActiveAmountValue;
import com.lbry.database.values.ClaimTakeoverValue;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand All @@ -28,7 +39,6 @@ public class RevertablePrefixDBTest{
@BeforeAll
public void setUp() throws IOException,RocksDBException{
this.tmpDir = Files.createTempDirectory("tmp").toFile();
System.err.println(this.tmpDir);
this.database = new PrefixDB(this.tmpDir.getAbsolutePath(),32);
}

Expand Down Expand Up @@ -148,6 +158,77 @@ public void testRollback() throws RocksDBException{
public void testHubDatabaseIterator(){}

@Test
public void testHubDatabaseIteratorStartStop(){}
public void testHubDatabaseIteratorStartStop() throws RocksDBException{
int txNum = 101;

for(int x=0;x<255;x++){
byte[] claimHash = ArrayHelper.fill(new byte[20],(byte) x);
final int txNumInner = txNum;
this.database.active_amount.stashPut(new ActiveAmountKey(){{
this.claim_hash = claimHash;
this.txo_type = 1;
this.activation_height = 200;
this.tx_num = txNumInner;
this.position = 1;
}},new ActiveAmountValue(){{
this.amount = 100000;
}});
this.database.active_amount.stashPut(new ActiveAmountKey(){{
this.claim_hash = claimHash;
this.txo_type = 1;
this.activation_height = 201;
this.tx_num = txNumInner+1;
this.position = 1;
}},new ActiveAmountValue(){{
this.amount = 200000;
}});
this.database.active_amount.stashPut(new ActiveAmountKey(){{
this.claim_hash = claimHash;
this.txo_type = 1;
this.activation_height = 202;
this.tx_num = txNumInner+2;
this.position = 1;
}},new ActiveAmountValue(){{
this.amount = 300000;
}});
txNum += 3;
}
this.database.unsafeCommit();

BiFunction<byte[],Integer,Long> getActiveAmountAsOfHeight = (claimHash,height) -> {
try{
ReadOptions readOptions = new ReadOptions().setPrefixSameAsStart(true).setTotalOrderSeek(true);
RocksIterator iterator = this.database.active_amount.iterate(readOptions);
iterator.seek(ByteBuffer.allocate(1+20+1+4).order(ByteOrder.BIG_ENDIAN).put(this.database.active_amount.prefix().getValue()).put(claimHash).put((byte) 1).putInt(0));
byte[] stop = ByteBuffer.allocate(1+20+1+4).order(ByteOrder.BIG_ENDIAN).put(this.database.active_amount.prefix().getValue()).put(claimHash).put((byte) 1).putInt(height).array();
byte[] latestValue = null;
while(iterator.isValid()){
if(ByteBuffer.wrap(iterator.key(),0,1+20+1).equals(ByteBuffer.wrap(stop,0,1+20+1))){
int compareStop = ByteBuffer.wrap(iterator.key(),0,1+20+1+4).compareTo(ByteBuffer.wrap(stop));
if(compareStop>0){
break;
}
latestValue = iterator.value();
}
iterator.next();
}
return latestValue!=null?this.database.active_amount.unpackValue(latestValue).amount:0;
}catch(RocksDBException e){
e.printStackTrace();
}
return 0L;
};

for(int x=0;x<255;x++){
byte[] claimHash = ArrayHelper.fill(new byte[20],(byte) x);

assertEquals(300000,getActiveAmountAsOfHeight.apply(claimHash,300));
assertEquals(300000,getActiveAmountAsOfHeight.apply(claimHash,203));
assertEquals(300000,getActiveAmountAsOfHeight.apply(claimHash,202));
assertEquals(200000,getActiveAmountAsOfHeight.apply(claimHash,201));
assertEquals(100000,getActiveAmountAsOfHeight.apply(claimHash,200));
assertEquals(0,getActiveAmountAsOfHeight.apply(claimHash,199));
}
}

}

0 comments on commit dd15a01

Please sign in to comment.