Skip to content

Commit

Permalink
Update standard columns to use civil.Date (#46)
Browse files Browse the repository at this point in the history
* Use civil.Date type for standard column date
* Use civil.Date throughout jsonlbundle and uploadbundle packages
  • Loading branch information
stephen-soltesz authored Apr 3, 2023
1 parent 14e4d6d commit 976d651
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 46 deletions.
6 changes: 5 additions & 1 deletion api/standard_columns.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Package api defines the datatype(s) generated by this tool.
package api

import (
"cloud.google.com/go/civil"
)

// StandardColumnsV0 defines version 0 of the standard columns included
// in every line (row) along with the raw data from the measurement service.
//
Expand All @@ -9,7 +13,7 @@ package api
// type should be declared as "any" but bigquery.InferSchema() does not
// support "any" (or "interface{}").
type StandardColumnsV0 struct {
Date string `bigquery:"date"` // yyyy-mm-dd pathname component of measurement data
Date civil.Date `bigquery:"date"` // yyyy-mm-dd pathname component of measurement data
Archiver ArchiverV0 `bigquery:"archiver"` // archiver details
Raw string `bigquery:"raw"` // measurement data (file contents) in JSON format
}
Expand Down
12 changes: 7 additions & 5 deletions internal/jsonlbundle/jsonlbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"cloud.google.com/go/civil"

"github.com/m-lab/go/timex"
"github.com/m-lab/jostler/api"
)
Expand All @@ -24,7 +26,7 @@ type JSONLBundle struct {
Index []api.IndexV1 // pathnames of data files in the index
Timestamp string // bundle's in-memory creation time that serves as its identifier
Datatype string // bundle's datatype
DateSubdir string // date subdirectory of files in this bundle (yyyy/mm/dd)
Date civil.Date // date of the files in this bundle (from their yyyy/mm/dd subdirectory)
bucket string // GCS bucket
BundleDir string // GCS directory to upload this bundle to
BundleName string // GCS object name of this bundle
Expand Down Expand Up @@ -61,15 +63,15 @@ func Verbose(v func(string, ...interface{})) {
// |--------GCSConfig.DataDir--------| |------GCSConfig.BaseID------|
// autoload/v1/<experiment>/index1/<yyyy>/<mm>/<dd>/<timestamp>-<datatype>-<node>-<experiment>-index1.jsonl
// |------GCSConfig.IndexDir-----| |------GCSConfig.BaseID------|
func New(bucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype, dateSubdir string) *JSONLBundle {
func New(bucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype string, date civil.Date) *JSONLBundle {
nowUTC := time.Now().UTC()
return &JSONLBundle{
Lines: []string{},
BadFiles: []string{},
Index: []api.IndexV1{},
Timestamp: nowUTC.Format("2006/01/02T150405.000000Z"),
Datatype: datatype,
DateSubdir: dateSubdir,
Date: date,
BundleDir: dirName(gcsDataDir, nowUTC),
BundleName: objectName(nowUTC, gcsBaseID, "data"),
IndexDir: dirName(gcsIndexDir, nowUTC),
Expand All @@ -81,7 +83,7 @@ func New(bucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype, dateSubdir string

// Description returns a string describing the bundle for log messages.
func (jb *JSONLBundle) Description() string {
return fmt.Sprintf("bundle <%v %v %v>", jb.Timestamp, jb.Datatype, jb.DateSubdir)
return fmt.Sprintf("bundle <%v %v %v>", jb.Timestamp, jb.Datatype, jb.Date)
}

// HasFile returns true or false depending on whether the bundle includes
Expand Down Expand Up @@ -110,7 +112,7 @@ func (jb *JSONLBundle) AddFile(fullPath, version, gitCommit string) error {
return err
}
stdCols := api.StandardColumnsV0{
Date: strings.ReplaceAll(jb.DateSubdir, "/", "-"),
Date: jb.Date,
Archiver: api.ArchiverV0{
Version: version,
GitCommit: gitCommit,
Expand Down
20 changes: 11 additions & 9 deletions internal/jsonlbundle/jsonlbundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"cloud.google.com/go/civil"

"github.com/m-lab/jostler/api"
"github.com/m-lab/jostler/internal/testhelper"
)
Expand All @@ -24,26 +26,26 @@ func TestNew(t *testing.T) {
gcsIndexDir string
gcsBaseID string
datatype string
dateSubdir string
date civil.Date
}{
{
gcsBucket: "some-bucket",
gcsDataDir: "some/path/in/gcs",
gcsIndexDir: "some/path/in/gcs",
gcsBaseID: "some-string",
datatype: "some-datatype",
dateSubdir: "2022/11/14",
date: civil.Date{Year: 2022, Month: time.November, Day: 14},
},
}
for i, test := range tests {
test := test
t.Logf("%s>>> test %02d%s", testhelper.ANSIPurple, i, testhelper.ANSIEnd)
gotjb := New(test.gcsBucket, test.gcsDataDir, test.gcsIndexDir, test.gcsBaseID, test.datatype, test.dateSubdir)
gotjb := New(test.gcsBucket, test.gcsDataDir, test.gcsIndexDir, test.gcsBaseID, test.datatype, test.date)
timestamp, err := time.Parse("2006/01/02T150405.000000Z", gotjb.Timestamp)
if err != nil {
t.Fatalf("time.Parse() = %v", err)
}
wantjb := newJb(test.gcsBucket, test.gcsDataDir, test.gcsIndexDir, test.gcsBaseID, test.datatype, test.dateSubdir, timestamp)
wantjb := newJb(test.gcsBucket, test.gcsDataDir, test.gcsIndexDir, test.gcsBaseID, test.datatype, test.date, timestamp)
if !reflect.DeepEqual(gotjb, wantjb) {
t.Fatalf("New() = %+v, want %+v", gotjb, wantjb)
}
Expand All @@ -54,7 +56,7 @@ func TestDescription(t *testing.T) {
t.Parallel()
nowUTC := time.Now().UTC()
jb := newTestJb(nowUTC)
wantDescription := fmt.Sprintf("bundle <%v %v %v>", nowUTC.Format("2006/01/02T150405.000000Z"), jb.Datatype, jb.DateSubdir)
wantDescription := fmt.Sprintf("bundle <%v %v %v>", nowUTC.Format("2006/01/02T150405.000000Z"), jb.Datatype, jb.Date)
if jb.Description() != wantDescription {
t.Fatalf("jb.Description() = %v, want %v", jb.Description(), wantDescription)
}
Expand Down Expand Up @@ -171,18 +173,18 @@ func newTestJb(timestamp time.Time) *JSONLBundle {
gcsIndexDir := "some/path/in/gcs"
gcsBaseID := "some-string"
datatype := "some-datatype"
dateSubdir := "2022/11/14"
return newJb(gcsBucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype, dateSubdir, timestamp)
date := civil.Date{Year: 2022, Month: time.November, Day: 14}
return newJb(gcsBucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype, date, timestamp)
}

func newJb(bucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype, dateSubdir string, timestamp time.Time) *JSONLBundle {
func newJb(bucket, gcsDataDir, gcsIndexDir, gcsBaseID, datatype string, date civil.Date, timestamp time.Time) *JSONLBundle {
return &JSONLBundle{
Lines: []string{},
BadFiles: []string{},
Index: []api.IndexV1{},
Timestamp: timestamp.Format("2006/01/02T150405.000000Z"),
Datatype: datatype,
DateSubdir: dateSubdir,
Date: date,
BundleDir: dirName(gcsDataDir, timestamp),
BundleName: objectName(timestamp, gcsBaseID, "data"),
IndexDir: dirName(gcsIndexDir, timestamp),
Expand Down
68 changes: 37 additions & 31 deletions internal/uploadbundle/uploadbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"strings"
"time"

"cloud.google.com/go/civil"
"github.com/m-lab/jostler/internal/jsonlbundle"
"github.com/m-lab/jostler/internal/watchdir"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -50,12 +51,12 @@ type DirWatcher interface {
// UploadBundle defines configuration options and other fields that are
// common to all instances of JSONL bundles (see jsonlBundle).
type UploadBundle struct {
wdClient DirWatcher // directory watcher that notifies us
gcsConf GCSConfig // GCS configuration
bundleConf BundleConfig // bundle configuration
ageChan chan *jsonlbundle.JSONLBundle // notification channel for when bundle reaches maximum age
activeBundles map[string]*jsonlbundle.JSONLBundle // bundles that are active
uploadBundles map[string]struct{} // bundles that are being uploaded or were uploaded
wdClient DirWatcher // directory watcher that notifies us
gcsConf GCSConfig // GCS configuration
bundleConf BundleConfig // bundle configuration
ageChan chan *jsonlbundle.JSONLBundle // notification channel for when bundle reaches maximum age
activeBundles map[civil.Date]*jsonlbundle.JSONLBundle // bundles that are active
uploadBundles map[string]struct{} // bundles that are being uploaded or were uploaded
}

// Uploader interface.
Expand Down Expand Up @@ -96,6 +97,7 @@ var (
ErrNotRegular = errors.New("is not a regular file")
ErrEmpty = errors.New("is empty")
ErrTooBig = errors.New("is too big to fit in a bundle")
ErrDateParse = errors.New("date unparseable")
)

var (
Expand Down Expand Up @@ -134,7 +136,7 @@ func New(ctx context.Context, wdClient DirWatcher, gcsConf GCSConfig, bundleConf
gcsConf: gcsConf,
bundleConf: bundleConf,
ageChan: make(chan *jsonlbundle.JSONLBundle),
activeBundles: make(map[string]*jsonlbundle.JSONLBundle, weekDays),
activeBundles: make(map[civil.Date]*jsonlbundle.JSONLBundle, weekDays),
uploadBundles: make(map[string]struct{}, numUploads),
}
ub.bundleConf.SpoolDir = filepath.Clean(ub.bundleConf.SpoolDir)
Expand Down Expand Up @@ -180,15 +182,15 @@ func (ub *UploadBundle) BundleAndUpload(ctx context.Context) error {
func (ub *UploadBundle) bundleFile(ctx context.Context, fullPath string) {
// Validate the file's pathname and get its date (parsed from the
// date subdirectory) and size.
dateSubdir, fileSize, err := ub.fileDetails(fullPath)
date, fileSize, err := ub.fileDetails(fullPath)
if err != nil {
verbose("WARNING: ignoring %v: %v", fullPath, err)
return
}
verbose("%v %v bytes", fullPath, fileSize)

// Is there an active bundle that this file belongs to?
jb := ub.activeBundles[dateSubdir]
jb := ub.activeBundles[date]
if jb != nil {
// Sanity check.
if jb.HasFile(fullPath) {
Expand All @@ -204,7 +206,7 @@ func (ub *UploadBundle) bundleFile(ctx context.Context, fullPath string) {
}
}
if jb == nil {
jb = ub.newJSONLBundle(dateSubdir)
jb = ub.newJSONLBundle(date)
}
// Add the contents of this file to the bundle.
if err := jb.AddFile(fullPath, ub.bundleConf.Version, ub.bundleConf.GitCommit); err != nil {
Expand All @@ -218,60 +220,64 @@ func (ub *UploadBundle) bundleFile(ctx context.Context, fullPath string) {
// /cache/data/<experiment>/<datatype>/<yyyy>/<mm>/<dd>/<filename>
// and is a regular file. Then it makes sure it's not too big.
// If all is OK, it returns the date component of the file's pathname
// ("yyyy/mm/dd") and the file size.
func (ub *UploadBundle) fileDetails(fullPath string) (string, int64, error) {
// ("yyyy/mm/dd") as a civil.Date, along with the file size.
func (ub *UploadBundle) fileDetails(fullPath string) (civil.Date, int64, error) {
cleanFilePath := filepath.Clean(fullPath)
dataDir := ub.bundleConf.SpoolDir
if !strings.HasPrefix(cleanFilePath, dataDir) {
return "", 0, fmt.Errorf("%v: %w", cleanFilePath, ErrNotInDataDir)
return civil.Date{}, 0, fmt.Errorf("%v: %w", cleanFilePath, ErrNotInDataDir)
}
if len(cleanFilePath) <= len(dataDir) {
return "", 0, fmt.Errorf("%v: %w", cleanFilePath, ErrTooShort)
return civil.Date{}, 0, fmt.Errorf("%v: %w", cleanFilePath, ErrTooShort)
}
pathName := regexp.MustCompile(`[^a-zA-Z0-9/:._-]`)
if pathName.MatchString(cleanFilePath) {
return "", 0, fmt.Errorf("%v: %w", cleanFilePath, ErrInvalidChars)
return civil.Date{}, 0, fmt.Errorf("%v: %w", cleanFilePath, ErrInvalidChars)
}
if strings.Contains(cleanFilePath, "..") {
return "", 0, fmt.Errorf("%v: %w", cleanFilePath, ErrDotDot)
return civil.Date{}, 0, fmt.Errorf("%v: %w", cleanFilePath, ErrDotDot)
}
dateSubdir, filename := filepath.Split(cleanFilePath[len(dataDir):])
yyyymmdd := regexp.MustCompile(`/20[0-9][0-9]/[0-9]{2}/[0-9]{2}/`)
if len(dateSubdir) != 12 || !yyyymmdd.MatchString(dateSubdir) {
return "", 0, fmt.Errorf("%v: %w", cleanFilePath, ErrDateDir)
return civil.Date{}, 0, fmt.Errorf("%v: %w", cleanFilePath, ErrDateDir)
}
if strings.HasPrefix(filename, ".") {
return "", 0, fmt.Errorf("%v: %w", filename, ErrDotFile)
return civil.Date{}, 0, fmt.Errorf("%v: %w", filename, ErrDotFile)
}
fi, err := os.Stat(fullPath)
if err != nil {
return "", 0, fmt.Errorf("failed to stat: %w", err)
return civil.Date{}, 0, fmt.Errorf("failed to stat: %w", err)
}
if !fi.Mode().IsRegular() {
return "", 0, fmt.Errorf("%v: %w", filename, ErrNotRegular)
return civil.Date{}, 0, fmt.Errorf("%v: %w", filename, ErrNotRegular)
}
if uint(fi.Size()) == 0 {
return "", 0, fmt.Errorf("%v: %w", filename, ErrEmpty)
return civil.Date{}, 0, fmt.Errorf("%v: %w", filename, ErrEmpty)
}
if uint(fi.Size()) > ub.bundleConf.SizeMax {
return "", 0, fmt.Errorf("%v: %w", filename, ErrTooBig)
return civil.Date{}, 0, fmt.Errorf("%v: %w", filename, ErrTooBig)
}
return dateSubdir[1:11], fi.Size(), nil
date, err := civil.ParseDate(strings.ReplaceAll(dateSubdir[1:11], "/", "-"))
if err != nil {
return civil.Date{}, 0, fmt.Errorf("%v: %w", filename, ErrDateParse)
}
return date, fi.Size(), nil
}

// newJSONLBundle creates and returns a new active bundle instance.
func (ub *UploadBundle) newJSONLBundle(dateSubdir string) *jsonlbundle.JSONLBundle {
func (ub *UploadBundle) newJSONLBundle(date civil.Date) *jsonlbundle.JSONLBundle {
// Sanity check: make sure we don't already have a bundle for
// the given date.
if jb, ok := ub.activeBundles[dateSubdir]; ok {
if dateSubdir == jb.DateSubdir {
if jb, ok := ub.activeBundles[date]; ok {
if date == jb.Date {
log.Printf("INTERNAL ERROR: an active %v already exists", jb.Description())
}
log.Printf("INTERNAL ERROR: key %v returned active %v", dateSubdir, jb.Description())
log.Printf("INTERNAL ERROR: key %s returned active %v", date, jb.Description())
}

jb := jsonlbundle.New(ub.gcsConf.Bucket, ub.gcsConf.DataDir, ub.gcsConf.IndexDir, ub.gcsConf.BaseID, ub.bundleConf.Datatype, dateSubdir)
ub.activeBundles[dateSubdir] = jb
jb := jsonlbundle.New(ub.gcsConf.Bucket, ub.gcsConf.DataDir, ub.gcsConf.IndexDir, ub.gcsConf.BaseID, ub.bundleConf.Datatype, date)
ub.activeBundles[date] = jb
verbose("created active %v", jb.Description())
time.AfterFunc(ub.bundleConf.AgeMax, func() {
ub.ageChan <- jb
Expand All @@ -298,7 +304,7 @@ func (ub *UploadBundle) uploadAgedBundle(ctx context.Context, jb *jsonlbundle.JS
// the upload process to GCS in the background.
func (ub *UploadBundle) uploadBundle(ctx context.Context, jb *jsonlbundle.JSONLBundle) {
// Sanity check.
if _, ok := ub.activeBundles[jb.DateSubdir]; !ok {
if _, ok := ub.activeBundles[jb.Date]; !ok {
log.Printf("INTERNAL ERROR: %v not in active bundles map", jb.Description())
}
if len(jb.Lines) != len(jb.Index)+len(jb.BadFiles) {
Expand All @@ -308,7 +314,7 @@ func (ub *UploadBundle) uploadBundle(ctx context.Context, jb *jsonlbundle.JSONLB
// Add the bundle to upload bundles map.
ub.uploadBundles[jb.Timestamp] = struct{}{}
// Delete the bundle from active bundles map.
delete(ub.activeBundles, jb.DateSubdir)
delete(ub.activeBundles, jb.Date)

// Start the upload process in the background and acknowledge
// the files of this bundle with the directory watcher.
Expand Down

0 comments on commit 976d651

Please sign in to comment.