Skip to content

Commit

Permalink
feat(handler, s3store): implement ContentServerDataStore for direct c…
Browse files Browse the repository at this point in the history
…ontent serving, closes #1064

- Add ServerDataStore interface
- Update handlers to use ContentServerDataStore when available
- Implement range request handling for s3upload
- Add tests for new ContentServerDataStore functionality
- Update Go version to 1.22.1
  • Loading branch information
pcfreak30 committed Oct 21, 2024
1 parent 9779a84 commit 6e09111
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 3 deletions.
4 changes: 2 additions & 2 deletions cmd/tusd/cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ func Serve() {
err = serveTLS(server, listener)
}

// Note: http.Server.Serve and http.Server.ServeTLS (in serveTLS) always return a non-nil error code. So
// Note: http.ContentServer.Serve and http.ContentServer.ServeTLS (in serveTLS) always return a non-nil error code. So
// we can assume from here that `err != nil`
if err == http.ErrServerClosed {
// ErrServerClosed means that http.Server.Shutdown was called due to an interruption signal.
// ErrServerClosed means that http.ContentServer.Shutdown was called due to an interruption signal.
// We wait until the interruption procedure is complete or times out and then exit main.
<-shutdownComplete
} else {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/tus/tusd/v2
// Specify the Go version needed for the Heroku deployment
// See https://github.com/heroku/heroku-buildpack-go#go-module-specifics
// +heroku goVersion go1.22
go 1.21.0
go 1.22.1

toolchain go1.22.7

require (
Expand Down
6 changes: 6 additions & 0 deletions pkg/handler/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
ContentServer ContentServerDataStore
UsesContentServer bool
}

// NewStoreComposer creates a new and empty store composer.
Expand Down Expand Up @@ -85,3 +87,7 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}
func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
store.UsesContentServer = ext != nil
store.ContentServer = ext
}
11 changes: 11 additions & 0 deletions pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"context"
"io"
"net/http"
)

type MetaData map[string]string
Expand Down Expand Up @@ -121,6 +122,16 @@ type DataStore interface {
GetUpload(ctx context.Context, id string) (upload Upload, err error)
}

// ServableUpload defines the method for serving content directly
type ServableUpload interface {
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

// ContentServerDataStore is the interface for data stores that can serve content directly
type ContentServerDataStore interface {
AsServableUpload(upload Upload) ServableUpload
}

type TerminatableUpload interface {
// Terminate an upload so any further requests to the upload resource will
// return the ErrNotFound error.
Expand Down
11 changes: 11 additions & 0 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,17 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}

// If the data store implements ContentServerDataStore, use the ServableUpload interface
if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
err = servableUpload.ServeContent(c, w, r)
if err != nil {
handler.sendError(c, err)
}
return
}

// Fall back to the existing GetReader implementation if ContentServerDataStore is not implemented
contentType, contentDisposition := filterContentType(info)
resp := HTTPResponse{
StatusCode: http.StatusOK,
Expand Down
138 changes: 138 additions & 0 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -376,6 +377,81 @@ func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.Concatabl
return upload.(*s3Upload)
}

func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*s3Upload)
}

func (su *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Get file info
info, err := su.GetInfo(ctx)
if err != nil {
return err
}

// Prepare GetObject input
input := &s3.GetObjectInput{
Bucket: aws.String(su.store.Bucket),
Key: su.store.keyWithPrefix(su.objectId),
}

// Handle range requests
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
if err := su.handleRangeRequest(ctx, w, r, info, input, rangeHeader); err != nil {
return err
}
return nil
}

// For non-range requests, serve the entire file
result, err := su.store.Service.GetObject(ctx, input)
if err != nil {
return err
}
defer result.Body.Close()

// Set headers
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
w.Header().Set("Content-Type", info.MetaData["filetype"])
w.Header().Set("ETag", *result.ETag)

// Stream the content
_, err = io.Copy(w, result.Body)
return err
}

func (su *s3Upload) handleRangeRequest(ctx context.Context, w http.ResponseWriter, _ *http.Request, info handler.FileInfo, input *s3.GetObjectInput, rangeHeader string) error {
ranges, err := parseRange(rangeHeader, info.Size)
if err != nil {
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return err
}

if len(ranges) > 1 {
return fmt.Errorf("multiple ranges are not supported")
}

// Set the range in the GetObject input
input.Range = aws.String(fmt.Sprintf("bytes=%d-%d", ranges[0].start, ranges[0].end))

result, err := su.store.Service.GetObject(ctx, input)
if err != nil {
return err
}
defer result.Body.Close()

// Set headers for partial content
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", ranges[0].start, ranges[0].end, info.Size))
w.Header().Set("Content-Length", strconv.FormatInt(ranges[0].end-ranges[0].start+1, 10))
w.Header().Set("Content-Type", info.MetaData["filetype"])
w.Header().Set("ETag", *result.ETag)
w.WriteHeader(http.StatusPartialContent)

// Stream the content
_, err = io.Copy(w, result.Body)
return err
}

func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
store := upload.store

Expand Down Expand Up @@ -1249,3 +1325,65 @@ func (store S3Store) releaseUploadSemaphore() {
store.uploadSemaphore.Release()
store.uploadSemaphoreDemandMetric.Dec()
}

// Helper function to parse range header
func parseRange(rangeHeader string, size int64) ([]struct{ start, end int64 }, error) {
if rangeHeader == "" {
return nil, fmt.Errorf("empty range header")
}

const b = "bytes="
if !strings.HasPrefix(rangeHeader, b) {
return nil, fmt.Errorf("invalid range header format")
}

var ranges []struct{ start, end int64 }
for _, ra := range strings.Split(rangeHeader[len(b):], ",") {
ra = strings.TrimSpace(ra)
if ra == "" {
continue
}
i := strings.Index(ra, "-")
if i < 0 {
return nil, fmt.Errorf("invalid range format")
}
start, end := strings.TrimSpace(ra[:i]), strings.TrimSpace(ra[i+1:])
var r struct{ start, end int64 }
if start == "" {
// suffix-byte-range-spec, like "-100"
n, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid range format")
}
if n > size {
n = size
}
r.start = size - n
r.end = size - 1
} else {
i, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid range format")
}
if i >= size {
return nil, fmt.Errorf("range out of bounds")
}
r.start = i
if end == "" {
// byte-range-spec, like "100-"
r.end = size - 1
} else {
i, err := strconv.ParseInt(end, 10, 64)
if err != nil || i >= size || i < r.start {
return nil, fmt.Errorf("invalid range format")
}
r.end = i
}
}
ranges = append(ranges, r)
}
if len(ranges) == 0 {
return nil, fmt.Errorf("no valid ranges")
}
return ranges, nil
}
Loading

0 comments on commit 6e09111

Please sign in to comment.