From 7b5d4f0d95948c77c07728035da66f2b832e5319 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Thu, 5 Dec 2024 10:17:53 -0800 Subject: [PATCH] Decrease loader batch size (#291) --- cmd/tlserver/main.go | 47 ++++++++------- model/config.go | 28 +++++---- server/gql/loaders.go | 136 ++++++++++++++++++++++-------------------- 3 files changed, 113 insertions(+), 98 deletions(-) diff --git a/cmd/tlserver/main.go b/cmd/tlserver/main.go index d214e051..ed24f6cd 100644 --- a/cmd/tlserver/main.go +++ b/cmd/tlserver/main.go @@ -68,18 +68,20 @@ func main() { } type ServerCommand struct { - Timeout int - LongQueryDuration int - Port string - RestPrefix string - LoadAdmins bool - ValidateLargeFiles bool - SecretsFile string - Storage string - RTStorage string - DBURL string - RedisURL string - secrets []dmfr.Secret + Timeout int + LongQueryDuration int + Port string + RestPrefix string + LoadAdmins bool + ValidateLargeFiles bool + LoaderBatchSize int + LoaderStopTimeBatchSize int + SecretsFile string + Storage string + RTStorage string + DBURL string + RedisURL string + secrets []dmfr.Secret } func (cmd *ServerCommand) HelpDesc() (string, string) { @@ -102,6 +104,9 @@ func (cmd *ServerCommand) AddFlags(fl *pflag.FlagSet) { fl.IntVar(&cmd.Timeout, "timeout", 60, "") fl.IntVar(&cmd.LongQueryDuration, "long-query", 1000, "Log queries over this duration (ms)") fl.BoolVar(&cmd.LoadAdmins, "load-admins", false, "Load admin polygons from database into memory") + fl.IntVar(&cmd.LoaderBatchSize, "loader-batch-size", 100, "GraphQL Loader batch size") + fl.IntVar(&cmd.LoaderStopTimeBatchSize, "loader-stop-time-batch-size", 1, "GraphQL Loader batch size for StopTimes") + } func (cmd *ServerCommand) Parse(args []string) error { @@ -168,14 +173,16 @@ func (cmd *ServerCommand) Run() error { // Setup config cfg := model.Config{ - Finder: dbFinder, - RTFinder: rtFinder, - GbfsFinder: gbfsFinder, - Secrets: cmd.secrets, - Storage: cmd.Storage, - RTStorage: cmd.RTStorage, - ValidateLargeFiles: cmd.ValidateLargeFiles, - RestPrefix: cmd.RestPrefix, + Finder: dbFinder, + RTFinder: rtFinder, + GbfsFinder: gbfsFinder, + Secrets: cmd.secrets, + Storage: cmd.Storage, + RTStorage: cmd.RTStorage, + ValidateLargeFiles: cmd.ValidateLargeFiles, + RestPrefix: cmd.RestPrefix, + LoaderBatchSize: cmd.LoaderBatchSize, + LoaderStopTimeBatchSize: cmd.LoaderStopTimeBatchSize, } // Setup router diff --git a/model/config.go b/model/config.go index 204a5cf3..af40fa51 100644 --- a/model/config.go +++ b/model/config.go @@ -10,19 +10,21 @@ import ( ) type Config struct { - Finder Finder - RTFinder RTFinder - GbfsFinder GbfsFinder - Checker Checker - Actions Actions - JobQueue jobs.JobQueue - Clock clock.Clock - Secrets []dmfr.Secret - ValidateLargeFiles bool - DisableImage bool - RestPrefix string - Storage string - RTStorage string + Finder Finder + RTFinder RTFinder + GbfsFinder GbfsFinder + Checker Checker + Actions Actions + JobQueue jobs.JobQueue + Clock clock.Clock + Secrets []dmfr.Secret + ValidateLargeFiles bool + DisableImage bool + RestPrefix string + Storage string + RTStorage string + LoaderBatchSize int + LoaderStopTimeBatchSize int } var finderCtxKey = &contextKey{"finderConfig"} diff --git a/server/gql/loaders.go b/server/gql/loaders.go index a2e829cd..11f0714c 100644 --- a/server/gql/loaders.go +++ b/server/gql/loaders.go @@ -17,7 +17,7 @@ type ctxKey string const ( loadersKey = ctxKey("dataloaders") waitTime = 2 * time.Millisecond - maxBatch = 1_000 + maxBatch = 100 ) // Loaders wrap your data loaders to inject via middleware @@ -88,70 +88,76 @@ type Loaders struct { } // NewLoaders instantiates data loaders for the middleware -func NewLoaders(dbf model.Finder) *Loaders { +func NewLoaders(dbf model.Finder, batchSize int, stopTimeBatchSize int) *Loaders { + if batchSize == 0 { + batchSize = maxBatch + } + if stopTimeBatchSize == 0 { + stopTimeBatchSize = maxBatch + } loaders := &Loaders{ - AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByFeedVersionID), - AgenciesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByID), - AgenciesByOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByOnestopID), - AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgencyPlacesByAgencyID), - CalendarDatesByServiceID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarDatesByServiceID), - CalendarsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarsByID), - CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusGeographiesByEntityID), - CensusTableByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusTableByID), - CensusFieldsByTableID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusFieldsByTableID), - CensusValuesByGeographyID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusValuesByGeographyID), - FeedFetchesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedFetchesByFeedID), - FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedInfosByFeedVersionID), - FeedsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByID), - FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByOperatorOnestopID), - FeedStatesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedStatesByFeedID), - FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionFileInfosByFeedVersionID), - FeedVersionGeometryByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGeometryByID), - FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGtfsImportByFeedVersionID), FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceWindowByFeedVersionID), - FeedVersionsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByFeedID), - FeedVersionsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByID), - FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceLevelsByFeedVersionID), - FrequenciesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.FrequenciesByTripID), - LevelsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByID), - LevelsByParentStationID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByParentStationID), - OperatorsByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByAgencyID), - OperatorsByCOIF: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByCOIF), - OperatorsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByFeedID), - PathwaysByFromStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByFromStopID), - PathwaysByID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByID), - PathwaysByToStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByToStopID), - RouteAttributesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteAttributesByRouteID), - RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteGeometriesByRouteID), - RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteHeadwaysByRouteID), - RoutesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByAgencyID), - RoutesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByFeedVersionID), - RoutesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByID), - RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopPatternsByRouteID), - RouteStopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByRouteID), - RouteStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByStopID), - SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsByRouteID), - SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsBySegmentID), - SegmentsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByID), - SegmentsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByRouteID), - SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByFeedVersionID), - ShapesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.ShapesByID), - StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopExternalReferencesByStopID), - StopObservationsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopObservationsByStopID), - StopPlacesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopPlacesByStopID), - StopsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByFeedVersionID), - StopsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByID), - StopsByLevelID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByLevelID), - StopsByParentStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByParentStopID), - StopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByRouteID), - StopTimesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopTimesByStopID), - StopTimesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopTimesByTripID), - TargetStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.TargetStopsByStopID), - TripsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByFeedVersionID), - TripsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByID), - TripsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByRouteID), - ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID), - ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorGroupsByValidationReportID), - ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportsByFeedVersionID), + AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByFeedVersionID), + AgenciesByID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByID), + AgenciesByOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByOnestopID), + AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.AgencyPlacesByAgencyID), + CalendarDatesByServiceID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarDatesByServiceID), + CalendarsByID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarsByID), + CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusGeographiesByEntityID), + CensusTableByID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusTableByID), + CensusFieldsByTableID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusFieldsByTableID), + CensusValuesByGeographyID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusValuesByGeographyID), + FeedFetchesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedFetchesByFeedID), + FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedInfosByFeedVersionID), + FeedsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByID), + FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByOperatorOnestopID), + FeedStatesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedStatesByFeedID), + FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionFileInfosByFeedVersionID), + FeedVersionGeometryByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGeometryByID), + FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGtfsImportByFeedVersionID), FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceWindowByFeedVersionID), + FeedVersionsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByFeedID), + FeedVersionsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByID), + FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionServiceLevelsByFeedVersionID), + FrequenciesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.FrequenciesByTripID), + LevelsByID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByID), + LevelsByParentStationID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByParentStationID), + OperatorsByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByAgencyID), + OperatorsByCOIF: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByCOIF), + OperatorsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByFeedID), + PathwaysByFromStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByFromStopID), + PathwaysByID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByID), + PathwaysByToStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByToStopID), + RouteAttributesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteAttributesByRouteID), + RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteGeometriesByRouteID), + RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteHeadwaysByRouteID), + RoutesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByAgencyID), + RoutesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByFeedVersionID), + RoutesByID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByID), + RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopPatternsByRouteID), + RouteStopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByRouteID), + RouteStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByStopID), + SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsByRouteID), + SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsBySegmentID), + SegmentsByID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByID), + SegmentsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByRouteID), + SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByFeedVersionID), + ShapesByID: withWaitAndCapacity(waitTime, batchSize, dbf.ShapesByID), + StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopExternalReferencesByStopID), + StopObservationsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopObservationsByStopID), + StopPlacesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopPlacesByStopID), + StopsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByFeedVersionID), + StopsByID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByID), + StopsByLevelID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByLevelID), + StopsByParentStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByParentStopID), + StopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByRouteID), + StopTimesByStopID: withWaitAndCapacity(waitTime, stopTimeBatchSize, dbf.StopTimesByStopID), + StopTimesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.StopTimesByTripID), + TargetStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.TargetStopsByStopID), + TripsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByFeedVersionID), + TripsByID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByID), + TripsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByRouteID), + ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID), + ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorGroupsByValidationReportID), + ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportsByFeedVersionID), } return loaders } @@ -162,7 +168,7 @@ func loaderMiddleware(next http.Handler) http.Handler { // Is this OK to use as a long term cache? ctx := r.Context() cfg := model.ForContext(ctx) - loaders := NewLoaders(cfg.Finder) + loaders := NewLoaders(cfg.Finder, cfg.LoaderBatchSize, cfg.LoaderStopTimeBatchSize) nextCtx := context.WithValue(ctx, loadersKey, loaders) r = r.WithContext(nextCtx) next.ServeHTTP(w, r)