From 9b1ec9eb426372533b4eb0588b93b4794b6b3cfd Mon Sep 17 00:00:00 2001 From: reus Date: Fri, 29 Nov 2024 10:03:53 +0800 Subject: [PATCH] vm: add Process.GetSpillFileService --- cmd/mo-service/main.go | 16 +++++++++++++ pkg/defines/const.go | 3 +++ pkg/fileservice/sub_path.go | 15 ++++++++++++ pkg/fileservice/sub_path_test.go | 41 ++++++++++++++++++++++++++++++++ pkg/vm/process/process.go | 9 +++++++ pkg/vm/process/process2_test.go | 36 ++++++++++++++++++++++++++++ 6 files changed, 120 insertions(+) diff --git a/cmd/mo-service/main.go b/cmd/mo-service/main.go index 423ae8abf153a..576554b598b25 100644 --- a/cmd/mo-service/main.go +++ b/cmd/mo-service/main.go @@ -215,6 +215,10 @@ func startService( return err } + if err := clearSpillFiles(ctx, fs); err != nil { + return err + } + if globalEtlFS == nil { globalEtlFS = etlFS globalServiceType = st.String() @@ -514,3 +518,15 @@ func maybeRunInDaemonMode() { os.Exit(0) } } + +func clearSpillFiles(ctx context.Context, fs fileservice.FileService) error { + localFS, err := fileservice.Get[fileservice.FileService](fs, defines.LocalFileServiceName) + if err != nil { + return err + } + spillFS := fileservice.SubPath(localFS, defines.SpillFileServiceName) + for entry := range spillFS.List(ctx, "/") { + _ = spillFS.Delete(ctx, entry.Name) + } + return nil +} diff --git a/pkg/defines/const.go b/pkg/defines/const.go index 12cbb93a0864c..c69f33d245d8a 100644 --- a/pkg/defines/const.go +++ b/pkg/defines/const.go @@ -29,6 +29,9 @@ const ( LocalFileServiceName = "LOCAL" ETLFileServiceName = "ETL" StandbyFileServiceName = "STANDBY" + + // sub fileservices + SpillFileServiceName = "__spill" ) const ( diff --git a/pkg/fileservice/sub_path.go b/pkg/fileservice/sub_path.go index 66ae0b9185548..1d4360d786eaa 100644 --- a/pkg/fileservice/sub_path.go +++ b/pkg/fileservice/sub_path.go @@ -16,6 +16,7 @@ package fileservice import ( "context" + "fmt" "iter" "path" "strings" @@ -141,3 +142,17 @@ func (s *subPathFS) PrefetchFile(ctx context.Context, filePath string) error { func (s *subPathFS) Cost() *CostAttr { return s.upstream.Cost() } + +var _ MutableFileService = new(subPathFS) + +func (s *subPathFS) NewMutator(ctx context.Context, filePath string) (Mutator, error) { + p, err := s.toUpstreamPath(filePath) + if err != nil { + return nil, err + } + fs, ok := s.upstream.(MutableFileService) + if !ok { + panic(fmt.Sprintf("%T does not implement MutableFileService", s.upstream)) + } + return fs.NewMutator(ctx, p) +} diff --git a/pkg/fileservice/sub_path_test.go b/pkg/fileservice/sub_path_test.go index 543bb7c15d35f..b63a37603f3b3 100644 --- a/pkg/fileservice/sub_path_test.go +++ b/pkg/fileservice/sub_path_test.go @@ -15,12 +15,16 @@ package fileservice import ( + "context" + "fmt" + "strings" "testing" "github.com/stretchr/testify/assert" ) func TestSubPathFS(t *testing.T) { + t.Run("file service", func(t *testing.T) { testFileService(t, 0, func(name string) FileService { upstream, err := NewMemoryFS(name, DisabledCacheConfig, nil) @@ -28,4 +32,41 @@ func TestSubPathFS(t *testing.T) { return SubPath(upstream, "foo") }) }) + + t.Run("mutable file service", func(t *testing.T) { + testMutableFileService(t, func() MutableFileService { + upstream, err := NewLocalFS(context.Background(), "test", t.TempDir(), DisabledCacheConfig, nil) + assert.Nil(t, err) + return SubPath(upstream, "foo").(MutableFileService) + }) + }) + + t.Run("bad path", func(t *testing.T) { + ctx := context.Background() + fs, err := NewLocalFS(ctx, "test", t.TempDir(), DisabledCacheConfig, nil) + assert.Nil(t, err) + subFS := SubPath(fs, "foo").(MutableFileService) + _, err = subFS.NewMutator(ctx, "~~") + assert.NotNil(t, err) + }) + + t.Run("not mutable", func(t *testing.T) { + fs, err := NewMemoryFS("test", DisabledCacheConfig, nil) + assert.Nil(t, err) + subFS := SubPath(fs, "foo").(MutableFileService) + func() { + defer func() { + p := recover() + if p == nil { + t.Fatal("should panic") + } + msg := fmt.Sprintf("%v", p) + if !strings.Contains(msg, "does not implement MutableFileService") { + t.Fatalf("got %v", msg) + } + }() + subFS.NewMutator(context.Background(), "foo") + }() + }) + } diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go index 7d7ebc21922b4..9eed88db29c33 100644 --- a/pkg/vm/process/process.go +++ b/pkg/vm/process/process.go @@ -26,6 +26,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/incrservice" "github.com/matrixorigin/matrixone/pkg/lockservice" @@ -234,3 +235,11 @@ func appendTraceField(fields []zap.Field, ctx context.Context) []zap.Field { } return fields } + +func (proc *Process) GetSpillFileService() (fileservice.MutableFileService, error) { + local, err := fileservice.Get[fileservice.MutableFileService](proc.Base.FileService, defines.LocalFileServiceName) + if err != nil { + return nil, err + } + return fileservice.SubPath(local, defines.SpillFileServiceName).(fileservice.MutableFileService), nil +} diff --git a/pkg/vm/process/process2_test.go b/pkg/vm/process/process2_test.go index b3e8ebe89a239..4fbf16e007063 100644 --- a/pkg/vm/process/process2_test.go +++ b/pkg/vm/process/process2_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/stretchr/testify/assert" ) @@ -53,3 +55,37 @@ func TestBuildPipelineContext(t *testing.T) { proc.Cancel(nil) assert.Error(t, proc.Ctx.Err()) } + +func TestGetSpillFileService(t *testing.T) { + localFS, err := fileservice.NewLocalFS( + context.Background(), + defines.LocalFileServiceName, + t.TempDir(), + fileservice.DisabledCacheConfig, + nil, + ) + assert.Nil(t, err) + proc := &Process{ + Base: &BaseProcess{ + FileService: localFS, + }, + } + _, err = proc.GetSpillFileService() + assert.Nil(t, err) +} + +func TestGetSpillFileServiceError(t *testing.T) { + fs, err := fileservice.NewMemoryFS( + "foo", + fileservice.DisabledCacheConfig, + nil, + ) + assert.Nil(t, err) + proc := &Process{ + Base: &BaseProcess{ + FileService: fs, + }, + } + _, err = proc.GetSpillFileService() + assert.NotNil(t, err) +}