Skip to content

Commit

Permalink
vm: add Process.GetSpillFileService
Browse files Browse the repository at this point in the history
  • Loading branch information
reusee committed Nov 29, 2024
1 parent 6386d14 commit 6e53d30
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
20 changes: 20 additions & 0 deletions cmd/mo-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func startService(
return err
}

if err := clearSpillFiles(ctx, fs); err != nil {
return err
}

if globalEtlFS == nil {
globalEtlFS = etlFS
globalServiceType = st.String()
Expand Down Expand Up @@ -514,3 +518,19 @@ func maybeRunInDaemonMode() {
os.Exit(0)
}
}

func clearSpillFiles(ctx context.Context, fs fileservice.FileService) error {
localFS, err := fileservice.Get[fileservice.FileService](fs, defines.LocalFileServiceName)
if err != nil {
return err
}
spillFS := fileservice.SubPath(localFS, defines.SpillFileServiceName)
for entry, err := range spillFS.List(ctx, "/") {
if err != nil {
continue
}
logutil.Info("delete spill file", zap.Any("name", entry.Name))
_ = spillFS.Delete(ctx, entry.Name)
}
return nil
}
3 changes: 3 additions & 0 deletions pkg/defines/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
LocalFileServiceName = "LOCAL"
ETLFileServiceName = "ETL"
StandbyFileServiceName = "STANDBY"

// sub fileservices
SpillFileServiceName = "__spill"
)

const (
Expand Down
9 changes: 9 additions & 0 deletions pkg/vm/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/incrservice"
"github.com/matrixorigin/matrixone/pkg/lockservice"
Expand Down Expand Up @@ -234,3 +235,11 @@ func appendTraceField(fields []zap.Field, ctx context.Context) []zap.Field {
}
return fields
}

func (proc *Process) GetSpillFileService() (fileservice.FileService, error) {
local, err := fileservice.Get[fileservice.FileService](proc.Base.FileService, defines.LocalFileServiceName)
if err != nil {
return nil, err
}
return fileservice.SubPath(local, defines.SpillFileServiceName), nil
}
14 changes: 14 additions & 0 deletions pkg/vm/process/process2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -53,3 +55,15 @@ func TestBuildPipelineContext(t *testing.T) {
proc.Cancel(nil)
assert.Error(t, proc.Ctx.Err())
}

func TestGetSpillFileService(t *testing.T) {
localFS, err := fileservice.NewMemoryFS(defines.LocalFileServiceName, fileservice.DisabledCacheConfig, nil)
assert.Nil(t, err)
proc := &Process{
Base: &BaseProcess{
FileService: localFS,
},
}
_, err = proc.GetSpillFileService()
assert.Nil(t, err)
}

0 comments on commit 6e53d30

Please sign in to comment.