Skip to content

Commit

Permalink
Decrease loader batch size (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
irees authored Dec 5, 2024
1 parent ddef50c commit 7b5d4f0
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 98 deletions.
47 changes: 27 additions & 20 deletions cmd/tlserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ func main() {
}

// ServerCommand holds the settings for the tlserver command. AddFlags binds
// fields to command-line flags, and Run copies a subset of them into model.Config.
type ServerCommand struct {
Timeout int
LongQueryDuration int
Port string
RestPrefix string
LoadAdmins bool
ValidateLargeFiles bool
SecretsFile string
Storage string
RTStorage string
DBURL string
RedisURL string
secrets []dmfr.Secret
Timeout int
// LongQueryDuration is the threshold, in milliseconds, above which queries are logged ("long-query" flag).
LongQueryDuration int
Port string
RestPrefix string
LoadAdmins bool
ValidateLargeFiles bool
// LoaderBatchSize is the GraphQL loader batch size ("loader-batch-size" flag, default 100).
LoaderBatchSize int
// LoaderStopTimeBatchSize is the GraphQL loader batch size for StopTimes
// ("loader-stop-time-batch-size" flag, default 1).
LoaderStopTimeBatchSize int
SecretsFile string
Storage string
RTStorage string
DBURL string
RedisURL string
secrets []dmfr.Secret
}

func (cmd *ServerCommand) HelpDesc() (string, string) {
Expand All @@ -102,6 +104,9 @@ func (cmd *ServerCommand) AddFlags(fl *pflag.FlagSet) {
fl.IntVar(&cmd.Timeout, "timeout", 60, "")
fl.IntVar(&cmd.LongQueryDuration, "long-query", 1000, "Log queries over this duration (ms)")
fl.BoolVar(&cmd.LoadAdmins, "load-admins", false, "Load admin polygons from database into memory")
fl.IntVar(&cmd.LoaderBatchSize, "loader-batch-size", 100, "GraphQL Loader batch size")
fl.IntVar(&cmd.LoaderStopTimeBatchSize, "loader-stop-time-batch-size", 1, "GraphQL Loader batch size for StopTimes")

}

func (cmd *ServerCommand) Parse(args []string) error {
Expand Down Expand Up @@ -168,14 +173,16 @@ func (cmd *ServerCommand) Run() error {

// Setup config
cfg := model.Config{
Finder: dbFinder,
RTFinder: rtFinder,
GbfsFinder: gbfsFinder,
Secrets: cmd.secrets,
Storage: cmd.Storage,
RTStorage: cmd.RTStorage,
ValidateLargeFiles: cmd.ValidateLargeFiles,
RestPrefix: cmd.RestPrefix,
Finder: dbFinder,
RTFinder: rtFinder,
GbfsFinder: gbfsFinder,
Secrets: cmd.secrets,
Storage: cmd.Storage,
RTStorage: cmd.RTStorage,
ValidateLargeFiles: cmd.ValidateLargeFiles,
RestPrefix: cmd.RestPrefix,
LoaderBatchSize: cmd.LoaderBatchSize,
LoaderStopTimeBatchSize: cmd.LoaderStopTimeBatchSize,
}

// Setup router
Expand Down
28 changes: 15 additions & 13 deletions model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import (
)

// Config bundles the finders, supporting services and settings that the
// server command assembles at start-up and that request handlers read back
// through ForContext.
type Config struct {
Finder Finder
RTFinder RTFinder
GbfsFinder GbfsFinder
Checker Checker
Actions Actions
JobQueue jobs.JobQueue
Clock clock.Clock
Secrets []dmfr.Secret
ValidateLargeFiles bool
DisableImage bool
RestPrefix string
Storage string
RTStorage string
Finder Finder
RTFinder RTFinder
GbfsFinder GbfsFinder
Checker Checker
Actions Actions
JobQueue jobs.JobQueue
Clock clock.Clock
Secrets []dmfr.Secret
ValidateLargeFiles bool
DisableImage bool
RestPrefix string
Storage string
RTStorage string
// LoaderBatchSize is passed to gql.NewLoaders as the general batch size;
// a value of 0 makes NewLoaders fall back to its built-in default.
LoaderBatchSize int
// LoaderStopTimeBatchSize is passed to gql.NewLoaders as the stop-time
// batch size; a value of 0 makes NewLoaders fall back to its built-in default.
LoaderStopTimeBatchSize int
}

var finderCtxKey = &contextKey{"finderConfig"}
Expand Down
136 changes: 71 additions & 65 deletions server/gql/loaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ctxKey string
const (
// loadersKey is the context key under which loaderMiddleware stores the per-request *Loaders.
loadersKey = ctxKey("dataloaders")
// waitTime is the wait argument handed to every withWaitAndCapacity call in NewLoaders.
waitTime = 2 * time.Millisecond
// maxBatch is the fallback batch size NewLoaders uses when it is given a size of 0.
maxBatch = 1_000
maxBatch = 100
)

// Loaders wrap your data loaders to inject via middleware
Expand Down Expand Up @@ -88,70 +88,76 @@ type Loaders struct {
}

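// NewLoaders instantiates data loaders for the middleware.
// batchSize and stopTimeBatchSize are the capacities passed to withWaitAndCapacity;
// either one falls back to maxBatch when it is 0. stopTimeBatchSize sizes only
// the StopTimesByStopID loader.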
func NewLoaders(dbf model.Finder) *Loaders {
func NewLoaders(dbf model.Finder, batchSize int, stopTimeBatchSize int) *Loaders {
if batchSize == 0 {
batchSize = maxBatch
}
if stopTimeBatchSize == 0 {
stopTimeBatchSize = maxBatch
}
loaders := &Loaders{
AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByFeedVersionID),
AgenciesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByID),
AgenciesByOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByOnestopID),
AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgencyPlacesByAgencyID),
CalendarDatesByServiceID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarDatesByServiceID),
CalendarsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarsByID),
CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusGeographiesByEntityID),
CensusTableByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusTableByID),
CensusFieldsByTableID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusFieldsByTableID),
CensusValuesByGeographyID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusValuesByGeographyID),
FeedFetchesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedFetchesByFeedID),
FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedInfosByFeedVersionID),
FeedsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByID),
FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByOperatorOnestopID),
FeedStatesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedStatesByFeedID),
FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionFileInfosByFeedVersionID),
FeedVersionGeometryByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGeometryByID),
FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGtfsImportByFeedVersionID), FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceWindowByFeedVersionID),
FeedVersionsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByFeedID),
FeedVersionsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByID),
FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceLevelsByFeedVersionID),
FrequenciesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.FrequenciesByTripID),
LevelsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByID),
LevelsByParentStationID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByParentStationID),
OperatorsByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByAgencyID),
OperatorsByCOIF: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByCOIF),
OperatorsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByFeedID),
PathwaysByFromStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByFromStopID),
PathwaysByID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByID),
PathwaysByToStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByToStopID),
RouteAttributesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteAttributesByRouteID),
RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteGeometriesByRouteID),
RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteHeadwaysByRouteID),
RoutesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByAgencyID),
RoutesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByFeedVersionID),
RoutesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByID),
RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopPatternsByRouteID),
RouteStopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByRouteID),
RouteStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByStopID),
SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsByRouteID),
SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsBySegmentID),
SegmentsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByID),
SegmentsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByRouteID),
SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByFeedVersionID),
ShapesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.ShapesByID),
StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopExternalReferencesByStopID),
StopObservationsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopObservationsByStopID),
StopPlacesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopPlacesByStopID),
StopsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByFeedVersionID),
StopsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByID),
StopsByLevelID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByLevelID),
StopsByParentStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByParentStopID),
StopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByRouteID),
StopTimesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopTimesByStopID),
StopTimesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopTimesByTripID),
TargetStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.TargetStopsByStopID),
TripsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByFeedVersionID),
TripsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByID),
TripsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByRouteID),
ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID),
ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorGroupsByValidationReportID),
ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportsByFeedVersionID),
AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByFeedVersionID),
AgenciesByID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByID),
AgenciesByOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByOnestopID),
AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.AgencyPlacesByAgencyID),
CalendarDatesByServiceID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarDatesByServiceID),
CalendarsByID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarsByID),
CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusGeographiesByEntityID),
CensusTableByID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusTableByID),
CensusFieldsByTableID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusFieldsByTableID),
CensusValuesByGeographyID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusValuesByGeographyID),
FeedFetchesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedFetchesByFeedID),
FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedInfosByFeedVersionID),
FeedsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByID),
FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByOperatorOnestopID),
FeedStatesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedStatesByFeedID),
FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionFileInfosByFeedVersionID),
FeedVersionGeometryByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGeometryByID),
FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGtfsImportByFeedVersionID),
// Sized by the caller-supplied batchSize, like the neighbouring loaders,
// so the configured loader batch size is honoured here as well.
FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionServiceWindowByFeedVersionID),
FeedVersionsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByFeedID),
FeedVersionsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByID),
FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionServiceLevelsByFeedVersionID),
FrequenciesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.FrequenciesByTripID),
LevelsByID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByID),
LevelsByParentStationID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByParentStationID),
OperatorsByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByAgencyID),
OperatorsByCOIF: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByCOIF),
OperatorsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByFeedID),
PathwaysByFromStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByFromStopID),
PathwaysByID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByID),
PathwaysByToStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByToStopID),
RouteAttributesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteAttributesByRouteID),
RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteGeometriesByRouteID),
RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteHeadwaysByRouteID),
RoutesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByAgencyID),
RoutesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByFeedVersionID),
RoutesByID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByID),
RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopPatternsByRouteID),
RouteStopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByRouteID),
RouteStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByStopID),
SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsByRouteID),
SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsBySegmentID),
SegmentsByID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByID),
SegmentsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByRouteID),
SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByFeedVersionID),
ShapesByID: withWaitAndCapacity(waitTime, batchSize, dbf.ShapesByID),
StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopExternalReferencesByStopID),
StopObservationsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopObservationsByStopID),
StopPlacesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopPlacesByStopID),
StopsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByFeedVersionID),
StopsByID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByID),
StopsByLevelID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByLevelID),
StopsByParentStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByParentStopID),
StopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByRouteID),
StopTimesByStopID: withWaitAndCapacity(waitTime, stopTimeBatchSize, dbf.StopTimesByStopID),
StopTimesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.StopTimesByTripID),
TargetStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.TargetStopsByStopID),
TripsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByFeedVersionID),
TripsByID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByID),
TripsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByRouteID),
ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID),
ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorGroupsByValidationReportID),
ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportsByFeedVersionID),
}
return loaders
}
Expand All @@ -162,7 +168,7 @@ func loaderMiddleware(next http.Handler) http.Handler {
// Is this OK to use as a long term cache?
ctx := r.Context()
cfg := model.ForContext(ctx)
loaders := NewLoaders(cfg.Finder)
loaders := NewLoaders(cfg.Finder, cfg.LoaderBatchSize, cfg.LoaderStopTimeBatchSize)
nextCtx := context.WithValue(ctx, loadersKey, loaders)
r = r.WithContext(nextCtx)
next.ServeHTTP(w, r)
Expand Down

0 comments on commit 7b5d4f0

Please sign in to comment.