From 2d884710d88a22612d41fe3eba3b3a6c51b17c45 Mon Sep 17 00:00:00 2001
From: Naoki MATSUMOTO
Date: Mon, 8 Apr 2024 00:50:51 +0900
Subject: [PATCH] add bisect multithread dimg merge

Signed-off-by: Naoki MATSUMOTO
---
 cmd/ctr-cli/merge/merge.go | 229 ++++++++++++++++++++++++++++++-------
 cmd/server/main.go         |   3 +-
 pkg/image/merge.go         | 168 ++++++++++++++++++++++++++++++++++--
 pkg/server/server.go       |   2 +-
 tests/test_merge.sh        |   6 ++
 5 files changed, 360 insertions(+), 48 deletions(-)

diff --git a/cmd/ctr-cli/merge/merge.go b/cmd/ctr-cli/merge/merge.go
index 9b322a9..b298804 100644
--- a/cmd/ctr-cli/merge/merge.go
+++ b/cmd/ctr-cli/merge/merge.go
@@ -2,8 +2,11 @@ package merge
 
 import (
 	"context"
+	"fmt"
 	"os"
+	"path/filepath"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/containerd/containerd/log"
@@ -28,18 +31,24 @@ func DimgCommand() *cli.Command {
 			&cli.StringFlag{
 				Name:     "lowerDimg",
 				Usage:    "path to lower dimg",
-				Required: true,
+				Required: false,
 			},
 			&cli.StringFlag{
 				Name:     "upperDimg",
 				Usage:    "path to upper dimg",
-				Required: true,
+				Required: false,
 			},
 			&cli.StringFlag{
 				Name:     "outDimg",
 				Usage:    "path to merged dimg",
 				Required: true,
 			},
+			&cli.StringFlag{
+				Name:     "dimgs",
+				Usage:    "path to merged dimgs. path is ordered in upper to lower form (upperN, upperN-1, upperN-2, .. upper0)",
+				Value:    "",
+				Required: false,
+			},
 			&cli.BoolFlag{
 				Name:     "benchmark",
 				Usage:    "enable benchmark",
@@ -58,6 +67,18 @@ func DimgCommand() *cli.Command {
 				Value:    1,
 				Required: false,
 			},
+			&cli.IntFlag{
+				Name:     "mergeDimgConcurrentNum",
+				Usage:    "The nubmer of merged concurrently",
+				Value:    1,
+				Required: false,
+			},
+			&cli.StringFlag{
+				Name:     "mergeMode",
+				Usage:    "The mode to merge (linear, bisect)",
+				Value:    "linear",
+				Required: false,
+			},
 		},
 	}
 
@@ -71,6 +92,9 @@ func dimgAction(c *cli.Context) error {
 	enableBench := c.Bool("benchmark")
 	enableBenchPerFile := c.Bool("benchmarkPerFile")
 	threadNum := c.Int("threadNum")
+	mergeDimgConcurrentNum := c.Int("mergeDimgConcurrentNum")
+	mergeMode := c.String("mergeMode")
+	dimgs := c.String("dimgs")
 	logger.WithFields(logrus.Fields{
 		"lowerDimg": lowerDimg,
 		"upperDimg": upperDimg,
@@ -88,21 +112,71 @@ func dimgAction(c *cli.Context) error {
 		b.SetDefaultLabels(utils.ParseLabels(c.StringSlice("labels")))
 	}
 
-	start := time.Now()
-
-	mergeFile, err := os.Create(outDimg)
-	if err != nil {
-		panic(err)
-	}
-	defer mergeFile.Close()
 	mergeConfig := image.MergeConfig{
-		ThreadNum:        threadNum,
-		BenchmarkPerFile: enableBenchPerFile,
-		Benchmarker:      b,
+		ThreadNum:              threadNum,
+		MergeDimgConcurrentNum: mergeDimgConcurrentNum,
+		BenchmarkPerFile:       enableBenchPerFile,
+		Benchmarker:            b,
 	}
-	header, err := image.MergeDimg(lowerDimg, upperDimg, mergeFile, mergeConfig)
-	if err != nil {
-		panic(err)
+	var header *image.DimgHeader
+	start := time.Now()
+
+	if dimgs != "" {
+		dimgEntry := []*image.DimgEntry{}
+		for _, path := range strings.Split(dimgs, ",") {
+			dimgFile, err := image.OpenDimgFile(path)
+			if err != nil {
+				return fmt.Errorf("failed to open %s: %v", path, err)
+			}
+			defer dimgFile.Close()
+			dimgEntry = append(dimgEntry, &image.DimgEntry{
+				DimgHeader: *dimgFile.Header(),
+				Path:       path,
+			})
+		}
+		// With a single entry both merge modes hand back the input entry
+		// itself, and the rename below would then move the user's input file.
+		if len(dimgEntry) < 2 {
+			return fmt.Errorf("--dimgs requires at least two dimgs, got %d", len(dimgEntry))
+		}
+		start = time.Now()
+		tmpDir := filepath.Join("/tmp/d4c", utils.GetRandomId("merge-tmp"))
+		err = os.MkdirAll(tmpDir, 0755)
+		if err != nil {
+			return fmt.Errorf("failed to create %s: %v", tmpDir, err)
+		}
+		defer os.RemoveAll(tmpDir)
+		var mergedDimg *image.DimgEntry
+		switch mergeMode {
+		case "linear":
+			mergedDimg, err = image.MergeDimgsWithLinear(dimgEntry, tmpDir, mergeConfig, false)
+		case "bisect":
+			mergedDimg, err = image.MergeDimgsWithBisectMultithread(dimgEntry, tmpDir, mergeConfig, false)
+		default:
+			return fmt.Errorf("invalid mergeMode %s (only 'linear' or 'bisect' are allowed)", mergeMode)
+		}
+		if err != nil {
+			return fmt.Errorf("failed to merge: %v", err)
+		}
+		err = os.Rename(mergedDimg.Path, outDimg)
+		if err != nil {
+			return fmt.Errorf("failed to rename %s to %s: %v", mergedDimg.Path, outDimg, err)
+		}
+		// The benchmark labels below read header.CompressionMode.
+		header = &mergedDimg.DimgHeader
+	} else {
+		if lowerDimg == "" || upperDimg == "" {
+			return fmt.Errorf("either --dimgs or both --lowerDimg and --upperDimg must be specified")
+		}
+		mergeFile, err := os.Create(outDimg)
+		if err != nil {
+			return fmt.Errorf("failed to create %s: %v", outDimg, err)
+		}
+		defer mergeFile.Close()
+		header, err = image.MergeDimg(lowerDimg, upperDimg, mergeFile, mergeConfig)
+		if err != nil {
+			return fmt.Errorf("failed to merge: %v", err)
+		}
 	}
 	elapsed := time.Since(start)
 
@@ -117,11 +191,13 @@ func dimgAction(c *cli.Context) error {
 			ElapsedMilli: int(elapsed.Milliseconds()),
 			Size:         stat.Size(),
 			Labels: map[string]string{
-				"lowerDimg":       lowerDimg,
-				"upperDimg":       upperDimg,
-				"outDimg":         outDimg,
-				"threadNum":       strconv.Itoa(threadNum),
-				"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
+				"lowerDimg":              lowerDimg,
+				"upperDimg":              upperDimg,
+				"outDimg":                outDimg,
+				"threadNum":              strconv.Itoa(threadNum),
+				"compressionMode":        bsdiffx.CompressionModeToString(header.CompressionMode),
+				"mergeDimgConcurrentNum": strconv.Itoa(mergeDimgConcurrentNum),
+				"mergeMode":              mergeMode,
 			},
 		}
 		err = b.AppendResult(metric)
@@ -144,18 +220,24 @@ func CdimgCommand() *cli.Command {
 			&cli.StringFlag{
 				Name:     "lowerCdimg",
 				Usage:    "path to lower cdimg",
-				Required: true,
+				Required: false,
 			},
 			&cli.StringFlag{
 				Name:     "upperCdimg",
 				Usage:    "path to upper cdimg",
-				Required: true,
+				Required: false,
 			},
 			&cli.StringFlag{
 				Name:     "outCdimg",
 				Usage:    "path to merged cdimg",
 				Required: true,
 			},
+			&cli.StringFlag{
+				Name:     "cdimgs",
+				Usage:    "path to merged cdimgs. path is ordered in upper to lower form (upperN, upperN-1, upperN-2, .. upper0)",
+				Value:    "",
+				Required: false,
+			},
 			&cli.BoolFlag{
 				Name:     "benchmark",
 				Usage:    "enable benchmark",
@@ -174,6 +256,18 @@ func CdimgCommand() *cli.Command {
 				Value:    1,
 				Required: false,
 			},
+			&cli.IntFlag{
+				Name:     "mergeDimgConcurrentNum",
+				Usage:    "The nubmer of merged concurrently",
+				Value:    1,
+				Required: false,
+			},
+			&cli.StringFlag{
+				Name:     "mergeMode",
+				Usage:    "The mode to merge (linear, bisect)",
+				Value:    "linear",
+				Required: false,
+			},
 		},
 	}
 
@@ -187,6 +281,9 @@ func cdimgAction(c *cli.Context) error {
 	enableBench := c.Bool("benchmark")
 	enableBenchPerFile := c.Bool("benchmarkPerFile")
 	threadNum := c.Int("threadNum")
+	mergeDimgConcurrentNum := c.Int("mergeDimgConcurrentNum")
+	mergeMode := c.String("mergeMode")
+	cdimgs := c.String("cdimgs")
 	logger.WithFields(logrus.Fields{
 		"lowerCdimg": lowerCdimg,
 		"upperCdimg": upperCdimg,
@@ -204,21 +301,71 @@ func cdimgAction(c *cli.Context) error {
 		b.SetDefaultLabels(utils.ParseLabels(c.StringSlice("labels")))
 	}
 
-	start := time.Now()
-
-	mergeFile, err := os.Create(outCdimg)
-	if err != nil {
-		panic(err)
-	}
-	defer mergeFile.Close()
 	mergeConfig := image.MergeConfig{
-		ThreadNum:        threadNum,
-		BenchmarkPerFile: enableBenchPerFile,
-		Benchmarker:      b,
+		ThreadNum:              threadNum,
+		MergeDimgConcurrentNum: mergeDimgConcurrentNum,
+		BenchmarkPerFile:       enableBenchPerFile,
+		Benchmarker:            b,
 	}
-	header, err := image.MergeCdimg(lowerCdimg, upperCdimg, mergeFile, mergeConfig)
-	if err != nil {
-		panic(err)
+	var header *image.DimgHeader
+	start := time.Now()
+
+	if cdimgs != "" {
+		dimgEntry := []*image.DimgEntry{}
+		for _, path := range strings.Split(cdimgs, ",") {
+			cdimgFile, err := image.OpenCdimgFile(path)
+			if err != nil {
+				return fmt.Errorf("failed to open %s: %v", path, err)
+			}
+			defer cdimgFile.Close()
+			dimgEntry = append(dimgEntry, &image.DimgEntry{
+				DimgHeader: *cdimgFile.Dimg.Header(),
+				Path:       path,
+			})
+		}
+		// With a single entry both merge modes hand back the input entry
+		// itself, and the rename below would then move the user's input file.
+		if len(dimgEntry) < 2 {
+			return fmt.Errorf("--cdimgs requires at least two cdimgs, got %d", len(dimgEntry))
+		}
+		start = time.Now()
+		tmpDir := filepath.Join("/tmp/d4c", utils.GetRandomId("merge-tmp"))
+		err = os.MkdirAll(tmpDir, 0755)
+		if err != nil {
+			return fmt.Errorf("failed to create %s: %v", tmpDir, err)
+		}
+		defer os.RemoveAll(tmpDir)
+		var mergedDimg *image.DimgEntry
+		switch mergeMode {
+		case "linear":
+			mergedDimg, err = image.MergeDimgsWithLinear(dimgEntry, tmpDir, mergeConfig, true)
+		case "bisect":
+			mergedDimg, err = image.MergeDimgsWithBisectMultithread(dimgEntry, tmpDir, mergeConfig, true)
+		default:
+			return fmt.Errorf("invalid mergeMode %s (only 'linear' or 'bisect' are allowed)", mergeMode)
+		}
+		if err != nil {
+			return fmt.Errorf("failed to merge: %v", err)
+		}
+		err = os.Rename(mergedDimg.Path, outCdimg)
+		if err != nil {
+			return fmt.Errorf("failed to rename %s to %s: %v", mergedDimg.Path, outCdimg, err)
+		}
+		// The benchmark labels below read header.CompressionMode.
+		header = &mergedDimg.DimgHeader
+	} else {
+		if lowerCdimg == "" || upperCdimg == "" {
+			return fmt.Errorf("either --cdimgs or both --lowerCdimg and --upperCdimg must be specified")
+		}
+		mergeFile, err := os.Create(outCdimg)
+		if err != nil {
+			return fmt.Errorf("failed to create %s: %v", outCdimg, err)
+		}
+		defer mergeFile.Close()
+		header, err = image.MergeCdimg(lowerCdimg, upperCdimg, mergeFile, mergeConfig)
+		if err != nil {
+			return fmt.Errorf("failed to merge: %v", err)
+		}
 	}
 	elapsed := time.Since(start)
 
@@ -233,11 +380,13 @@ func cdimgAction(c *cli.Context) error {
 			ElapsedMilli: int(elapsed.Milliseconds()),
 			Size:         stat.Size(),
 			Labels: map[string]string{
-				"lowerCdimg":      lowerCdimg,
-				"upperCdimg":      upperCdimg,
-				"outCdimg":        outCdimg,
-				"threadNum":       strconv.Itoa(threadNum),
-				"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
+				"lowerCdimg":             lowerCdimg,
+				"upperCdimg":             upperCdimg,
+				"outCdimg":               outCdimg,
+				"threadNum":              strconv.Itoa(threadNum),
+				"compressionMode":        bsdiffx.CompressionModeToString(header.CompressionMode),
+				"mergeDimgConcurrentNum": strconv.Itoa(mergeDimgConcurrentNum),
+				"mergeMode":              mergeMode,
 			},
 		}
 		err = b.AppendResult(metric)
diff --git a/cmd/server/main.go b/cmd/server/main.go
index ae7f92f..421ba1f 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -15,7 +15,8 @@ func main() {
 	threadNum := flag.Int("threadNum", 1, "Te number of threads to merge diffs")
 	flag.Parse()
 	mc := image.MergeConfig{
-		ThreadNum: *threadNum,
+		ThreadNum:              *threadNum,
+		MergeDimgConcurrentNum: 4,
 	}
 	ds, err := server.NewDiffServer(mc)
 	if err != nil {
diff --git a/pkg/image/merge.go b/pkg/image/merge.go
index b620863..22b3cfa 100644
--- a/pkg/image/merge.go
+++ b/pkg/image/merge.go
@@ -825,9 +825,10 @@ func enqueueMergeTaskToQueue(lowerEntry, upperEntry *FileEntry, taskChan chan me
 }
 
 type MergeConfig struct {
-	ThreadNum        int
-	BenchmarkPerFile bool
-	Benchmarker      *benchmark.Benchmark
+	ThreadNum              int
+	MergeDimgConcurrentNum int
+	BenchmarkPerFile       bool
+	Benchmarker            *benchmark.Benchmark
 }
 
 func MergeDimg(lowerDimg, upperDimg string, merged io.Writer, mc MergeConfig) (*DimgHeader, error) {
@@ -906,11 +907,16 @@ func MergeCdimg(lowerCdimg, upperCdimg string, merged io.Writer, mc MergeConfig)
 	return &header, nil
 }
 
-func MergeDimgsWithLinear(dimgs []*DimgEntry, tmpDir string, mc MergeConfig) (*DimgEntry, error) {
+func MergeDimgsWithLinear(dimgs []*DimgEntry, tmpDir string, mc MergeConfig, isCdimg bool) (*DimgEntry, error) {
 	lowerDimg := dimgs[len(dimgs)-1]
 	for idx := len(dimgs) - 2; idx >= 0; idx-- {
 		upperDimg := dimgs[idx]
-		mergedDimgPath := filepath.Join(tmpDir, utils.GetRandomId("merge")+".dimg")
+		var mergedDimgPath string
+		if isCdimg {
+			mergedDimgPath = filepath.Join(tmpDir, utils.GetRandomId("merge")+".cdimg")
+		} else {
+			mergedDimgPath = filepath.Join(tmpDir, utils.GetRandomId("merge")+".dimg")
+		}
 		mergedFile, err := os.Create(mergedDimgPath)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create temporary dimg %s: %v", mergedDimgPath, err)
@@ -918,7 +924,12 @@ func MergeDimgsWithLinear(dimgs []*DimgEntry, tmpDir string, mc MergeConfig) (*D
 		defer mergedFile.Close()
 
 		logger.Infof("merge %s and %s", lowerDimg.Digest(), upperDimg.Digest())
-		header, err := MergeDimg(lowerDimg.Path, upperDimg.Path, mergedFile, mc)
+		var header *DimgHeader
+		if isCdimg {
+			header, err = MergeCdimg(lowerDimg.Path, upperDimg.Path, mergedFile, mc)
+		} else {
+			header, err = MergeDimg(lowerDimg.Path, upperDimg.Path, mergedFile, mc)
+		}
 		if err != nil {
 			return nil, fmt.Errorf("failed to merge dimgs: %v", err)
 		}
@@ -928,3 +939,148 @@ func MergeDimgsWithLinear(dimgs []*DimgEntry, tmpDir string, mc MergeConfig) (*D
 
 	return lowerDimg, nil
 }
+
+// mergeDimgTask is a node of the binary merge tree built by buildMergeDimgTask.
+// A leaf carries an input dimg; an inner node merges the results of its two
+// children and stores the merged entry in dimg.
+type mergeDimgTask struct {
+	lowerMergeTask *mergeDimgTask
+	upperMergeTask *mergeDimgTask
+
+	dimg *DimgEntry
+
+	// done carries this node's outcome: nil once dimg is ready, or the error.
+	done chan error
+}
+
+// MergeDimgsWithBisectMultithread merges dimgs (ordered from upper to lower)
+// by recursively halving the list and merging the halves, writing the
+// intermediate results under tmpDir. At most mc.MergeDimgConcurrentNum merge
+// goroutines run at the same time; values below 1 are treated as 1.
+func MergeDimgsWithBisectMultithread(dimgs []*DimgEntry, tmpDir string, mc MergeConfig, isCdimg bool) (*DimgEntry, error) {
+	mergeTask := buildMergeDimgTask(dimgs)
+	if mergeTask == nil {
+		return nil, fmt.Errorf("mergeTasks is nil")
+	}
+
+	// A zero-capacity channel would block the first merge forever and a
+	// negative capacity makes make() panic, so clamp the limit.
+	concurrentNum := mc.MergeDimgConcurrentNum
+	if concurrentNum < 1 {
+		concurrentNum = 1
+	}
+	threadLimit := make(chan struct{}, concurrentNum)
+	err := runMergeDimgTask(mergeTask, tmpDir, threadLimit, mc, isCdimg)
+	if err != nil {
+		return nil, err
+	}
+
+	if mergeErr := <-mergeTask.done; mergeErr != nil {
+		return nil, fmt.Errorf("failed to merge: %v", mergeErr)
+	}
+
+	return mergeTask.dimg, nil
+}
+
+// buildMergeDimgTask builds the merge tree for dimgs: the first half becomes
+// the upper subtree and the second half the lower subtree. It returns nil for
+// an empty list.
+func buildMergeDimgTask(dimgs []*DimgEntry) *mergeDimgTask {
+	dimgsLen := len(dimgs)
+	switch dimgsLen {
+	case 0:
+		return nil
+	case 1:
+		return &mergeDimgTask{
+			dimg: dimgs[0],
+			done: make(chan error, 1),
+		}
+	default:
+		return &mergeDimgTask{
+			upperMergeTask: buildMergeDimgTask(dimgs[0 : dimgsLen/2]),
+			lowerMergeTask: buildMergeDimgTask(dimgs[dimgsLen/2:]),
+			done:           make(chan error, 1),
+		}
+	}
+}
+
+// runMergeDimgTask resolves both children of task, then starts a goroutine
+// that merges their results and reports on task.done. threadLimit bounds the
+// number of merge goroutines running at once.
+func runMergeDimgTask(task *mergeDimgTask, tmpDir string, threadLimit chan struct{}, mc MergeConfig, isCdimg bool) error {
+	if task.upperMergeTask == nil && task.lowerMergeTask == nil {
+		task.done <- nil
+		return nil
+	}
+
+	if task.lowerMergeTask != nil {
+		err := runMergeDimgTask(task.lowerMergeTask, tmpDir, threadLimit, mc, isCdimg)
+		if err != nil {
+			return err
+		}
+	}
+
+	if task.upperMergeTask != nil {
+		err := runMergeDimgTask(task.upperMergeTask, tmpDir, threadLimit, mc, isCdimg)
+		if err != nil {
+			return err
+		}
+	}
+
+	if lowerErr := <-task.lowerMergeTask.done; lowerErr != nil {
+		return fmt.Errorf("lowerMergeTask has error: %v", lowerErr)
+	}
+	if upperErr := <-task.upperMergeTask.done; upperErr != nil {
+		return fmt.Errorf("upperMergeTask has error: %v", upperErr)
+	}
+
+	threadLimit <- struct{}{}
+	go func() {
+		// Release the slot on every exit path; leaking it on failure would
+		// block all later merges once the limit is exhausted.
+		defer func() { <-threadLimit }()
+
+		upperDimg := task.upperMergeTask.dimg
+		lowerDimg := task.lowerMergeTask.dimg
+
+		var mergedDimgPath string
+		if isCdimg {
+			mergedDimgPath = filepath.Join(tmpDir, utils.GetRandomId("merge")+".cdimg")
+		} else {
+			mergedDimgPath = filepath.Join(tmpDir, utils.GetRandomId("merge")+".dimg")
+		}
+		mergedFile, err := os.Create(mergedDimgPath)
+		if err != nil {
+			task.done <- fmt.Errorf("failed to create temporary dimg %s: %v", mergedDimgPath, err)
+			return
+		}
+
+		logger.Infof("merge %s and %s", lowerDimg.Digest(), upperDimg.Digest())
+		var header *DimgHeader
+		if isCdimg {
+			header, err = MergeCdimg(lowerDimg.Path, upperDimg.Path, mergedFile, mc)
+		} else {
+			header, err = MergeDimg(lowerDimg.Path, upperDimg.Path, mergedFile, mc)
+		}
+		// Close before signalling completion so the parent merge never
+		// reads a file that is still open for writing.
+		closeErr := mergedFile.Close()
+		if err != nil {
+			task.done <- fmt.Errorf("failed to merge: %v", err)
+			return
+		}
+		if closeErr != nil {
+			task.done <- fmt.Errorf("failed to close %s: %v", mergedDimgPath, closeErr)
+			return
+		}
+
+		// Publish a copy so the caller's input entry is left untouched.
+		mergedDimg := *upperDimg
+		mergedDimg.DimgHeader = *header
+		mergedDimg.Path = mergedDimgPath
+		task.dimg = &mergedDimg
+		task.done <- nil
+	}()
+
+	return nil
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2df4a30..16256d6 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -198,7 +198,7 @@ func (ds *DiffServer) handleGetUpdateData(w http.ResponseWriter, r *http.Request
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
-	resDimg, err := image.MergeDimgsWithLinear(selectedDimgPaths, tmpDir, ds.mergeConfig)
+	resDimg, err := image.MergeDimgsWithBisectMultithread(selectedDimgPaths, tmpDir, ds.mergeConfig, false)
 	if err != nil {
 		logger.Errorf("failed to merge: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
diff --git a/tests/test_merge.sh b/tests/test_merge.sh
index bf89980..55c2230 100755
--- a/tests/test_merge.sh
+++ b/tests/test_merge.sh
@@ -65,6 +65,12 @@ diff_image 1.25.1 1.25.2
 diff_image 1.25.2 1.25.3
 diff_image 1.25.3 1.25.4
 
+$BIN_CTR_CLI cdimg merge --cdimgs $IMAGE_DIR/1.25.3-1.25.4.cdimg,$IMAGE_DIR/1.25.2-1.25.3.cdimg,$IMAGE_DIR/1.25.1-1.25.2.cdimg,$IMAGE_DIR/1.25.0-1.25.1.cdimg,$IMAGE_DIR/1.24.0-1.25.0.cdimg,$IMAGE_DIR/1.23.4-1.24.0.cdimg,$IMAGE_DIR/1.23.3-1.23.4.cdimg,$IMAGE_DIR/1.23.2-1.23.3.cdimg,$IMAGE_DIR/1.23.1-1.23.2.cdimg --outCdimg $IMAGE_DIR/1.23.1-1.25.4.cdimg --threadNum 8 --mergeDimgConcurrentNum 4 --mergeMode bisect
+rm -rf $IMAGE_DIR/1.25.4-patched
+mkdir $IMAGE_DIR/1.25.4-patched
+
+$BIN_CTR_CLI cdimg patch --baseDir $IMAGE_DIR/image-1.23.1 --outDir $IMAGE_DIR/1.25.4-patched --diffCdimg $IMAGE_DIR/1.23.1-1.25.4.cdimg
+diff -r --no-dereference $IMAGE_DIR/image-1.25.4 $IMAGE_DIR/1.25.4-patched
 
 $BIN_CTR_CLI push --cdimg $IMAGE_DIR/1.23.1.cdimg --imageTag nginx:1.23.1
 $BIN_CTR_CLI push --cdimg $IMAGE_DIR/1.23.1-1.23.2.cdimg