Skip to content

Commit

Permalink
Create bucket on upload service start
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed May 22, 2023
1 parent f9b5e0a commit ae3de30
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 74 deletions.
21 changes: 10 additions & 11 deletions apps/uploads/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/alecthomas/kong"
"github.com/go-redis/redis/v8"
"github.com/tus/tusd/pkg/s3store"
)

var cli struct {
Expand Down Expand Up @@ -68,8 +67,6 @@ func serve(logger logging.KVLogger) {
logger.Fatal("s3 client failed", "err", err)
}

store := s3store.New(s3cfg.Bucket, client)

redisOpts, err := redis.ParseURL(cfg.V.GetString("RedisLocker"))
if err != nil {
logger.Fatal("redis config parse failed", "err", err)
Expand All @@ -92,14 +89,16 @@ func serve(logger logging.KVLogger) {

runCtx, runCancel := context.WithCancel(context.Background())

launcher := uploads.NewLauncher().
FileLocker(locker).
Store(store).
DB(db).
PublicKey(k).
Logger(logger).
CORSDomains(cfg.V.GetStringSlice("CORSDomains")).
BusRedisURL(cfg.V.GetString("RedisBus"))
launcher := uploads.NewLauncher(
uploads.WithFileLocker(locker),
uploads.WithS3Client(client),
uploads.WithS3Bucket(s3cfg.Bucket),
uploads.WithDB(db),
uploads.WithPublicKey(k),
uploads.WithLogger(logger),
uploads.WithCORSDomains(cfg.V.GetStringSlice("CORSDomains")),
uploads.WithBusRedisURL(cfg.V.GetString("RedisBus")),
)

go func() {
trap := make(chan os.Signal, 1)
Expand Down
10 changes: 0 additions & 10 deletions apps/uploads/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/Pallinder/go-randomdata"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
)

Expand Down Expand Up @@ -43,15 +42,6 @@ func NewUploadTestHelper(t *testing.T) (*UploadTestHelper, error) {
return nil, err
}

_, err = client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(s3cfg.Bucket),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); !ok || awsErr.Code() != "BucketAlreadyOwnedByYou" {
return nil, err
}
}

t.Cleanup(func() {
listObjectsOutput, err := client.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(s3cfg.Bucket),
Expand Down
139 changes: 90 additions & 49 deletions apps/uploads/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"github.com/OdyseeTeam/odysee-api/pkg/bus"
"github.com/OdyseeTeam/odysee-api/pkg/keybox"
"github.com/OdyseeTeam/odysee-api/pkg/logging"
"github.com/hibiken/asynq"
"github.com/lestrrat-go/jwx/v2/jwt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/go-chi/chi/v5"
"github.com/go-chi/cors"
"github.com/go-chi/jwtauth/v5"
"github.com/hibiken/asynq"
"github.com/lestrrat-go/jwx/v2/jwt"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/tus/tusd/pkg/s3store"
)
Expand All @@ -35,6 +38,22 @@ var (
reExtractFileID = regexp.MustCompile(`([^/]{32,})\/?$`)
)

// Handler handle media publishing on odysee-api, it implements TUS
// specifications to support resumable file upload and extends the handler to
// support fetching media from remote url.
type Handler struct {
*tusd.UnroutedHandler
bus *bus.Client
s3bucket string
queries *database.Queries
logger logging.KVLogger
jwtAuth *jwtauth.JWTAuth
tokenValidator *keybox.Validator
stopChan chan struct{}
}

type LauncherOption func(*Launcher)

type Launcher struct {
corsDomains []string
db database.DBTX
Expand All @@ -44,79 +63,86 @@ type Launcher struct {
publicKey crypto.PublicKey
router chi.Router
busRedisURL string
s3Store s3store.S3Store
s3client *s3.S3
s3bucket string
handler *Handler
httpServer *http.Server
logger logging.KVLogger
readyCancel context.CancelFunc
}

// Handler handle media publishing on odysee-api, it implements TUS
// specifications to support resumable file upload and extends the handler to
// support fetching media from remote url.
type Handler struct {
*tusd.UnroutedHandler
bus *bus.Client
bucketName string
queries *database.Queries
logger logging.KVLogger
jwtAuth *jwtauth.JWTAuth
tokenValidator *keybox.Validator
stopChan chan struct{}
}

func NewLauncher() *Launcher {
func NewLauncher(options ...LauncherOption) *Launcher {
launcher := &Launcher{
logger: logging.NoopKVLogger{},
prefix: "/v1/uploads",
httpAddress: ":8080",
}

for _, opt := range options {
opt(launcher)
}

return launcher
}

func (c *Launcher) Logger(logger logging.KVLogger) *Launcher {
c.logger = logger
return c
func WithLogger(logger logging.KVLogger) LauncherOption {
return func(l *Launcher) {
l.logger = logger
}
}

func (c *Launcher) Store(s s3store.S3Store) *Launcher {
c.s3Store = s
return c
func WithS3Client(client *s3.S3) LauncherOption {
return func(l *Launcher) {
l.s3client = client
}
}

func (c *Launcher) FileLocker(fileLocker tusd.Locker) *Launcher {
c.fileLocker = fileLocker
return c
func WithS3Bucket(bucket string) LauncherOption {
return func(l *Launcher) {
l.s3bucket = bucket
}
}

func (c *Launcher) Prefix(prefix string) *Launcher {
c.prefix = prefix
return c
func WithFileLocker(fileLocker tusd.Locker) LauncherOption {
return func(l *Launcher) {
l.fileLocker = fileLocker
}
}

func (c *Launcher) PublicKey(publicKey crypto.PublicKey) *Launcher {
c.publicKey = publicKey
return c
func WithPrefix(prefix string) LauncherOption {
return func(l *Launcher) {
l.prefix = prefix
}
}

func (c *Launcher) BusRedisURL(busRedisURL string) *Launcher {
c.busRedisURL = busRedisURL
return c
func WithPublicKey(publicKey crypto.PublicKey) LauncherOption {
return func(l *Launcher) {
l.publicKey = publicKey
}
}

func (c *Launcher) HTTPAddress(address string) *Launcher {
c.httpAddress = address
return c
func WithBusRedisURL(busRedisURL string) LauncherOption {
return func(l *Launcher) {
l.busRedisURL = busRedisURL
}
}

func (c *Launcher) DB(db database.DBTX) *Launcher {
c.db = db
return c
func WithHTTPAddress(address string) LauncherOption {
return func(l *Launcher) {
l.httpAddress = address
}
}

func (c *Launcher) CORSDomains(domains []string) *Launcher {
c.corsDomains = domains
return c
func WithDB(db database.DBTX) LauncherOption {
return func(l *Launcher) {
l.db = db
}
}

func WithCORSDomains(domains []string) LauncherOption {
return func(l *Launcher) {
l.corsDomains = domains
}
}

func (l *Launcher) Build() (chi.Router, error) {
Expand All @@ -125,10 +151,25 @@ func (l *Launcher) Build() (chi.Router, error) {
return nil, err
}

l.logger.Info("building upload handler")
l.logger.Info("creating s3 bucket", "bucket", l.s3bucket)
_, err = l.s3client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(l.s3bucket),
})
if err != nil {
var awsError awserr.Error
if errors.As(err, &awsError) && awsError.Code() == "BucketAlreadyOwnedByYou" {
l.logger.Info("bucket already exists", "bucket", l.s3bucket)
} else {
return nil, err
}
}

readyCtx, readyCancel := context.WithCancel(context.Background())

l.logger.Info("building upload handler")
store := s3store.New(l.s3bucket, l.s3client)
handler := &Handler{
bucketName: l.s3Store.Bucket,
s3bucket: l.s3bucket,
logger: l.logger,
queries: database.New(l.db),
tokenValidator: validator,
Expand All @@ -150,7 +191,7 @@ func (l *Launcher) Build() (chi.Router, error) {

composer := tusd.NewStoreComposer()
composer.UseLocker(l.fileLocker)
l.s3Store.UseIn(composer)
store.UseIn(composer)

tusConfig := tusd.Config{
StoreComposer: composer,
Expand Down Expand Up @@ -304,7 +345,7 @@ func (h *Handler) listenToHooks() {

go listen(h.CompleteUploads, func(uid int32, event tusd.HookEvent) {
err := completeUpload(
h.queries, h.bus, uid, event.Upload.ID, event.Upload.MetaData["filename"], h.bucketName, event.Upload.Storage["Key"])
h.queries, h.bus, uid, event.Upload.ID, event.Upload.MetaData["filename"], h.s3bucket, event.Upload.Storage["Key"])
if err != nil {
h.logger.Warn("completing upload failed", "user_id", uid, "upload_id", event.Upload.ID, "err", err)
return
Expand Down
13 changes: 9 additions & 4 deletions apps/uploads/uploads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/suite"
"github.com/tus/tusd/pkg/s3store"
)

type uploadSuite struct {
Expand Down Expand Up @@ -187,7 +186,7 @@ func (s *uploadSuite) TestUploadLarger() {
return nil
})
s.Equal(database.UploadStatusCreated, upload.Status)
s.Equal(fmt.Sprintf("%v", userID), upload.UserID)
s.Equal(userID, upload.UserID)
s.Empty(upload.Filename)
s.Empty(upload.Key)

Expand Down Expand Up @@ -388,7 +387,6 @@ func (s *uploadSuite) SetupSuite() {

client, err := configng.NewS3Client(upHelper.S3Config)
s.Require().NoError(err)
store := s3store.New(upHelper.S3Config.Bucket, client)

redisOpts, err := redis.ParseURL("redis://:odyredis@localhost:6379/0")
if err != nil {
Expand All @@ -405,7 +403,14 @@ func (s *uploadSuite) SetupSuite() {
kf, err := keybox.GenerateKeyfob()
s.Require().NoError(err)

l := NewLauncher().FileLocker(locker).Store(store).DB(upHelper.DB).PublicKey(kf.PublicKey()).Logger(zapadapter.NewKV(nil))
l := NewLauncher(
WithFileLocker(locker),
WithS3Client(client),
WithS3Bucket(upHelper.S3Config.Bucket),
WithDB(upHelper.DB),
WithPublicKey(kf.PublicKey()),
WithLogger(zapadapter.NewKV(nil)),
)
r, err := l.Build()
s.Require().NoError(err)

Expand Down

0 comments on commit ae3de30

Please sign in to comment.