Skip to content

Commit

Permalink
add bisect multithread dimg merge
Browse files Browse the repository at this point in the history
Signed-off-by: Naoki MATSUMOTO <[email protected]>
  • Loading branch information
naoki9911 committed Apr 7, 2024
1 parent 49fd705 commit 2d88471
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 48 deletions.
209 changes: 169 additions & 40 deletions cmd/ctr-cli/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package merge

import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/log"
Expand All @@ -28,18 +31,24 @@ func DimgCommand() *cli.Command {
&cli.StringFlag{
Name: "lowerDimg",
Usage: "path to lower dimg",
Required: true,
Required: false,
},
&cli.StringFlag{
Name: "upperDimg",
Usage: "path to upper dimg",
Required: true,
Required: false,
},
&cli.StringFlag{
Name: "outDimg",
Usage: "path to merged dimg",
Required: true,
},
&cli.StringFlag{
Name: "dimgs",
Usage: "path to merged dimgs. path is ordered in upper to lower form (upperN, upperN-1, upperN-2, .. upper0)",
Value: "",
Required: false,
},
&cli.BoolFlag{
Name: "benchmark",
Usage: "enable benchmark",
Expand All @@ -58,6 +67,18 @@ func DimgCommand() *cli.Command {
Value: 1,
Required: false,
},
&cli.IntFlag{
Name: "mergeDimgConcurrentNum",
Usage: "The nubmer of merged concurrently",
Value: 1,
Required: false,
},
&cli.StringFlag{
Name: "mergeMode",
Usage: "The mode to merge (linear, bisect)",
Value: "linear",
Required: false,
},
},
}

Expand All @@ -71,6 +92,9 @@ func dimgAction(c *cli.Context) error {
enableBench := c.Bool("benchmark")
enableBenchPerFile := c.Bool("benchmarkPerFile")
threadNum := c.Int("threadNum")
mergeDimgConcurrentNum := c.Int("mergeDimgConcurrentNum")
mergeMode := c.String("mergeMode")
dimgs := c.String("dimgs")
logger.WithFields(logrus.Fields{
"lowerDimg": lowerDimg,
"upperDimg": upperDimg,
Expand All @@ -88,21 +112,61 @@ func dimgAction(c *cli.Context) error {
b.SetDefaultLabels(utils.ParseLabels(c.StringSlice("labels")))
}

start := time.Now()

mergeFile, err := os.Create(outDimg)
if err != nil {
panic(err)
}
defer mergeFile.Close()
mergeConfig := image.MergeConfig{
ThreadNum: threadNum,
BenchmarkPerFile: enableBenchPerFile,
Benchmarker: b,
ThreadNum: threadNum,
MergeDimgConcurrentNum: mergeDimgConcurrentNum,
BenchmarkPerFile: enableBenchPerFile,
Benchmarker: b,
}
header, err := image.MergeDimg(lowerDimg, upperDimg, mergeFile, mergeConfig)
if err != nil {
panic(err)
var header *image.DimgHeader
start := time.Now()

if dimgs != "" {
dimgEntry := []*image.DimgEntry{}
for _, path := range strings.Split(dimgs, ",") {
dimgFile, err := image.OpenDimgFile(path)
if err != nil {
return fmt.Errorf("failed to open %s: %v", path, err)
}
defer dimgFile.Close()
dimgEntry = append(dimgEntry, &image.DimgEntry{
DimgHeader: *dimgFile.Header(),
Path: path,
})
}
start = time.Now()
tmpDir := filepath.Join("/tmp/d4c", utils.GetRandomId("merge-tmp"))
err = os.MkdirAll(tmpDir, 0755)
if err != nil {
return fmt.Errorf("failed to create %s: %v", tmpDir, err)
}
defer os.RemoveAll(tmpDir)
var mergedDimg *image.DimgEntry
switch mergeMode {
case "linear":
mergedDimg, err = image.MergeDimgsWithLinear(dimgEntry, tmpDir, mergeConfig, false)
case "bisect":
mergedDimg, err = image.MergeDimgsWithBisectMultithread(dimgEntry, tmpDir, mergeConfig, false)
default:
return fmt.Errorf("invalid mergeMode %s (only 'linear' or 'bisect' are allowed)", mergeMode)
}
if err != nil {
return fmt.Errorf("failed to merge: %v", err)
}
err = os.Rename(mergedDimg.Path, outDimg)
if err != nil {
return fmt.Errorf("failed to rename %s to %s: %v", mergedDimg.Path, outDimg, err)
}
} else {
mergeFile, err := os.Create(outDimg)
if err != nil {
panic(err)
}
defer mergeFile.Close()
header, err = image.MergeDimg(lowerDimg, upperDimg, mergeFile, mergeConfig)
if err != nil {
panic(err)
}
}

elapsed := time.Since(start)
Expand All @@ -117,11 +181,13 @@ func dimgAction(c *cli.Context) error {
ElapsedMilli: int(elapsed.Milliseconds()),
Size: stat.Size(),
Labels: map[string]string{
"lowerDimg": lowerDimg,
"upperDimg": upperDimg,
"outDimg": outDimg,
"threadNum": strconv.Itoa(threadNum),
"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
"lowerDimg": lowerDimg,
"upperDimg": upperDimg,
"outDimg": outDimg,
"threadNum": strconv.Itoa(threadNum),
"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
"mergeDimgConcurrentNum": strconv.Itoa(mergeDimgConcurrentNum),
"mergeMode": mergeMode,
},
}
err = b.AppendResult(metric)
Expand All @@ -144,18 +210,24 @@ func CdimgCommand() *cli.Command {
&cli.StringFlag{
Name: "lowerCdimg",
Usage: "path to lower cdimg",
Required: true,
Required: false,
},
&cli.StringFlag{
Name: "upperCdimg",
Usage: "path to upper cdimg",
Required: true,
Required: false,
},
&cli.StringFlag{
Name: "outCdimg",
Usage: "path to merged cdimg",
Required: true,
},
&cli.StringFlag{
Name: "cdimgs",
Usage: "path to merged cdimgs. path is ordered in upper to lower form (upperN, upperN-1, upperN-2, .. upper0)",
Value: "",
Required: false,
},
&cli.BoolFlag{
Name: "benchmark",
Usage: "enable benchmark",
Expand All @@ -174,6 +246,18 @@ func CdimgCommand() *cli.Command {
Value: 1,
Required: false,
},
&cli.IntFlag{
Name: "mergeDimgConcurrentNum",
Usage: "The nubmer of merged concurrently",
Value: 1,
Required: false,
},
&cli.StringFlag{
Name: "mergeMode",
Usage: "The mode to merge (linear, bisect)",
Value: "linear",
Required: false,
},
},
}

Expand All @@ -187,6 +271,9 @@ func cdimgAction(c *cli.Context) error {
enableBench := c.Bool("benchmark")
enableBenchPerFile := c.Bool("benchmarkPerFile")
threadNum := c.Int("threadNum")
mergeDimgConcurrentNum := c.Int("mergeDimgConcurrentNum")
mergeMode := c.String("mergeMode")
cdimgs := c.String("cdimgs")
logger.WithFields(logrus.Fields{
"lowerCdimg": lowerCdimg,
"upperCdimg": upperCdimg,
Expand All @@ -204,21 +291,61 @@ func cdimgAction(c *cli.Context) error {
b.SetDefaultLabels(utils.ParseLabels(c.StringSlice("labels")))
}

start := time.Now()

mergeFile, err := os.Create(outCdimg)
if err != nil {
panic(err)
}
defer mergeFile.Close()
mergeConfig := image.MergeConfig{
ThreadNum: threadNum,
BenchmarkPerFile: enableBenchPerFile,
Benchmarker: b,
ThreadNum: threadNum,
MergeDimgConcurrentNum: mergeDimgConcurrentNum,
BenchmarkPerFile: enableBenchPerFile,
Benchmarker: b,
}
header, err := image.MergeCdimg(lowerCdimg, upperCdimg, mergeFile, mergeConfig)
if err != nil {
panic(err)
var header *image.DimgHeader
start := time.Now()

if cdimgs != "" {
dimgEntry := []*image.DimgEntry{}
for _, path := range strings.Split(cdimgs, ",") {
cdimgFile, err := image.OpenCdimgFile(path)
if err != nil {
return fmt.Errorf("failed to open %s: %v", path, err)
}
defer cdimgFile.Close()
dimgEntry = append(dimgEntry, &image.DimgEntry{
DimgHeader: *cdimgFile.Dimg.Header(),
Path: path,
})
}
start = time.Now()
tmpDir := filepath.Join("/tmp/d4c", utils.GetRandomId("merge-tmp"))
err = os.MkdirAll(tmpDir, 0755)
if err != nil {
return fmt.Errorf("failed to create %s: %v", tmpDir, err)
}
defer os.RemoveAll(tmpDir)
var mergedDimg *image.DimgEntry
switch mergeMode {
case "linear":
mergedDimg, err = image.MergeDimgsWithLinear(dimgEntry, tmpDir, mergeConfig, true)
case "bisect":
mergedDimg, err = image.MergeDimgsWithBisectMultithread(dimgEntry, tmpDir, mergeConfig, true)
default:
return fmt.Errorf("invalid mergeMode %s (only 'linear' or 'bisect' are allowed)", mergeMode)
}
if err != nil {
return fmt.Errorf("failed to merge: %v", err)
}
err = os.Rename(mergedDimg.Path, outCdimg)
if err != nil {
return fmt.Errorf("failed to rename %s to %s: %v", mergedDimg.Path, outCdimg, err)
}
} else {
mergeFile, err := os.Create(outCdimg)
if err != nil {
panic(err)
}
defer mergeFile.Close()
header, err = image.MergeCdimg(lowerCdimg, upperCdimg, mergeFile, mergeConfig)
if err != nil {
panic(err)
}
}

elapsed := time.Since(start)
Expand All @@ -233,11 +360,13 @@ func cdimgAction(c *cli.Context) error {
ElapsedMilli: int(elapsed.Milliseconds()),
Size: stat.Size(),
Labels: map[string]string{
"lowerCdimg": lowerCdimg,
"upperCdimg": upperCdimg,
"outCdimg": outCdimg,
"threadNum": strconv.Itoa(threadNum),
"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
"lowerCdimg": lowerCdimg,
"upperCdimg": upperCdimg,
"outCdimg": outCdimg,
"threadNum": strconv.Itoa(threadNum),
"compressionMode": bsdiffx.CompressionModeToString(header.CompressionMode),
"mergeDimgConcurrentNum": strconv.Itoa(mergeDimgConcurrentNum),
"mergeMode": mergeMode,
},
}
err = b.AppendResult(metric)
Expand Down
3 changes: 2 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func main() {
threadNum := flag.Int("threadNum", 1, "Te number of threads to merge diffs")
flag.Parse()
mc := image.MergeConfig{
ThreadNum: *threadNum,
ThreadNum: *threadNum,
MergeDimgConcurrentNum: 4,
}
ds, err := server.NewDiffServer(mc)
if err != nil {
Expand Down
Loading

0 comments on commit 2d88471

Please sign in to comment.