-
Notifications
You must be signed in to change notification settings - Fork 673
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[merkledb benchmark] implement simple write profile benchmark #3372
base: master
Are you sure you want to change the base?
Changes from 10 commits
da7f632
67b83d9
60a1bd7
e9f6f20
2ce08b8
a8b673f
e888f9e
a34e949
8984569
c844c70
30f05e7
e2ed004
24da31d
171c3b5
3d9fd61
c16e891
bfe6c7c
4a0ae2b
a4d935f
2014fdc
af5c4ab
4f807c8
291f2da
058ec55
0c80953
a6385b0
6a1817e
716fbbd
dd25de9
ae64496
f3191bc
a0f31a4
4602deb
72b9d13
b94e8f3
2485f98
b2aa009
ceada16
9ff9dbe
ddd0c88
1b87ea8
78d164b
e3a0002
d9327ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,372 @@ | ||||||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||||||
// See the file LICENSE for licensing terms. | ||||||
|
||||||
package main | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"crypto/sha256" | ||||||
"encoding/binary" | ||||||
"fmt" | ||||||
"net/http" | ||||||
"os" | ||||||
"path" | ||||||
"time" | ||||||
|
||||||
"github.com/prometheus/client_golang/prometheus" | ||||||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||||||
"github.com/spf13/pflag" | ||||||
|
||||||
"github.com/ava-labs/avalanchego/database/leveldb" | ||||||
"github.com/ava-labs/avalanchego/trace" | ||||||
"github.com/ava-labs/avalanchego/utils/logging" | ||||||
"github.com/ava-labs/avalanchego/utils/units" | ||||||
"github.com/ava-labs/avalanchego/x/merkledb" | ||||||
) | ||||||
|
||||||
const ( | ||||||
defaultDatabaseEntries = 2000000 | ||||||
databaseCreationBatchSize = 1000000 | ||||||
databaseRunningBatchSize = 2500 | ||||||
databaseRunningUpdateSize = 5000 | ||||||
defaultMetricsPort = 3000 | ||||||
) | ||||||
|
||||||
var ( | ||||||
databaseEntries = pflag.Uint64("n", defaultDatabaseEntries, "number of database entries") | ||||||
httpMetricPort = pflag.Uint64("p", defaultMetricsPort, "default metrics port") | ||||||
verbose = pflag.Bool("v", false, "verbose") | ||||||
) | ||||||
|
||||||
func getMerkleDBConfig(promRegistry prometheus.Registerer) merkledb.Config { | ||||||
const defaultHistoryLength = 300 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use the same number as firewood of 120: https://github.com/ava-labs/firewood/blob/main/firewood/src/manager.rs#L21 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
return merkledb.Config{ | ||||||
BranchFactor: merkledb.BranchFactor16, | ||||||
Hasher: merkledb.DefaultHasher, | ||||||
RootGenConcurrency: 0, | ||||||
HistoryLength: defaultHistoryLength, | ||||||
ValueNodeCacheSize: units.MiB, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem really small. If I am reading this correctly, there is about 2Mb of total cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I attempted to tweak this value, but it had no performance impact. |
||||||
IntermediateNodeCacheSize: units.MiB, | ||||||
IntermediateWriteBufferSize: units.KiB, | ||||||
IntermediateWriteBatchSize: 256 * units.KiB, | ||||||
Reg: promRegistry, | ||||||
TraceLevel: merkledb.NoTrace, | ||||||
Tracer: trace.Noop, | ||||||
} | ||||||
} | ||||||
|
||||||
func getGoldenStagingDatabaseDirectory() string { | ||||||
wd, _ := os.Getwd() | ||||||
return path.Join(wd, fmt.Sprintf("db-bench-test-golden-staging-%d", *databaseEntries)) | ||||||
} | ||||||
|
||||||
func getGoldenDatabaseDirectory() string { | ||||||
wd, _ := os.Getwd() | ||||||
return path.Join(wd, fmt.Sprintf("db-bench-test-golden-%d", *databaseEntries)) | ||||||
} | ||||||
|
||||||
func getRunningDatabaseDirectory() string { | ||||||
wd, _ := os.Getwd() | ||||||
return path.Join(wd, fmt.Sprintf("db-bench-test-running-%d", *databaseEntries)) | ||||||
} | ||||||
|
||||||
func calculateIndexEncoding(idx uint64) []byte { | ||||||
var entryEncoding [8]byte | ||||||
binary.NativeEndian.PutUint64(entryEncoding[:], idx) | ||||||
entryHash := sha256.Sum256(entryEncoding[:]) | ||||||
return entryHash[:] | ||||||
} | ||||||
|
||||||
func createGoldenDatabase() error { | ||||||
err := os.RemoveAll(getGoldenStagingDatabaseDirectory()) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to remove running directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
err = os.Mkdir(getGoldenStagingDatabaseDirectory(), 0o777) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create golden staging directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
promRegistry := prometheus.NewRegistry() | ||||||
|
||||||
levelDB, err := leveldb.New( | ||||||
getGoldenStagingDatabaseDirectory(), | ||||||
nil, | ||||||
logging.NoLog{}, | ||||||
promRegistry, | ||||||
) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create levelDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
mdb, err := merkledb.New(context.Background(), levelDB, getMerkleDBConfig(nil)) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
fmt.Printf("Initializing database.") | ||||||
ticksCh := make(chan interface{}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do you really need this ticker? Might be easier to report every 100k rows or something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reporting every 100k rows doesn't work nicely because of the batch writing ( which blocks for a long time ). |
||||||
go func() { | ||||||
t := time.NewTicker(time.Second) | ||||||
defer t.Stop() | ||||||
for { | ||||||
select { | ||||||
case <-ticksCh: | ||||||
return | ||||||
case <-t.C: | ||||||
fmt.Printf(".") | ||||||
} | ||||||
} | ||||||
}() | ||||||
|
||||||
startInsertTime := time.Now() | ||||||
currentBatch := mdb.NewBatch() | ||||||
for entryIdx := uint64(0); entryIdx < *databaseEntries; entryIdx++ { | ||||||
entryHash := calculateIndexEncoding(entryIdx) | ||||||
err = currentBatch.Put(entryHash, entryHash) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to put value in merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
if entryIdx%databaseCreationBatchSize == (databaseCreationBatchSize - 1) { | ||||||
err = currentBatch.Write() | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to write value in merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
currentBatch.Reset() | ||||||
} | ||||||
} | ||||||
if (*databaseEntries)%databaseCreationBatchSize != 0 { | ||||||
err = currentBatch.Write() | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to write value in merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
} | ||||||
close(ticksCh) | ||||||
|
||||||
fmt.Printf(" done!\n") | ||||||
rootID, err := mdb.GetMerkleRoot(context.Background()) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to get root ID : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
fmt.Printf("Generated and inserted %d batches of size %d in %v\n", | ||||||
(*databaseEntries)/databaseCreationBatchSize, databaseCreationBatchSize, time.Since(startInsertTime)) | ||||||
err = mdb.Close() | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to close levelDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
err = levelDB.Close() | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to close merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logs seem inverted here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed. |
||||||
err = os.Rename(getGoldenStagingDatabaseDirectory(), getGoldenDatabaseDirectory()) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to rename golden staging directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
fmt.Printf("Completed initialization with hash of %v\n", rootID.Hex()) | ||||||
return nil | ||||||
} | ||||||
|
||||||
func resetRunningDatabaseDirectory() error { | ||||||
runningDir := getRunningDatabaseDirectory() | ||||||
if _, err := os.Stat(runningDir); err == nil { | ||||||
err := os.RemoveAll(runningDir) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to remove running directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
} | ||||||
err := os.Mkdir(runningDir, 0o777) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create running directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
err = CopyDirectory(getGoldenDatabaseDirectory(), runningDir) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to duplicate golden directory : %v\n", err) | ||||||
return err | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func runBenchmark() error { | ||||||
promRegistry := prometheus.NewRegistry() | ||||||
if err := prometheus.Register(promRegistry); err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
writesCounter := prometheus.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "writes", | ||||||
Help: "Total number of keys written", | ||||||
}) | ||||||
batchCounter := prometheus.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "batch", | ||||||
Help: "Total number of batches written", | ||||||
}) | ||||||
deleteRate := prometheus.NewGauge(prometheus.GaugeOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "entry_delete_rate", | ||||||
Help: "The rate at which elements are deleted", | ||||||
}) | ||||||
updateRate := prometheus.NewGauge(prometheus.GaugeOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "entry_update_rate", | ||||||
Help: "The rate at which elements are updated", | ||||||
}) | ||||||
insertRate := prometheus.NewGauge(prometheus.GaugeOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "entry_insert_rate", | ||||||
Help: "The rate at which elements are inserted", | ||||||
}) | ||||||
batchWriteRate := prometheus.NewGauge(prometheus.GaugeOpts{ | ||||||
Namespace: "merkledb_bench", | ||||||
Name: "batch_write_rate", | ||||||
Help: "The rate at which the batch was written", | ||||||
}) | ||||||
promRegistry.MustRegister(writesCounter) | ||||||
promRegistry.MustRegister(batchCounter) | ||||||
promRegistry.MustRegister(deleteRate) | ||||||
promRegistry.MustRegister(updateRate) | ||||||
promRegistry.MustRegister(insertRate) | ||||||
promRegistry.MustRegister(batchWriteRate) | ||||||
|
||||||
http.Handle("/metrics", promhttp.Handler()) | ||||||
|
||||||
server := &http.Server{ | ||||||
Addr: fmt.Sprintf(":%d", *httpMetricPort), | ||||||
ReadHeaderTimeout: 3 * time.Second, | ||||||
} | ||||||
go func() { | ||||||
err := server.ListenAndServe() | ||||||
if err != nil && err != http.ErrServerClosed { | ||||||
fmt.Fprintf(os.Stderr, "unable to listen and serve : %v\n", err) | ||||||
} | ||||||
}() | ||||||
|
||||||
levelDB, err := leveldb.New( | ||||||
getRunningDatabaseDirectory(), | ||||||
nil, | ||||||
logging.NoLog{}, | ||||||
promRegistry, | ||||||
) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create levelDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
mdb, err := merkledb.New(context.Background(), levelDB, getMerkleDBConfig(promRegistry)) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to create merkleDB database : %v\n", err) | ||||||
return err | ||||||
} | ||||||
defer func() { | ||||||
mdb.Close() | ||||||
levelDB.Close() | ||||||
}() | ||||||
|
||||||
low := uint64(0) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch; fixed. |
||||||
var deleteDuration, addDuration, updateDuration, batchDuration time.Duration | ||||||
batch := mdb.NewBatch() | ||||||
for { | ||||||
startBatchTime := time.Now() | ||||||
|
||||||
// delete first 2.5k keys from the beginning | ||||||
startDeleteTime := time.Now() | ||||||
for keyToDeleteIdx := low; keyToDeleteIdx < low+databaseRunningBatchSize; keyToDeleteIdx++ { | ||||||
entryHash := calculateIndexEncoding(keyToDeleteIdx) | ||||||
err = batch.Delete(entryHash) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to delete merkleDB entry : %v\n", err) | ||||||
return err | ||||||
} | ||||||
} | ||||||
deleteDuration = time.Since(startDeleteTime) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can avoid all this math and just report the raw number of deletes. Grafana can convert this to a rate for you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added both. I believe that my calculation would be more accurate, but let's have both for the time being. |
||||||
deleteRate.Set(float64(databaseRunningBatchSize) * float64(time.Second) / float64(deleteDuration)) | ||||||
|
||||||
// add 2.5k past end. | ||||||
startInsertTime := time.Now() | ||||||
for keyToAddIdx := low + (*databaseEntries); keyToAddIdx < low+(*databaseEntries)+databaseRunningBatchSize; keyToAddIdx++ { | ||||||
entryHash := calculateIndexEncoding(keyToAddIdx) | ||||||
err = batch.Put(entryHash, entryHash) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to insert merkleDB entry : %v\n", err) | ||||||
return err | ||||||
} | ||||||
} | ||||||
addDuration = time.Since(startInsertTime) | ||||||
insertRate.Set(float64(databaseRunningBatchSize) * float64(time.Second) / float64(addDuration)) | ||||||
|
||||||
// update middle 5k entries | ||||||
startUpdateTime := time.Now() | ||||||
for keyToUpdateIdx := low + ((*databaseEntries) / 2); keyToUpdateIdx < low+((*databaseEntries)/2)+databaseRunningUpdateSize; keyToUpdateIdx++ { | ||||||
updateEntryKey := calculateIndexEncoding(keyToUpdateIdx) | ||||||
updateEntryValue := calculateIndexEncoding(keyToUpdateIdx - ((*databaseEntries) / 2)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is incorrect, should be:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm.. I think that it's ok to use the |
||||||
err = batch.Put(updateEntryKey, updateEntryValue) | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to update merkleDB entry : %v\n", err) | ||||||
return err | ||||||
} | ||||||
} | ||||||
updateDuration = time.Since(startUpdateTime) | ||||||
updateRate.Set(float64(databaseRunningUpdateSize) * float64(time.Second) / float64(updateDuration)) | ||||||
|
||||||
batchWriteStartTime := time.Now() | ||||||
err = batch.Write() | ||||||
if err != nil { | ||||||
fmt.Fprintf(os.Stderr, "unable to write batch : %v\n", err) | ||||||
return err | ||||||
} | ||||||
batchDuration = time.Since(startBatchTime) | ||||||
batchWriteDuration := time.Since(batchWriteStartTime) | ||||||
batchWriteRate.Set(float64(time.Second) / float64(batchWriteDuration)) | ||||||
|
||||||
batch.Reset() | ||||||
|
||||||
if *verbose { | ||||||
fmt.Printf("delete rate [%d] update rate [%d] insert rate [%d] batch rate [%d]\n", | ||||||
time.Second/deleteDuration, | ||||||
time.Second/updateDuration, | ||||||
time.Second/addDuration, | ||||||
time.Second/batchDuration) | ||||||
} | ||||||
writesCounter.Add(databaseRunningBatchSize + databaseRunningBatchSize + databaseRunningUpdateSize) | ||||||
batchCounter.Inc() | ||||||
} | ||||||
|
||||||
// return nil | ||||||
} | ||||||
|
||||||
func main() { | ||||||
pflag.Parse() | ||||||
|
||||||
goldenDir := getGoldenDatabaseDirectory() | ||||||
if _, err := os.Stat(goldenDir); os.IsNotExist(err) { | ||||||
// create golden image. | ||||||
if createGoldenDatabase() != nil { | ||||||
os.Exit(1) | ||||||
return | ||||||
} | ||||||
} | ||||||
if resetRunningDatabaseDirectory() != nil { | ||||||
os.Exit(1) | ||||||
return | ||||||
} | ||||||
if runBenchmark() != nil { | ||||||
os.Exit(1) | ||||||
return | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch size is supposed to be 10k. This is 1M.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.