diff --git a/jprov/archive/archive.go b/jprov/archive/archive.go index d89723e..12172a9 100644 --- a/jprov/archive/archive.go +++ b/jprov/archive/archive.go @@ -2,6 +2,7 @@ package archive import ( "errors" + "fmt" "io" "os" "path/filepath" @@ -32,11 +33,161 @@ type Archive interface { // Returns error if the tree is not found RetrieveTree(fid string) (tree *merkletree.MerkleTree, err error) // Delete deletes archive from disk. This include the file and merkle tree. - Delete(fid string) + Delete(fid string) error } var _ Archive = &SingleCellArchive{} +var _ Archive = &HybridCellArchive{} + +type HybridCellArchive struct { + rootDir string + pathFactory *SingleCellPathFactory + legacyPathFactory *MultiCellPathFactory +} + +func NewHybridCellArchive(rootDir string) *HybridCellArchive { + return &HybridCellArchive{ + rootDir: rootDir, + pathFactory: NewSingleCellPathFactory(rootDir), + legacyPathFactory: NewMultiCellPathFactory(rootDir), + } +} + +func (f *HybridCellArchive) WriteFileToDisk(data io.Reader, fid string) (written int64, err error) { + path := f.pathFactory.FilePath(fid) + err = os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return + } + + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, FilePerm) + if err != nil { + return + } + defer func() { + err = errors.Join(err, file.Close()) + }() + + written, err = io.Copy(file, data) + return +} + +func (h *HybridCellArchive) getLegacyPiece(file *os.File, blockSize int64) ([]byte, error) { + block := make([]byte, blockSize) + + _, err := file.Read(block) + if err != nil { + return nil, err + } + + return block, nil +} + +func (h *HybridCellArchive) GetPiece(fid string, index, blockSize int64) (block []byte, err error) { + file, err := os.Open(filepath.Join(h.rootDir, "storage", fid, fmt.Sprintf("%d.jkl", index))) + if err == nil { + // legacy file system + defer func() { + err = errors.Join(err, file.Close()) + }() + return h.getLegacyPiece(file, blockSize) + } else if !os.IsNotExist(err) { // unkown error + return nil, err + } + + file, err = os.Open(h.pathFactory.FilePath(fid)) + if err != nil { + return nil, err + } + defer func() { + err = errors.Join(err, file.Close()) + }() + + block = make([]byte, blockSize) + n, err := file.ReadAt(block, index*blockSize) + // ignoring io.EOF with n > 0 because the file size is not always n * blockSize + if (err != nil && err != io.EOF) || (err == io.EOF && n == 0) { + return nil, err + } + + block = block[:n] + return block, nil +} + +func (h *HybridCellArchive) RetrieveFile(fid string) (data io.ReadSeekCloser, err error) { + data, err = os.Open(h.pathFactory.FilePath(fid)) + return +} + +func (h *HybridCellArchive) FileExist(fid string) bool { + _, err := os.Stat(h.pathFactory.FilePath(fid)) + return errors.Is(err, os.ErrNotExist) +} + +func (h *HybridCellArchive) WriteTreeToDisk(fid string, tree *merkletree.MerkleTree) (err error) { + path := h.pathFactory.TreePath(fid) + err = os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return + } + + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, FilePerm) + if err != nil { + return + } + defer func() { + err = errors.Join(err, file.Close()) + }() + + data, err := tree.Export() + if err != nil { + return + } + + _, err = file.Write(data) + return +} + +func (h *HybridCellArchive) retrieveLegacyTree(fid string) (*merkletree.MerkleTree, error) { + rawTree, err := os.ReadFile(h.legacyPathFactory.TreePath(fid)) + if err != nil { + return nil, err + } + + tree, err := merkletree.ImportMerkleTree(rawTree, sha3.New512()) + return tree, err +} + +func (h *HybridCellArchive) RetrieveTree(fid string) (tree *merkletree.MerkleTree, err error) { + tree, err = h.retrieveLegacyTree(fid) // attempt to get legacy + if err == nil { + return tree, nil + } else if !os.IsNotExist(err) { + return nil, err + } + + rawTree, err := os.ReadFile(h.pathFactory.TreePath(fid)) + if err != nil { + return + } + + tree, err = merkletree.ImportMerkleTree(rawTree, sha3.New512()) + return +} + +func (h *HybridCellArchive) Delete(fid string) error { + // since the file and merkle tree is saved together in an isolated directory, + // just delete the whole directory + err := os.RemoveAll(h.pathFactory.FileDir(fid)) + if err != nil { + // filePath factory might be broken + // read os.RemoveAll error conditions at std doc + return err + } + return nil +} + type SingleCellArchive struct { rootDir string pathFactory *SingleCellPathFactory @@ -132,13 +283,8 @@ func (f *SingleCellArchive) RetrieveTree(fid string) (tree *merkletree.MerkleTre return } -func (f *SingleCellArchive) Delete(fid string) { +func (f *SingleCellArchive) Delete(fid string) error { // since the file and merkle tree is saved together in an isolated directory, // just delete the whole directory - err := os.RemoveAll(f.pathFactory.FileDir(fid)) - if err != nil { - // filePath factory might be broken - // read os.RemoveAll error conditions at std doc - panic(err) - } + return os.RemoveAll(f.pathFactory.FileDir(fid)) } diff --git a/jprov/archive/archive_test.go b/jprov/archive/archive_test.go index 1e3c34a..08d8cf3 100644 --- a/jprov/archive/archive_test.go +++ b/jprov/archive/archive_test.go @@ -2,10 +2,16 @@ package archive_test import ( "bytes" + "encoding/hex" + "errors" "os" + "path/filepath" "testing" "github.com/JackalLabs/jackal-provider/jprov/archive" + + merkletree "github.com/wealdtech/go-merkletree" + "github.com/wealdtech/go-merkletree/sha3" ) func TestGetPiece(t *testing.T) { @@ -54,3 +60,223 @@ func TestGetPiece(t *testing.T) { t.Errorf("GetPiece 1, 8: have %q, want %q", string(resData), "orld\n") } } + +func prepareTestDir(rootDir string) (string, error) { + tmpDir, err := os.MkdirTemp(rootDir, "") + if err != nil { + return "", err + } + err = os.Mkdir(filepath.Join(tmpDir, "storage"), 0o755) + if err != nil { + err = errors.Join(err, os.RemoveAll(tmpDir)) + return "", err + } + + return tmpDir, nil +} + +func TestHybridCellArchiveGetLegacyPiece(t *testing.T) { + tmpRootDir, err := prepareTestDir(".") + if err != nil { + t.Errorf("failed to create temporary directory for testing: %v", err) + } + defer func() { + err = os.RemoveAll(tmpRootDir) + if err != nil { + t.Errorf("failed to delete testing directory: %v", err) + } + }() + + storageDir := filepath.Join(tmpRootDir, "storage") + + fid0Dir := filepath.Join(storageDir, "fid0") + err = os.Mkdir(fid0Dir, 0o755) + if err != nil { + t.Errorf("failed to make directory for fid0: %v", err) + } + + zeroDotJkl, err := os.Create(filepath.Join(fid0Dir, "0.jkl")) + if err != nil { + t.Errorf("failed to create fid0 0.jkl file: %v", err) + } + defer func() { + err = zeroDotJkl.Close() + if err != nil { + t.Errorf("failed to close fid0 0.jkl: %v", err) + } + }() + + contents := []byte("hello world!\n") + _, err = zeroDotJkl.Write(contents) + if err != nil { + t.Errorf("failed to write test contents at fid0 0.jkl: %v", err) + } + + // fid0.jkl might exist in the same dir if migrations is also happening + fid0DotJkl, err := os.Create(filepath.Join(fid0Dir, "fid0.jkl")) + if err != nil { + t.Errorf("failed to create fid0 0.jkl file: %v", err) + } + defer func() { + err = fid0DotJkl.Close() + if err != nil { + t.Errorf("failed to close fid0 0.jkl: %v", err) + } + }() + + _, err = fid0DotJkl.Write(contents) + if err != nil { + t.Errorf("failed to write test contents at fid0 0.jkl: %v", err) + } + + hybrid := archive.NewHybridCellArchive(tmpRootDir) + + data, err := hybrid.GetPiece("fid0", 0, int64(len(contents))) + if err != nil { + t.Errorf("GetPiece fid0, 0, %d: unexpected error %v", len(contents), err) + } + + if string(data) != string(contents) { + t.Errorf( + "GetPiece fid0, 0, %d: have %q, want %q", + len(contents), + string(data), + string(contents), + ) + } +} + +func TestHybridCellArchiveGetSingleCellPiece(t *testing.T) { + tmpRootDir, err := prepareTestDir(".") + if err != nil { + t.Errorf("failed to create temporary directory for testing: %v", err) + } + defer func() { + err = os.RemoveAll(tmpRootDir) + if err != nil { + t.Errorf("failed to delete testing directory: %v", err) + } + }() + + storageDir := filepath.Join(tmpRootDir, "storage") + + fid0Dir := filepath.Join(storageDir, "fid0") + err = os.Mkdir(fid0Dir, 0o755) + if err != nil { + t.Errorf("failed to make directory for fid0: %v", err) + } + + fid0, err := os.Create(filepath.Join(fid0Dir, "fid0.jkl")) + if err != nil { + t.Errorf("failed to create fid0 0.jkl file: %v", err) + } + defer func() { + err = fid0.Close() + if err != nil { + t.Errorf("failed to close fid0 0.jkl: %v", err) + } + }() + + contents := []byte("hello world!\n") + _, err = fid0.Write(contents) + if err != nil { + t.Errorf("failed to write test contents at fid0 0.jkl: %v", err) + } + + hybrid := archive.NewHybridCellArchive(tmpRootDir) + + data, err := hybrid.GetPiece("fid0", 0, int64(len(contents))) + if err != nil { + t.Errorf("GetPiece fid0, 0, %d: unexpected error %v", len(contents), err) + } + + if string(data) != string(contents) { + t.Errorf( + "GetPiece fid0, 0, %d: have %q, want %q", + len(contents), + string(data), + string(contents), + ) + } +} + +func TestHybridCellArchiveGetLegacyTree(t *testing.T) { + tmpRootDir, err := prepareTestDir(".") + if err != nil { + t.Errorf("failed to create temporary directory for testing: %v", err) + } + defer func() { + err = os.RemoveAll(tmpRootDir) + if err != nil { + t.Errorf("failed to delete testing directory: %v", err) + } + }() + + storageDir := filepath.Join(tmpRootDir, "storage") + + fid0Dir := filepath.Join(storageDir, "fid0") + err = os.Mkdir(fid0Dir, 0o755) + if err != nil { + t.Errorf("failed to make directory for fid0: %v", err) + } + + singleCellTree, err := os.Create(filepath.Join(fid0Dir, "fid0.tree")) + if err != nil { + t.Errorf("failed to create fid0 0.jkl file: %v", err) + } + defer func() { + err = singleCellTree.Close() + if err != nil { + t.Errorf("failed to close fid0 0.jkl: %v", err) + } + }() + + data := [][]byte{[]byte("hello, world\n")} + tree, err := merkletree.NewUsing(data, sha3.New512(), false) + if err != nil { + t.Errorf("failed to create merkletree: %v", err) + } + export, err := tree.Export() + if err != nil { + t.Errorf("failed to export merkletree: %v", err) + } + + _, err = singleCellTree.Write(export) + if err != nil { + t.Errorf("failed to write test contents at fid0 0.tree: %v", err) + } + + // legacy tree location + fid0DotJkl, err := os.Create(filepath.Join(storageDir, "fid0.tree")) + if err != nil { + t.Errorf("failed to create fid0.tree: %v", err) + } + defer func() { + err = fid0DotJkl.Close() + if err != nil { + t.Errorf("failed to close fid0.tree: %v", err) + } + }() + + _, err = fid0DotJkl.Write(export) + if err != nil { + t.Errorf("failed to write test contents at fid0 0.jkl: %v", err) + } + + hybrid := archive.NewHybridCellArchive(tmpRootDir) + + merkletree, err := hybrid.RetrieveTree("fid0") + if err != nil { + t.Errorf("RetrieveTree fid0: unexpected error: %v", err) + } + + expected := tree.Root() + res := merkletree.Root() + + want := hex.EncodeToString(expected) + have := hex.EncodeToString(res) + if want != have { + t.Errorf( + "RetrieveTree fid0: have %q, want %q", want, have) + } +} diff --git a/jprov/archive/path_factory.go b/jprov/archive/path_factory.go index d59ccf5..a4ab8c8 100644 --- a/jprov/archive/path_factory.go +++ b/jprov/archive/path_factory.go @@ -1,7 +1,10 @@ package archive import ( + "fmt" + "os" "path/filepath" + "strconv" "strings" ) @@ -54,3 +57,78 @@ func (s *SingleCellPathFactory) treeName(fid string) (name string) { _, _ = b.WriteString(s.treeExt) return b.String() } + +type MultiCellPathFactory struct { + rootDir string + fileExt string + treeExt string +} + +func NewMultiCellPathFactory(rootDir string) *MultiCellPathFactory { + return &MultiCellPathFactory{rootDir: rootDir, fileExt: ".jkl", treeExt: ".tree"} +} + +func (m *MultiCellPathFactory) PiecePath(fid string, index int) (path string) { + return filepath.Join(m.rootDir, "storage", fmt.Sprintf("%d.jkl", index)) +} + +// Reads directory that stores fid, returns the last piece of fid. +// It only accounts for file names with .jkl format where index is an int. +// It returns os.NotExists error when there are no files with such format. +func (m *MultiCellPathFactory) LastPiece(fid string) (int, error) { + entries, err := os.ReadDir(m.FileDir(fid)) + if err != nil { + return 0, err + } + // work backwards since the entries are in sorted order + i := len(entries) - 1 + for ; i > 0; i++ { + if entries[i].IsDir() { + continue + } + ext := filepath.Ext(entries[i].Name()) + if ext != m.fileExt { + continue + } + subStr, _ := strings.CutSuffix(entries[i].Name(), m.fileExt) + index, err := strconv.ParseInt(subStr, 10, 0) + if err != nil { + continue + } + return int(index), nil + } + return 0, os.ErrNotExist +} + +// returns the last piece of the file +func (m *MultiCellPathFactory) FilePath(fid string) (path string) { + return filepath.Join(m.FileDir(fid), m.fileName(fid)) +} + +func (m *MultiCellPathFactory) FileDir(fid string) (dir string) { + return filepath.Join(m.rootDir, "storage", string(fid)) +} + +func (m *MultiCellPathFactory) fileName(fid string) (name string) { + var b strings.Builder + // ignore length of string and nil error + _, _ = b.WriteString(string(fid)) + _, _ = b.WriteString(m.fileExt) + + return b.String() +} + +func (m *MultiCellPathFactory) TreePath(fid string) (path string) { + return filepath.Join(m.rootDir, "storage", m.treeName(fid)) +} + +func (m *MultiCellPathFactory) TreeDir(fid string) (dir string) { + return filepath.Join(m.rootDir, "storage") +} + +func (m *MultiCellPathFactory) treeName(fid string) (name string) { + var b strings.Builder + _, _ = b.WriteString(string(fid)) + _, _ = b.WriteString(m.treeExt) + return b.String() +} diff --git a/jprov/jprovd/provider_commands.go b/jprov/jprovd/provider_commands.go index fed46cb..771edc2 100644 --- a/jprov/jprovd/provider_commands.go +++ b/jprov/jprovd/provider_commands.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "syscall" "github.com/JackalLabs/blanket/blanket" "github.com/cosmos/cosmos-sdk/version" @@ -221,6 +222,74 @@ func ResetCommand() *cobra.Command { return cmd } +func PruneCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "prune", + Short: "Prune files that are no longer on contract according to chain data", + Long: "Prune files that are no longer on contract according to chain data", + Args: cobra.ExactArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + buf := bufio.NewReader(cmd.InOrStdin()) + yes, err := input.GetConfirmation("Are you sure you want to prune expired files?", buf, cmd.ErrOrStderr()) + if err != nil { + return err + } + + if !yes { + return nil + } + + clientCtx := client.GetClientContextFromCmd(cmd) + + dbPath := utils.GetArchiveDBPath(clientCtx) + archivedb, err := archive.NewDoubleRefArchiveDB(dbPath) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, archivedb.Close()) + }() + + downtimedbPath := utils.GetDowntimeDBPath(clientCtx) + downtimedb, err := archive.NewDowntimeDB(downtimedbPath) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, downtimedb.Close()) + }() + + fs, err := server.NewFileServer(cmd, archivedb, downtimedb) + if err != nil { + return err + } + + err = fs.Init() + if err != nil { + return err + } + + err = fs.RecollectActiveDeals() + if err != nil { + return err + } + + interval, err := cmd.Flags().GetUint16(types.FlagInterval) + if err != nil { + interval = 0 + } + + fmt.Println("starting proof server") + go fs.StartProofServer(interval) + + return fs.PruneExpiredFiles() + }, + } + + cmd.Flags().Int64(types.FlagChunkSize, types.DefaultChunkSize, "The size of a single file chunk.") + return cmd +} + func MigrateCommand() *cobra.Command { cmd := &cobra.Command{ Use: "migrate", @@ -228,22 +297,63 @@ func MigrateCommand() *cobra.Command { Long: `Migrate old file system. This will glue all blocks together into one file per fids stored in your machine`, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, args []string) error { - clientCtx, err := client.GetClientTxContext(cmd) + buf := bufio.NewReader(cmd.InOrStdin()) + yes, err := input.GetConfirmation("Are you sure you want to migrate from old file system?", buf, cmd.ErrOrStderr()) if err != nil { return err } - buf := bufio.NewReader(cmd.InOrStdin()) + if !yes { + return nil + } - yes, err := input.GetConfirmation("Are you sure you want to migrate from old file system?", buf, cmd.ErrOrStderr()) + defer func() { + // required to stop proof server + err = errors.Join(err, syscall.Kill(os.Getpid(), syscall.SIGTERM)) + }() + + clientCtx := client.GetClientContextFromCmd(cmd) + dbPath := utils.GetArchiveDBPath(clientCtx) + archivedb, err := archive.NewDoubleRefArchiveDB(dbPath) if err != nil { return err } + defer func() { + err = errors.Join(err, archivedb.Close()) + }() - if !yes { - return nil + downtimedbPath := utils.GetDowntimeDBPath(clientCtx) + downtimedb, err := archive.NewDowntimeDB(downtimedbPath) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, downtimedb.Close()) + }() + + fs, err := server.NewFileServer(cmd, archivedb, downtimedb) + if err != nil { + return err + } + + err = fs.Init() + if err != nil { + return err + } + + err = fs.RecollectActiveDeals() + if err != nil { + return err } + interval, err := cmd.Flags().GetUint16(types.FlagInterval) + if err != nil { + interval = 0 + } + + fmt.Println("starting proof server") + go fs.StartProofServer(interval) + chunkSize, err := cmd.Flags().GetInt64(types.FlagChunkSize) if err != nil { err = errors.Join(errors.New("Migrate: cannot migrate without chunk size"), err) @@ -251,12 +361,25 @@ func MigrateCommand() *cobra.Command { } utils.Migrate(clientCtx, chunkSize) - - return nil + return err }, } - + AddTxFlagsToCmd(cmd) + cmd.Flags().Int(types.FlagPort, types.DefaultPort, "Port to host the server on.") + cmd.Flags().String(types.VersionFlag, "", "The value exposed by the version api to allow for custom deployments.") + cmd.Flags().Bool(types.HaltStraysFlag, false, "Debug flag to stop picking up strays.") + cmd.Flags().Uint16(types.FlagInterval, types.DefaultInterval, "The interval in seconds for which to check proofs. Must be >=1800 if you need a custom interval") + cmd.Flags().Uint(types.FlagThreads, types.DefaultThreads, "The amount of stray threads.") + cmd.Flags().Int(types.FlagMaxMisses, types.DefaultMaxMisses, "The amount of intervals a provider can miss their proofs before removing a file.") cmd.Flags().Int64(types.FlagChunkSize, types.DefaultChunkSize, "The size of a single file chunk.") + cmd.Flags().Int64(types.FlagStrayInterval, types.DefaultStrayInterval, "The interval in seconds to check for new strays.") + cmd.Flags().Int(types.FlagMessageSize, types.DefaultMessageSize, "The max size of all messages in bytes to submit to the chain at one time.") + cmd.Flags().Int(types.FlagGasCap, types.DefaultGasCap, "The maximum gas to be used per message.") + cmd.Flags().Int64(types.FlagQueueInterval, types.DefaultQueueInterval, "The time, in seconds, between running a queue loop.") + cmd.Flags().String(types.FlagProviderName, "A Storage Provider", "The name to identify this provider in block explorers.") + cmd.Flags().Int64(types.FlagSleep, types.DefaultSleep, "The time, in milliseconds, before adding another proof msg to the queue.") + cmd.Flags().Bool(types.FlagDoReport, types.DefaultDoReport, "Should this provider report deals (uses gas).") + return cmd } diff --git a/jprov/jprovd/root.go b/jprov/jprovd/root.go index 5348f28..fc81a43 100644 --- a/jprov/jprovd/root.go +++ b/jprov/jprovd/root.go @@ -81,6 +81,7 @@ func NewRootCmd() *cobra.Command { NetworkCmd(), BlanketCmd(), CmdShutdownProvider(), + PruneCommand(), ) return rootCmd diff --git a/jprov/server/file_server.go b/jprov/server/file_server.go index 37e86a6..76f5c31 100644 --- a/jprov/server/file_server.go +++ b/jprov/server/file_server.go @@ -213,10 +213,10 @@ func (f *FileServer) MakeContract(fid string, sender string, wg *sync.WaitGroup, return k, nil } -func (f *FileServer) Init() (router *httprouter.Router, err error) { +func (f *FileServer) Init() error { address, err := crypto.GetAddress(f.cosmosCtx) if err != nil { - return + return err } request := &storageTypes.QueryProviderRequest{ @@ -226,18 +226,12 @@ func (f *FileServer) Init() (router *httprouter.Router, err error) { response, err := f.queryClient.Providers(context.Background(), request) if err != nil { err = fmt.Errorf("Provider not initialized on the blockchain, or connection to the RPC node has been lost. Please make sure your RPC node is available then run `jprovd init` to fix this.") - return + return err } f.provider = response.Providers - router = httprouter.New() - - f.GetRoutes(router) - f.PostRoutes(router) - PProfRoutes(router) - - return + return err } func (f *FileServer) RecollectActiveDeals() error { @@ -259,7 +253,7 @@ func (f *FileServer) RecollectActiveDeals() error { } } - f.logger.Info("recollected deals: ", count) + f.logger.Info(fmt.Sprintf("recollected deals: %d\n", count)) return nil } @@ -272,11 +266,11 @@ func (f *FileServer) StartFileServer(cmd *cobra.Command) { log.Fatalf("Failed to close db: %s", err) } }() - router, err := f.Init() - if err != nil { - fmt.Println(err) - return - } + router := httprouter.New() + + f.GetRoutes(router) + f.PostRoutes(router) + PProfRoutes(router) handler := cors.Default().Handler(router) providerName, err := cmd.Flags().GetString(types.FlagProviderName) diff --git a/jprov/server/proofs.go b/jprov/server/proofs.go index 4801d0d..8db4010 100644 --- a/jprov/server/proofs.go +++ b/jprov/server/proofs.go @@ -12,9 +12,11 @@ import ( "net/http" "net/url" "os" + "os/signal" "strconv" "strings" "sync" + "syscall" "time" "github.com/JackalLabs/jackal-provider/jprov/archive" @@ -308,7 +310,10 @@ func (f *FileServer) Purge(cid string) error { } if purge { - f.archive.Delete(fid) + err := f.archive.Delete(fid) + if err != nil { + return err + } } return nil @@ -399,8 +404,7 @@ func (f *FileServer) handleContracts() error { continue } - f.logger.Info(fmt.Sprintf("FID: %s", string(fid))) - f.logger.Info(fmt.Sprintf("CID: %s", cid)) + f.logger.Info(fmt.Sprintf("CID: %s FID: %s", cid, fid)) switch state := f.QueryContractState(cid); state { case verified: @@ -441,22 +445,31 @@ func (f *FileServer) startShift() error { } func (f *FileServer) StartProofServer(interval uint16) { + // catch interrupt or termination sig and stop proving + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + for { - start := time.Now() - err := f.startShift() - if err != nil { - f.logger.Error(err.Error()) - } + select { + case <-sigChan: + return + default: + start := time.Now() + err := f.startShift() + if err != nil { + f.logger.Error(err.Error()) + } - end := time.Since(start) - if end.Seconds() > 120 { - f.logger.Error(fmt.Sprintf("proof took %d", end.Nanoseconds())) - } + end := time.Since(start) + if end.Seconds() > 120 { + f.logger.Error(fmt.Sprintf("proof took %d", end.Nanoseconds())) + } - tm := time.Duration(interval) * time.Second + tm := time.Duration(interval) * time.Second - if tm.Nanoseconds()-end.Nanoseconds() > 0 { - time.Sleep(time.Duration(interval) * time.Second) + if tm.Nanoseconds()-end.Nanoseconds() > 0 { + time.Sleep(time.Duration(interval) * time.Second) + } } } } diff --git a/jprov/server/prune.go b/jprov/server/prune.go new file mode 100644 index 0000000..841c62d --- /dev/null +++ b/jprov/server/prune.go @@ -0,0 +1,80 @@ +package server + +import ( + "errors" + "fmt" + "os" + + "github.com/JackalLabs/jackal-provider/jprov/archive" + "github.com/JackalLabs/jackal-provider/jprov/utils" +) + +func (f *FileServer) allFilesAtStorage() ([]string, error) { + fids := make([]string, 0) + dirs, err := os.ReadDir(utils.GetStorageRootDir(f.cosmosCtx.HomeDir)) + if err != nil { + return nil, err + } + + for _, dir := range dirs { + if dir.IsDir() { + fids = append(fids, dir.Name()) + } + } + + return fids, err +} + +func findOldMerkleTree(homedir, fid string) (bool, error) { + oldTreePath := utils.GetOldTreePath(homedir, fid) + + _, err := os.Stat(oldTreePath) + if err == nil { + return true, nil + } else if errors.Is(err, os.ErrNotExist) { + return false, nil + } + + // we should never reach here because at this point file exist and not exist + // manual investigation required at this point + return false, fmt.Errorf("Error: FindMigratedFile: file exist and not exist at the same time: %s", err.Error()) +} + +func (f *FileServer) purge(fid string) error { + exists, err := findOldMerkleTree(f.cosmosCtx.HomeDir, fid) + if err != nil { + return err + } + + if exists { + if err := os.Remove(utils.GetOldTreePath(f.cosmosCtx.HomeDir, fid)); err != nil { + return err + } + } + + return f.archive.Delete(fid) +} + +func (f *FileServer) PruneExpiredFiles() error { + fids, err := f.allFilesAtStorage() + if err != nil { + return err + } + + count := 0 + for _, fid := range fids { + _, err := f.archivedb.GetContracts(fid) + if errors.Is(err, archive.ErrFidNotFound) { + err := f.purge(fid) + if err != nil { + return err + } + count++ + } else if err != nil { + return err + } + } + + fmt.Printf("pruned %d out of %d files", count, len(fids)) + return err +} diff --git a/jprov/utils/migrate.go b/jprov/utils/migrate.go index f056b17..0ada8b9 100644 --- a/jprov/utils/migrate.go +++ b/jprov/utils/migrate.go @@ -207,7 +207,7 @@ func cleanOld(homeDir, fid string) error { // DiscoverFids reads all directory entry of the storage and returns fids func DiscoverFids(homeDir string) (fids []string, err error) { - dirs, err := os.ReadDir(getStorageRootDir(homeDir)) + dirs, err := os.ReadDir(GetStorageRootDir(homeDir)) if err != nil { return } diff --git a/jprov/utils/utils.go b/jprov/utils/utils.go index 55ac3be..0df3a18 100644 --- a/jprov/utils/utils.go +++ b/jprov/utils/utils.go @@ -33,7 +33,7 @@ func MakeDowntimeKey(cid string) []byte { return []byte(fmt.Sprintf("%s%s", DowntimeKey, cid)) } -func getStorageRootDir(homeDir string) string { +func GetStorageRootDir(homeDir string) string { return filepath.Join(homeDir, "storage") } @@ -42,7 +42,7 @@ func GetContentsFileName(fid string) string { } func GetFidDir(homeDir, fid string) string { - return filepath.Join(getStorageRootDir(homeDir), fid) + return filepath.Join(GetStorageRootDir(homeDir), fid) } // GetContentsPath returns file path for the file that stores the user uploaded contents diff --git a/scripts/test-migration.sh b/scripts/test-migration.sh index 6be0dd7..b91a392 100755 --- a/scripts/test-migration.sh +++ b/scripts/test-migration.sh @@ -91,18 +91,18 @@ sleep 5 start_provider 54f86a701648e8324e920f9592c21cc591b244ae46eac935d45fe962bba1102c \ jkl1xclg3utp4yuvaxa54r39xzrudc988s82ykve3f 0 -start_provider a29c5f0033606d1ac47db6a3327bc13a6b0c426dbfe5c15b2fcd7334b4165033 \ - jkl1tcveayn80pe3d5wallj9kev3rfefctsmrqf6ks 1 -start_provider a490cb438024cddca16470771fb9a21938c4cf61176a46005c6a7b25ee25a649 \ - jkl1eg3gm3e3k4dypvvme26ejmajnyvtgwwlaaeu2y 2 -start_provider 6c8a948c347079706e404ab48afc5f03203556e34ea921f3b132f2b2e9bcc87d \ - jkl1ga0348r8zhn8k4xy3fagwvkwzvyh5lynxr5kak 3 -start_provider 8144389a23c6535e276068ff9043b2b6ff95aa3c103c35486c8f2d2363606fd5 \ - jkl18encuf0esmxv3pxqjqvn0u4tgd6yzuc8urzlp0 4 -start_provider 0e019088a0fafa8f77cb5c0d0f6cb6b63a0015f20d2450480cbcdee44d170aab \ - jkl1sqt9v0zwwx362szrek7pr3lpq29aygw06hgyza 5 -start_provider adf5a86ac54146b172c20b865c548e900c51439c3723af14aeab668ccd2b8ecf \ - jkl1yu099xns2qpslvyrymxq3hwrqhevs7qxksvu8p 6 +#start_provider a29c5f0033606d1ac47db6a3327bc13a6b0c426dbfe5c15b2fcd7334b4165033 \ +# jkl1tcveayn80pe3d5wallj9kev3rfefctsmrqf6ks 1 +#start_provider a490cb438024cddca16470771fb9a21938c4cf61176a46005c6a7b25ee25a649 \ +# jkl1eg3gm3e3k4dypvvme26ejmajnyvtgwwlaaeu2y 2 +#start_provider 6c8a948c347079706e404ab48afc5f03203556e34ea921f3b132f2b2e9bcc87d \ +# jkl1ga0348r8zhn8k4xy3fagwvkwzvyh5lynxr5kak 3 +#start_provider 8144389a23c6535e276068ff9043b2b6ff95aa3c103c35486c8f2d2363606fd5 \ +# jkl18encuf0esmxv3pxqjqvn0u4tgd6yzuc8urzlp0 4 +#start_provider 0e019088a0fafa8f77cb5c0d0f6cb6b63a0015f20d2450480cbcdee44d170aab \ +# jkl1sqt9v0zwwx362szrek7pr3lpq29aygw06hgyza 5 +#start_provider adf5a86ac54146b172c20b865c548e900c51439c3723af14aeab668ccd2b8ecf \ +# jkl1yu099xns2qpslvyrymxq3hwrqhevs7qxksvu8p 6 echo "provider started!!!" sleep 30 @@ -111,16 +111,15 @@ canined tx storage buy-storage jkl10k05lmc88q5ft3lm00q30qkd9x6654h3lejnct 720h 3 sleep 5 upload_file ./scripts/dummy_data/1.png 0 -upload_file ./scripts/dummy_data/2.png 1 -upload_file ./scripts/dummy_data/3.png 6 -#upload_file ./scripts/dummy_data/4.png 0 -#upload_file ./scripts/dummy_data/5.svg 0 -#upload_file ./scripts/dummy_data/6.wav 0 -#upload_file ./scripts/dummy_data/test.txt 0 +#upload_file ./scripts/dummy_data/2.png 1 +#upload_file ./scripts/dummy_data/3.png 6 +upload_file ./scripts/dummy_data/4.png 0 +upload_file ./scripts/dummy_data/5.svg 0 +upload_file ./scripts/dummy_data/6.wav 0 +upload_file ./scripts/dummy_data/test.txt 0 sleep 10 -#read -rsp $'Press any key to shutdown and upgrade provider...\n' -n1 key echo "shutting down providers..." killall jprovd @@ -130,23 +129,24 @@ sleep 4 echo "upgrading provider..." install_new -migrate_provider 0 -migrate_provider 1 -migrate_provider 2 -migrate_provider 3 -migrate_provider 4 -migrate_provider 5 -migrate_provider 6 - - -sleep 5 - -restart_provider 0 -restart_provider 1 -restart_provider 2 -restart_provider 3 -restart_provider 4 -restart_provider 5 +read -rsp $'Press any key to shutdown and upgrade provider...\n' -n1 key +#migrate_provider 0 +#migrate_provider 1 +#migrate_provider 2 +#migrate_provider 3 +#migrate_provider 4 +#migrate_provider 5 +#migrate_provider 6 +# +# +#sleep 5 +# +#restart_provider 0 +#restart_provider 1 +#restart_provider 2 +#restart_provider 3 +#restart_provider 4 +#restart_provider 5 #restart_provider 6 diff --git a/scripts/test-prune.sh b/scripts/test-prune.sh new file mode 100755 index 0000000..61217f7 --- /dev/null +++ b/scripts/test-prune.sh @@ -0,0 +1,177 @@ +#!/bin/bash + +set -eu + +source ./scripts/setup-chain.sh + +current_dir=$(pwd) +TMP_ROOT="$(dirname $(pwd))/_build" +mkdir -p "${TMP_ROOT}" + +TMP_BUILD="${TMP_ROOT}"/jprovd + +install_old () { + TMP_GOCACHE="${TMP_ROOT}"/gocache + mkdir -p "${TMP_GOCACHE}" + + curl -o v1.1.2.zip --output-dir "${TMP_ROOT}" \ + -L https://github.com/JackalLabs/canine-provider/archive/refs/tags/v1.1.2.zip + + unzip -u -d "${TMP_ROOT}" "${TMP_ROOT}"/v1.1.2.zip + + cd "${TMP_ROOT}"/canine-provider-1.1.2 + + go install -mod=readonly ./jprov/jprovd + + cd "${current_dir}" + jprovd version +} + +install_new () { + make install + jprovd version +} + +start_chain () { + startup + from_scratch + fix_config + + screen -d -m -S "canined" bash -c "canined start --pruning=nothing --minimum-gas-prices=0ujkl" +} + +restart_chain () { + screen -d -m -S "canined" bash -c "canined start --pruning=nothing --minimum-gas-prices=0ujkl" +} + +start_provider () { + screen -d -m -L -Logfile "provider$3.log" \ + -S "provider$3" bash -c "./scripts/start-provider.sh $1 $2 $3" +} + +restart_provider () { + screen -d -m -L -Logfile "provider$1.log" \ + -S "provider$1" bash -c "./scripts/restart-provider.sh $1" +} + +migrate_provider () { + echo y | jprovd migrate --home="$HOME/providers/provider$1" +} + +sender="jkl10k05lmc88q5ft3lm00q30qkd9x6654h3lejnct" + +upload_file () { + resp=$(curl -v -F sender=$sender -F file=@$1 http://localhost:333$2/upload) + + # get cid value from json respnose and strip double quote at front and end + # example: + # {"cid":"jklc1amfnkh8fj8wpvadxp8zjm4h3kgnr0m6qqk5a7dkt0a87pc3yc6nqq4sawe","fid":"jklf12g2ae3tw5397rjehjavcfxzxp4nu9nggpm6lvs6m9wfns0gs3ecqpxt6vq"} + # gets: + # jklc1amfnkh8fj8wpvadxp8zjm4h3kgnr0m6qqk5a7dkt0a87pc3yc6nqq4sawe + #cid=$(echo "$resp" | jq '.cid' | sed 's/"//g') + #fid=$(echo "$resp" | jq '.fid' | sed 's/"//g') + + #sleep 5 + + #canined tx storage sign-contract "$cid" --from charlie -y + + #echo "$1 uploaded... fid: ${fid}" + #sleep 5 +} + +upload_file_and_sign () { + resp=$(curl -v -F sender=$sender -F file=@$1 http://localhost:333$2/upload) + + # get cid value from json respnose and strip double quote at front and end + # example: + # {"cid":"jklc1amfnkh8fj8wpvadxp8zjm4h3kgnr0m6qqk5a7dkt0a87pc3yc6nqq4sawe","fid":"jklf12g2ae3tw5397rjehjavcfxzxp4nu9nggpm6lvs6m9wfns0gs3ecqpxt6vq"} + # gets: + # jklc1amfnkh8fj8wpvadxp8zjm4h3kgnr0m6qqk5a7dkt0a87pc3yc6nqq4sawe + cid=$(echo "$resp" | jq '.cid' | sed 's/"//g') + fid=$(echo "$resp" | jq '.fid' | sed 's/"//g') + + sleep 5 + + canined tx storage sign-contract "$cid" --from charlie -y + + echo "$1 uploaded... fid: ${fid}" + sleep 5 +} + + + +shut_down () { + killall canined jprovd +} + +install_old + +start_chain +echo "CHAIN STARTED!!!" +sleep 5 + +start_provider 54f86a701648e8324e920f9592c21cc591b244ae46eac935d45fe962bba1102c \ + jkl1xclg3utp4yuvaxa54r39xzrudc988s82ykve3f 0 +#start_provider a29c5f0033606d1ac47db6a3327bc13a6b0c426dbfe5c15b2fcd7334b4165033 \ +# jkl1tcveayn80pe3d5wallj9kev3rfefctsmrqf6ks 1 +#start_provider a490cb438024cddca16470771fb9a21938c4cf61176a46005c6a7b25ee25a649 \ +# jkl1eg3gm3e3k4dypvvme26ejmajnyvtgwwlaaeu2y 2 +#start_provider 6c8a948c347079706e404ab48afc5f03203556e34ea921f3b132f2b2e9bcc87d \ +# jkl1ga0348r8zhn8k4xy3fagwvkwzvyh5lynxr5kak 3 +#start_provider 8144389a23c6535e276068ff9043b2b6ff95aa3c103c35486c8f2d2363606fd5 \ +# jkl18encuf0esmxv3pxqjqvn0u4tgd6yzuc8urzlp0 4 +#start_provider 0e019088a0fafa8f77cb5c0d0f6cb6b63a0015f20d2450480cbcdee44d170aab \ +# jkl1sqt9v0zwwx362szrek7pr3lpq29aygw06hgyza 5 +#start_provider adf5a86ac54146b172c20b865c548e900c51439c3723af14aeab668ccd2b8ecf \ +# jkl1yu099xns2qpslvyrymxq3hwrqhevs7qxksvu8p 6 +echo "provider started!!!" +sleep 30 + +# upload files +canined tx storage buy-storage jkl10k05lmc88q5ft3lm00q30qkd9x6654h3lejnct 720h 3000000000 ujkl --from charlie -y +sleep 5 + +upload_file_and_sign ./scripts/dummy_data/1.png 0 +#upload_file ./scripts/dummy_data/2.png 1 +#upload_file ./scripts/dummy_data/3.png 6 +upload_file ./scripts/dummy_data/4.png 0 +upload_file ./scripts/dummy_data/5.svg 0 +upload_file ./scripts/dummy_data/6.wav 0 +upload_file ./scripts/dummy_data/test.txt 0 + +sleep 10 + + +echo "shutting down providers..." +killall jprovd + +sleep 4 + +echo "upgrading provider..." +install_new + +read -rsp $'Press any key to shutdown and upgrade provider...\n' -n1 key +#migrate_provider 0 +#migrate_provider 1 +#migrate_provider 2 +#migrate_provider 3 +#migrate_provider 4 +#migrate_provider 5 +#migrate_provider 6 +# +# +#sleep 5 +# +#restart_provider 0 +#restart_provider 1 +#restart_provider 2 +#restart_provider 3 +#restart_provider 4 +#restart_provider 5 +#restart_provider 6 + + +read -rsp $'Press any key to shutdown...\n' -n1 key + +shut_down +#cleanup