diff --git a/internal/node/abci/snapshot.go b/internal/node/abci/snapshot.go index e5a3b715f..310dc2f42 100644 --- a/internal/node/abci/snapshot.go +++ b/internal/node/abci/snapshot.go @@ -8,25 +8,120 @@ package abci import ( "context" + "fmt" + "io" "os" "path/filepath" abci "github.com/cometbft/cometbft/abci/types" + "gitlab.com/accumulatenetwork/accumulate/exp/ioutil" "gitlab.com/accumulatenetwork/accumulate/internal/core" "gitlab.com/accumulatenetwork/accumulate/internal/database/snapshot" + sv1 "gitlab.com/accumulatenetwork/accumulate/internal/database/snapshot" "gitlab.com/accumulatenetwork/accumulate/internal/node/config" + sv2 "gitlab.com/accumulatenetwork/accumulate/pkg/database/snapshot" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" ) // ListSnapshots queries the node for available snapshots. func ListSnapshots(cfg *config.Config) ([]*snapshot.Header, error) { snapDir := config.MakeAbsolute(cfg.RootDir, cfg.Accumulate.Snapshots.Directory) - entries, err := os.ReadDir(snapDir) + info, err := listSnapshots(snapDir) + if err != nil { + return nil, err + } + + snapshots := make([]*snapshot.Header, 0, len(info)) + for _, info := range info { + snapshots = append(snapshots, &sv1.Header{ + Version: info.version, + Height: info.height, + RootHash: info.rootHash, + }) + } + return snapshots, nil +} + +func (app *Accumulator) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) { + info, err := listSnapshots(config.MakeAbsolute(app.Config.RootDir, app.Config.Accumulate.Snapshots.Directory)) + if err != nil { + return nil, err + } + + resp := new(abci.ResponseListSnapshots) + resp.Snapshots = make([]*abci.Snapshot, 0, len(info)) + for _, info := range info { + resp.Snapshots = append(resp.Snapshots, &abci.Snapshot{ + Height: info.height, + Format: uint32(info.version), + Chunks: uint32(info.chunks), + Hash: info.rootHash[:], + }) + } + return resp, nil +} + +// LoadSnapshotChunk queries the node for the body of a snapshot. +func (app *Accumulator) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) { + snapDir := config.MakeAbsolute(app.RootDir, app.Accumulate.Snapshots.Directory) + f, err := os.Open(filepath.Join(snapDir, fmt.Sprintf(core.SnapshotMajorFormat, req.Height))) + if err != nil { + return nil, err + } + defer f.Close() + + chunk, err := app.getSnapshotChunk(f, req.Chunk) + if err != nil { + return nil, err + } + + return &abci.ResponseLoadSnapshotChunk{Chunk: chunk}, nil +} + +// OfferSnapshot offers a snapshot to the node. +func (app *Accumulator) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) { + if req.Snapshot == nil { + return &abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil + } + if req.Snapshot.Format != sv2.Version2 { + return &abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil + } + + r, err := app.startSnapshotSync(req.Snapshot, req.AppHash) + if err != nil { + return nil, err + } + return &abci.ResponseOfferSnapshot{Result: r}, nil +} + +// ApplySnapshotChunk applies a snapshot to the node. +func (app *Accumulator) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) { + r, err := app.acceptSnapshotChunk(req.Index, req.Chunk) + if err != nil { + return nil, err + } + return &abci.ResponseApplySnapshotChunk{Result: r}, nil +} + +func (app *Accumulator) finishedLoadingSnapshot() error { + var assembled ioutil.SectionReader + return snapshot.FullRestore(app.Database, assembled, app.logger, app.Accumulate.PartitionUrl()) +} + +type snapshotInfo struct { + version uint64 + height uint64 + rootHash [32]byte + chunks int +} + +func listSnapshots(dir string) ([]*snapshotInfo, error) { + entries, err := os.ReadDir(dir) if err != nil { return nil, errors.UnknownError.WithFormat("load snapshot: %w", err) } - snapshots := make([]*snapshot.Header, 0, len(entries)) + snapshots := make([]*snapshotInfo, 0, len(entries)) for _, entry := range entries { if entry.IsDir() { continue @@ -35,107 +130,84 @@ func ListSnapshots(cfg *config.Config) ([]*snapshot.Header, error) { continue } - filename := filepath.Join(snapDir, entry.Name()) + filename := filepath.Join(dir, entry.Name()) f, err := os.Open(filename) if err != nil { return nil, errors.UnknownError.WithFormat("load snapshot %s: %w", entry.Name(), err) } defer f.Close() - header, _, err := snapshot.Open(f) + ver, err := sv2.GetVersion(f) if err != nil { - return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", entry.Name(), err) + return nil, err + } + _, err = f.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + + var info *snapshotInfo + switch ver { + case sv1.Version1: + info, err = snapshotInfoV1(f) + case sv2.Version2: + info, err = snapshotInfoV2(f) + default: + return nil, errors.InternalError.WithFormat("unsupported snapshot version %d", ver) } - snapshots = append(snapshots, header) + snapshots = append(snapshots, info) } return snapshots, nil } -func (app *Accumulator) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) { - // https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356 - return &abci.ResponseListSnapshots{}, nil - - // entries, err := ListSnapshots(app.Config) - // if err != nil { - // app.logger.Error("Failed to list snapshots", "error", err) - // return abci.ResponseListSnapshots{} - // } - - // var resp abci.ResponseListSnapshots - // resp.Snapshots = make([]*abci.Snapshot, 0, len(entries)) - // for _, header := range entries { - // resp.Snapshots = append(resp.Snapshots, &abci.Snapshot{ - // Height: header.Height, - // Format: uint32(header.Version), - // Chunks: 1, - // Hash: header.RootHash[:], - // }) - // } - // return resp +func snapshotInfoV1(f *os.File) (*snapshotInfo, error) { + header, _, err := sv1.Open(f) + if err != nil { + return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", f.Name(), err) + } + + return &snapshotInfo{ + version: header.Version, // + height: header.Height, // + rootHash: header.RootHash, // + chunks: 1, // <--- ***** CHANGE ***** + }, nil } -// LoadSnapshotChunk queries the node for the body of a snapshot. -func (app *Accumulator) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) { - // https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356 - return &abci.ResponseLoadSnapshotChunk{}, nil - - // if req.Format != snapshot.Version1 || req.Chunk != 0 { - // app.logger.Error("Invalid snapshot request", "height", req.Height, "format", req.Format, "chunk", req.Chunk) - // return abci.ResponseLoadSnapshotChunk{} - // } - - // snapDir := config.MakeAbsolute(app.RootDir, app.Accumulate.Snapshots.Directory) - // f, err := os.Open(filepath.Join(snapDir, fmt.Sprintf(core.SnapshotMajorFormat, req.Height))) - // if err != nil { - // app.logger.Error("Failed to load snapshot", "error", err, "height", req.Height, "format", req.Format, "chunk", req.Chunk) - // return abci.ResponseLoadSnapshotChunk{} - // } - // defer f.Close() - - // data, err := io.ReadAll(f) - // if err != nil { - // app.logger.Error("Failed to load snapshot", "error", err, "height", req.Height, "format", req.Format, "chunk", req.Chunk) - // return abci.ResponseLoadSnapshotChunk{} - // } - - // return abci.ResponseLoadSnapshotChunk{Chunk: data} +func snapshotInfoV2(f *os.File) (*snapshotInfo, error) { + s, err := sv2.Open(f) + if err != nil { + return nil, errors.UnknownError.WithFormat("open snapshot %s: %w", f.Name(), err) + } + + return &snapshotInfo{ + version: s.Header.Version, // + height: s.Header.SystemLedger.Index, // + rootHash: s.Header.RootHash, // + chunks: 1, // <--- ***** CHANGE ***** + }, nil } -// OfferSnapshot offers a snapshot to the node. -func (app *Accumulator) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) { - // https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356 - return &abci.ResponseOfferSnapshot{}, nil - - // if req.Snapshot == nil { - // return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT} - // } - // if req.Snapshot.Format != snapshot.Version1 { - // return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT} - // } - // if req.Snapshot.Chunks != 1 { - // return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT} - // } - - // return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT} +func (*Accumulator) getSnapshotChunk(snapshot *os.File, chunk uint32) ([]byte, error) { + panic("TODO") } -// ApplySnapshotChunk applies a snapshot to the node. -func (app *Accumulator) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) { - // https://gitlab.com/accumulatenetwork/accumulate/-/issues/3356 - return &abci.ResponseApplySnapshotChunk{}, nil - - // if req.Index != 0 { - // return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT} - // } - - // rd := bytes.NewReader(req.Chunk) - // err := snapshot.FullRestore(app.Database, rd, app.logger, &app.Accumulate.Describe) - // if err != nil { - // app.logger.Error("Failed to restore snapshot", "error", err) - // return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT} - // } - - // app.ready = true - // return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT} +func (*Accumulator) startSnapshotSync(snapshot *abci.Snapshot, bptHash []byte) (abci.ResponseOfferSnapshot_Result, error) { + // Reference: + // - Cosmos ABCI: https://github.com/cosmos/cosmos-sdk/blob/167b702732620d35cf3c8e786fe828d4f97e5268/baseapp/abci.go#L276 + // - Snapshot manager: https://github.com/cosmos/cosmos-sdk/blob/main/store/snapshots/manager.go#L265 + panic("TODO") +} + +func (app *Accumulator) acceptSnapshotChunk(chunk uint32, data []byte) (abci.ResponseApplySnapshotChunk_Result, error) { + // Reference: + // - Cosmos ABCI: https://github.com/cosmos/cosmos-sdk/blob/167b702732620d35cf3c8e786fe828d4f97e5268/baseapp/abci.go#L325 + // - Snapshot manager: https://github.com/cosmos/cosmos-sdk/blob/main/store/snapshots/manager.go#L401 + // Optional inputs: + // - Sender + // Optional outputs: + // - Chunks to refetch + // - Senders to reject + panic("TODO") }