Skip to content

Commit

Permalink
vm: add Process.GetSpillFileService
Browse files Browse the repository at this point in the history
  • Loading branch information
reusee committed Nov 29, 2024
1 parent 83bc719 commit e213ea3
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cmd/mo-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func startService(
return err
}

if err := clearSpillFiles(ctx, fs); err != nil {
return err
}

if globalEtlFS == nil {
globalEtlFS = etlFS
globalServiceType = st.String()
Expand Down Expand Up @@ -514,3 +518,15 @@ func maybeRunInDaemonMode() {
os.Exit(0)
}
}

func clearSpillFiles(ctx context.Context, fs fileservice.FileService) error {
localFS, err := fileservice.Get[fileservice.FileService](fs, defines.LocalFileServiceName)
if err != nil {
return err
}
spillFS := fileservice.SubPath(localFS, defines.SpillFileServiceName)
for entry, _ := range spillFS.List(ctx, "/") {
_ = spillFS.Delete(ctx, entry.Name)
}
return nil
}
3 changes: 3 additions & 0 deletions pkg/defines/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
LocalFileServiceName = "LOCAL"
ETLFileServiceName = "ETL"
StandbyFileServiceName = "STANDBY"

// sub fileservices
SpillFileServiceName = "__spill"
)

const (
Expand Down
15 changes: 15 additions & 0 deletions pkg/fileservice/sub_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package fileservice

import (
"context"
"fmt"
"iter"
"path"
"strings"
Expand Down Expand Up @@ -141,3 +142,17 @@ func (s *subPathFS) PrefetchFile(ctx context.Context, filePath string) error {
func (s *subPathFS) Cost() *CostAttr {
return s.upstream.Cost()
}

var _ MutableFileService = new(subPathFS)

func (s *subPathFS) NewMutator(ctx context.Context, filePath string) (Mutator, error) {
p, err := s.toUpstreamPath(filePath)
if err != nil {
return nil, err
}
fs, ok := s.upstream.(MutableFileService)
if !ok {
panic(fmt.Sprintf("%T does not implement MutableFileService", s.upstream))
}
return fs.NewMutator(ctx, p)
}
41 changes: 41 additions & 0 deletions pkg/fileservice/sub_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,58 @@
package fileservice

import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSubPathFS(t *testing.T) {

t.Run("file service", func(t *testing.T) {
testFileService(t, 0, func(name string) FileService {
upstream, err := NewMemoryFS(name, DisabledCacheConfig, nil)
assert.Nil(t, err)
return SubPath(upstream, "foo")
})
})

t.Run("mutable file service", func(t *testing.T) {
testMutableFileService(t, func() MutableFileService {
upstream, err := NewLocalFS(context.Background(), "test", t.TempDir(), DisabledCacheConfig, nil)
assert.Nil(t, err)
return SubPath(upstream, "foo").(MutableFileService)
})
})

t.Run("bad path", func(t *testing.T) {
ctx := context.Background()
fs, err := NewLocalFS(ctx, "test", t.TempDir(), DisabledCacheConfig, nil)
assert.Nil(t, err)
subFS := SubPath(fs, "foo").(MutableFileService)
_, err = subFS.NewMutator(ctx, "~~")
assert.NotNil(t, err)
})

t.Run("not mutable", func(t *testing.T) {
fs, err := NewMemoryFS("test", DisabledCacheConfig, nil)
assert.Nil(t, err)
subFS := SubPath(fs, "foo").(MutableFileService)
func() {
defer func() {
p := recover()
if p == nil {
t.Fatal("should panic")
}
msg := fmt.Sprintf("%v", p)
if !strings.Contains(msg, "does not implement MutableFileService") {
t.Fatalf("got %v", msg)
}
}()
subFS.NewMutator(context.Background(), "foo")
}()
})

}
9 changes: 9 additions & 0 deletions pkg/vm/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/incrservice"
"github.com/matrixorigin/matrixone/pkg/lockservice"
Expand Down Expand Up @@ -234,3 +235,11 @@ func appendTraceField(fields []zap.Field, ctx context.Context) []zap.Field {
}
return fields
}

func (proc *Process) GetSpillFileService() (fileservice.MutableFileService, error) {
local, err := fileservice.Get[fileservice.MutableFileService](proc.Base.FileService, defines.LocalFileServiceName)
if err != nil {
return nil, err
}
return fileservice.SubPath(local, defines.SpillFileServiceName).(fileservice.MutableFileService), nil
}
36 changes: 36 additions & 0 deletions pkg/vm/process/process2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -53,3 +55,37 @@ func TestBuildPipelineContext(t *testing.T) {
proc.Cancel(nil)
assert.Error(t, proc.Ctx.Err())
}

func TestGetSpillFileService(t *testing.T) {
localFS, err := fileservice.NewLocalFS(
context.Background(),
defines.LocalFileServiceName,
t.TempDir(),
fileservice.DisabledCacheConfig,
nil,
)
assert.Nil(t, err)
proc := &Process{
Base: &BaseProcess{
FileService: localFS,
},
}
_, err = proc.GetSpillFileService()
assert.Nil(t, err)
}

func TestGetSpillFileServiceError(t *testing.T) {
fs, err := fileservice.NewMemoryFS(
"foo",
fileservice.DisabledCacheConfig,
nil,
)
assert.Nil(t, err)
proc := &Process{
Base: &BaseProcess{
FileService: fs,
},
}
_, err = proc.GetSpillFileService()
assert.NotNil(t, err)
}

0 comments on commit e213ea3

Please sign in to comment.