Skip to content

Commit

Permalink
feat: initial max2max poc
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Sep 20, 2024
1 parent 335cd55 commit 0530cf0
Show file tree
Hide file tree
Showing 16 changed files with 583 additions and 0 deletions.
3 changes: 3 additions & 0 deletions task/max2max/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/
dist/
.env
6 changes: 6 additions & 0 deletions task/max2max/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM alpine:3

RUN apk --no-cache add tzdata
COPY ./build/max2max /usr/local/bin/max2max

ENTRYPOINT ["/usr/local/bin/max2max"]
19 changes: 19 additions & 0 deletions task/max2max/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/goto/maxcompute-transformation

go 1.22.3

require github.com/aliyun/aliyun-odps-go-sdk v0.3.4

require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
272 changes: 272 additions & 0 deletions task/max2max/go.sum

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions task/max2max/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package client

import (
"fmt"
"log/slog"
"os"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type Loader interface {
GetQuery(tableID, query string) string
}

type client struct {
logger *slog.Logger
odpsIns *odps.Odps
}

func NewClient(logger *slog.Logger, odpsIns *odps.Odps) *client {
return &client{
logger: logger,
odpsIns: odpsIns,
}
}

func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {
// read query from filepath
c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath))
queryRaw, err := os.ReadFile(queryFilePath)
if err != nil {
return err
}

// execute query with odps client
c.logger.Info(fmt.Sprintf("execute: %s", string(queryRaw)))
ins, err := c.odpsIns.ExecSQl(loader.GetQuery(tableID, string(queryRaw)))
if err != nil {
return err
}
c.logger.Info(fmt.Sprintf("taskId: %s", ins.Id()))

// wait execution success
if err := ins.WaitForSuccess(); err != nil {
return err
}
c.logger.Info("execution done")
return nil
}
32 changes: 32 additions & 0 deletions task/max2max/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package config

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type Config struct {
*odps.Config
LogLevel string
LoadMethod string
QueryFilePath string
DestinationTableID string
}

func NewConfig() *Config {
cfg := &Config{
Config: odps.NewConfig(),
// max2max related config
LogLevel: getEnv("LOG_LEVEL", "INFO"),
LoadMethod: getEnv("LOAD_METHOD", "APPEND"),
QueryFilePath: getEnv("QUERY_FILE_PATH", ""),
DestinationTableID: getEnv("DESTINATION_TABLE_ID", ""),
}
// ali-odps-go-sdk related config
cfg.Config.AccessId = getEnv("ACCESS_ID", "")
cfg.Config.AccessKey = getEnv("ACCESS_KEY", "")
cfg.Config.Endpoint = getEnv("ENDPOINT", "http://service.ap-southeast-5.maxcompute.aliyun.com/api")
cfg.Config.ProjectName = getEnv("PROJECT", "")
cfg.Config.HttpTimeout = getEnvDuration("HTTP_TIMEOUT", "10s")

return cfg
}
18 changes: 18 additions & 0 deletions task/max2max/internal/config/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package config

import (
"os"
"time"
)

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}

func getEnvDuration(key, fallback string) time.Duration {
result, _ := time.ParseDuration(getEnv(key, fallback))
return result
}
20 changes: 20 additions & 0 deletions task/max2max/internal/loader/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package loader

import (
"fmt"
"log/slog"
)

type appendLoader struct {
logger *slog.Logger
}

func NewAppendLoader(logger *slog.Logger) *appendLoader {
return &appendLoader{
logger: logger,
}
}

func (l *appendLoader) GetQuery(tableID, query string) string {
return fmt.Sprintf("INSERT INTO TABLE %s %s", tableID, query)
}
9 changes: 9 additions & 0 deletions task/max2max/internal/loader/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package loader

const (
APPEND = "APPEND"
REPLACE = "REPLACE"
REPLACE_ALL = "REPLACE_ALL"
MERGE = "MERGE"
MERGE_REPLACE = "MERGE_REPLACE"
)
27 changes: 27 additions & 0 deletions task/max2max/internal/loader/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package loader

import (
"errors"
"log/slog"
)

type Loader interface {
GetQuery(tableID, query string) string
}

func GetLoader(name string, logger *slog.Logger) (Loader, error) {
switch name {
case APPEND:
return NewAppendLoader(logger), nil
case REPLACE:
return NewReplaceLoader(logger), nil
case REPLACE_ALL:
return NewReplaceAllLoader(logger), nil
case MERGE:
return NewMergeLoader(logger), nil
case MERGE_REPLACE:
return NewMergeReplaceLoader(logger), nil
default:
return nil, errors.New("loader not found")
}
}
19 changes: 19 additions & 0 deletions task/max2max/internal/loader/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type mergeLoader struct {
logger *slog.Logger
}

func NewMergeLoader(logger *slog.Logger) *mergeLoader {
return &mergeLoader{
logger: logger,
}
}

func (l *mergeLoader) GetQuery(tableID, query string) string {
return "-- TODO merge loader"
}
19 changes: 19 additions & 0 deletions task/max2max/internal/loader/merge_replace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type mergeReplaceLoader struct {
logger *slog.Logger
}

func NewMergeReplaceLoader(logger *slog.Logger) *mergeReplaceLoader {
return &mergeReplaceLoader{
logger: logger,
}
}

func (l *mergeReplaceLoader) GetQuery(tableID, query string) string {
return "-- TODO merge replace loader"
}
19 changes: 19 additions & 0 deletions task/max2max/internal/loader/replace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type replaceLoader struct {
logger *slog.Logger
}

func NewReplaceLoader(logger *slog.Logger) *replaceLoader {
return &replaceLoader{
logger: logger,
}
}

func (l *replaceLoader) GetQuery(tableID, query string) string {
return "-- TODO replace loader"
}
19 changes: 19 additions & 0 deletions task/max2max/internal/loader/replace_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type replaceAllLoader struct {
logger *slog.Logger
}

func NewReplaceAllLoader(logger *slog.Logger) *replaceAllLoader {
return &replaceAllLoader{
logger: logger,
}
}

func (l *replaceAllLoader) GetQuery(tableID, query string) string {
return "-- TODO replace all loader"
}
16 changes: 16 additions & 0 deletions task/max2max/internal/logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package logger

import (
"log/slog"
"os"
)

func NewLogger(logLevel string) (*slog.Logger, error) {
var level slog.Level
if err := level.UnmarshalText([]byte(logLevel)); err != nil {
return nil, err
}

writter := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
return slog.New(writter), nil
}
36 changes: 36 additions & 0 deletions task/max2max/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"github.com/goto/maxcompute-transformation/internal/client"
"github.com/goto/maxcompute-transformation/internal/config"
"github.com/goto/maxcompute-transformation/internal/loader"
"github.com/goto/maxcompute-transformation/internal/logger"
)

// TODO:
// - graceful shutdown
// - error handling
// - instrumentation
func main() {
// load config
cfg := config.NewConfig()

// initiate dependencies
logger, err := logger.NewLogger(cfg.LogLevel)
if err != nil {
panic(err)
}
loader, err := loader.GetLoader(cfg.LoadMethod, logger)
if err != nil {
panic(err)
}
// initiate client
client := client.NewClient(logger, cfg.GenOdps())

// execute query
err = client.Execute(loader, cfg.DestinationTableID, cfg.QueryFilePath)
if err != nil {
panic(err)
}
}

0 comments on commit 0530cf0

Please sign in to comment.