Skip to content

Commit

Permalink
Add placeholders for Paul
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Sep 16, 2023
1 parent 3e1da0f commit b00b5e1
Showing 1 changed file with 158 additions and 86 deletions.
244 changes: 158 additions & 86 deletions internal/node/abci/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,120 @@ package abci

import (
"context"
"fmt"
"io"
"os"
"path/filepath"

abci "github.com/cometbft/cometbft/abci/types"
"gitlab.com/accumulatenetwork/accumulate/exp/ioutil"
"gitlab.com/accumulatenetwork/accumulate/internal/core"
"gitlab.com/accumulatenetwork/accumulate/internal/database/snapshot"
sv1 "gitlab.com/accumulatenetwork/accumulate/internal/database/snapshot"
"gitlab.com/accumulatenetwork/accumulate/internal/node/config"
sv2 "gitlab.com/accumulatenetwork/accumulate/pkg/database/snapshot"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
)

// ListSnapshots queries the node for available snapshots.
func ListSnapshots(cfg *config.Config) ([]*snapshot.Header, error) {
snapDir := config.MakeAbsolute(cfg.RootDir, cfg.Accumulate.Snapshots.Directory)
entries, err := os.ReadDir(snapDir)
info, err := listSnapshots(snapDir)
if err != nil {
return nil, err
}

snapshots := make([]*snapshot.Header, 0, len(info))
for _, info := range info {
snapshots = append(snapshots, &sv1.Header{
Version: info.version,
Height: info.height,
RootHash: info.rootHash,
})
}
return snapshots, nil
}

func (app *Accumulator) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
info, err := listSnapshots(config.MakeAbsolute(app.Config.RootDir, app.Config.Accumulate.Snapshots.Directory))
if err != nil {
return nil, err
}

resp := new(abci.ResponseListSnapshots)
resp.Snapshots = make([]*abci.Snapshot, 0, len(info))
for _, info := range info {
resp.Snapshots = append(resp.Snapshots, &abci.Snapshot{
Height: info.height,
Format: uint32(info.version),
Chunks: uint32(info.chunks),
Hash: info.rootHash[:],
})
}
return resp, nil
}

// LoadSnapshotChunk queries the node for the body of a snapshot.
func (app *Accumulator) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
snapDir := config.MakeAbsolute(app.RootDir, app.Accumulate.Snapshots.Directory)
f, err := os.Open(filepath.Join(snapDir, fmt.Sprintf(core.SnapshotMajorFormat, req.Height)))
if err != nil {
return nil, err
}
defer f.Close()

chunk, err := app.getSnapshotChunk(f, req.Chunk)
if err != nil {
return nil, err
}

return &abci.ResponseLoadSnapshotChunk{Chunk: chunk}, nil
}

// OfferSnapshot offers a snapshot to the node.
func (app *Accumulator) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
if req.Snapshot == nil {
return &abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil
}
if req.Snapshot.Format != sv2.Version2 {
return &abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil
}

r, err := app.startSnapshotSync(req.Snapshot, req.AppHash)
if err != nil {
return nil, err
}
return &abci.ResponseOfferSnapshot{Result: r}, nil
}

// ApplySnapshotChunk applies a snapshot to the node.
func (app *Accumulator) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
r, err := app.acceptSnapshotChunk(req.Index, req.Chunk)
if err != nil {
return nil, err
}
return &abci.ResponseApplySnapshotChunk{Result: r}, nil
}

func (app *Accumulator) finishedLoadingSnapshot() error {
var assembled ioutil.SectionReader
return snapshot.FullRestore(app.Database, assembled, app.logger, app.Accumulate.PartitionUrl())
}

type snapshotInfo struct {
version uint64
height uint64
rootHash [32]byte
chunks int
}

func listSnapshots(dir string) ([]*snapshotInfo, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return nil, errors.UnknownError.WithFormat("load snapshot: %w", err)
}

snapshots := make([]*snapshot.Header, 0, len(entries))
snapshots := make([]*snapshotInfo, 0, len(entries))
for _, entry := range entries {
if entry.IsDir() {
continue
Expand All @@ -35,107 +130,84 @@ func ListSnapshots(cfg *config.Config) ([]*snapshot.Header, error) {
continue
}

filename := filepath.Join(snapDir, entry.Name())
filename := filepath.Join(dir, entry.Name())
f, err := os.Open(filename)
if err != nil {
return nil, errors.UnknownError.WithFormat("load snapshot %s: %w", entry.Name(), err)
}
defer f.Close()

header, _, err := snapshot.Open(f)
ver, err := sv2.GetVersion(f)
if err != nil {
return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", entry.Name(), err)
return nil, err
}
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}

var info *snapshotInfo
switch ver {
case sv1.Version1:
info, err = snapshotInfoV1(f)
case sv2.Version2:
info, err = snapshotInfoV2(f)
default:
return nil, errors.InternalError.WithFormat("unsupported snapshot version %d", ver)
}

snapshots = append(snapshots, header)
snapshots = append(snapshots, info)
}
return snapshots, nil
}

func (app *Accumulator) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
// https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356
return &abci.ResponseListSnapshots{}, nil

// entries, err := ListSnapshots(app.Config)
// if err != nil {
// app.logger.Error("Failed to list snapshots", "error", err)
// return abci.ResponseListSnapshots{}
// }

// var resp abci.ResponseListSnapshots
// resp.Snapshots = make([]*abci.Snapshot, 0, len(entries))
// for _, header := range entries {
// resp.Snapshots = append(resp.Snapshots, &abci.Snapshot{
// Height: header.Height,
// Format: uint32(header.Version),
// Chunks: 1,
// Hash: header.RootHash[:],
// })
// }
// return resp
func snapshotInfoV1(f *os.File) (*snapshotInfo, error) {
header, _, err := sv1.Open(f)
if err != nil {
return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", f.Name(), err)
}

return &snapshotInfo{
version: header.Version, //
height: header.Height, //
rootHash: header.RootHash, //
chunks: 1, // <--- ***** CHANGE *****
}, nil
}

// LoadSnapshotChunk queries the node for the body of a snapshot.
func (app *Accumulator) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
// https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356
return &abci.ResponseLoadSnapshotChunk{}, nil

// if req.Format != snapshot.Version1 || req.Chunk != 0 {
// app.logger.Error("Invalid snapshot request", "height", req.Height, "format", req.Format, "chunk", req.Chunk)
// return abci.ResponseLoadSnapshotChunk{}
// }

// snapDir := config.MakeAbsolute(app.RootDir, app.Accumulate.Snapshots.Directory)
// f, err := os.Open(filepath.Join(snapDir, fmt.Sprintf(core.SnapshotMajorFormat, req.Height)))
// if err != nil {
// app.logger.Error("Failed to load snapshot", "error", err, "height", req.Height, "format", req.Format, "chunk", req.Chunk)
// return abci.ResponseLoadSnapshotChunk{}
// }
// defer f.Close()

// data, err := io.ReadAll(f)
// if err != nil {
// app.logger.Error("Failed to load snapshot", "error", err, "height", req.Height, "format", req.Format, "chunk", req.Chunk)
// return abci.ResponseLoadSnapshotChunk{}
// }

// return abci.ResponseLoadSnapshotChunk{Chunk: data}
func snapshotInfoV2(f *os.File) (*snapshotInfo, error) {
s, err := sv2.Open(f)
if err != nil {
return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", f.Name(), err)
}

return &snapshotInfo{
version: s.Header.Version, //
height: s.Header.SystemLedger.Index, //
rootHash: s.Header.RootHash, //
chunks: 1, // <--- ***** CHANGE *****
}, nil
}

// OfferSnapshot offers a snapshot to the node.
func (app *Accumulator) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
// https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356
return &abci.ResponseOfferSnapshot{}, nil

// if req.Snapshot == nil {
// return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
// }
// if req.Snapshot.Format != snapshot.Version1 {
// return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}
// }
// if req.Snapshot.Chunks != 1 {
// return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
// }

// return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}
func (*Accumulator) getSnapshotChunk(snapshot *os.File, chunk uint32) ([]byte, error) {
panic("TODO")
}

// ApplySnapshotChunk applies a snapshot to the node.
func (app *Accumulator) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
// https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356
return &abci.ResponseApplySnapshotChunk{}, nil

// if req.Index != 0 {
// return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT}
// }

// rd := bytes.NewReader(req.Chunk)
// err := snapshot.FullRestore(app.Database, rd, app.logger, &app.Accumulate.Describe)
// if err != nil {
// app.logger.Error("Failed to restore snapshot", "error", err)
// return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT}
// }

// app.ready = true
// return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}
func (*Accumulator) startSnapshotSync(snapshot *abci.Snapshot, bptHash []byte) (abci.ResponseOfferSnapshot_Result, error) {
// Reference:
// - Cosmos ABCI: https://github.com/cosmos/cosmos-sdk/blob/167b702732620d35cf3c8e786fe828d4f97e5268/baseapp/abci.go#L276
// - Snapshot manager: https://github.com/cosmos/cosmos-sdk/blob/main/store/snapshots/manager.go#L265
panic("TODO")
}

func (app *Accumulator) acceptSnapshotChunk(chunk uint32, data []byte) (abci.ResponseApplySnapshotChunk_Result, error) {
// Reference:
// - Cosmos ABCI: https://github.com/cosmos/cosmos-sdk/blob/167b702732620d35cf3c8e786fe828d4f97e5268/baseapp/abci.go#L325
// - Snapshot manager: https://github.com/cosmos/cosmos-sdk/blob/main/store/snapshots/manager.go#L401
// Optional inputs:
// - Sender
// Optional outputs:
// - Chunks to refetch
// - Senders to reject
panic("TODO")
}

0 comments on commit b00b5e1

Please sign in to comment.