From d6d10c8a911bacd41dd78a494ccebff16be9541c Mon Sep 17 00:00:00 2001 From: Przemek Delewski Date: Fri, 20 Dec 2024 18:32:14 +0100 Subject: [PATCH] Utilizing all v2 api calls for quesma creation --- quesma/main.go | 2 +- quesma/quesma/dual_write_proxy_v2.go | 33 ++++++++++------------------ 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/quesma/main.go b/quesma/main.go index a42ab85fc..66f88508b 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -209,7 +209,7 @@ func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscove if cfg.TransparentProxy { return quesma.NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false) } else { - const quesma_v2 = false + const quesma_v2 = true return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry, quesma_v2) } } diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 32594025a..547660f13 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -4,7 +4,6 @@ package quesma import ( "context" - "errors" "net/http" "quesma/ab_testing" "quesma/clickhouse" @@ -50,7 +49,7 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http. } type dualWriteHttpProxyV2 struct { - routingHttpServer *http.Server + quesmaV2 quesma_api.QuesmaBuilder indexManagement elasticsearch.IndexManagement logManager *clickhouse.LogManager publicPort util.Port @@ -91,33 +90,30 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic queryPipeline := quesma_api.NewPipeline() queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) - quesmaBuilder.AddPipeline(ingestPipeline) + // TODO the order of pipelines is important + // that's due current bug/limitation in quesma.Build() method + // listener in the case of the same tcp port has to be shared quesmaBuilder.AddPipeline(queryPipeline) + quesmaBuilder.AddPipeline(ingestPipeline) - _, err := quesmaBuilder.Build() + quesmaV2, err := quesmaBuilder.Build() if err != nil { logger.Fatal().Msgf("Error building Quesma: %v", err) } - var limitedHandler http.Handler if config.DisableAuth { elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) - limitedHandler = elasticHttpIngestFrontendConnector } else { elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) - limitedHandler = elasticHttpIngestFrontendConnector } return &dualWriteHttpProxyV2{ - schemaRegistry: registry, - schemaLoader: schemaLoader, - routingHttpServer: &http.Server{ - Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)), - Handler: limitedHandler, - }, + schemaRegistry: registry, + schemaLoader: schemaLoader, + quesmaV2: quesmaV2, indexManagement: indexManager, logManager: logManager, publicPort: config.PublicTcpPort, @@ -139,9 +135,7 @@ func (q *dualWriteHttpProxyV2) Close(ctx context.Context) { if q.asyncQueriesEvictor != nil { q.asyncQueriesEvictor.Close() } - if err := q.routingHttpServer.Shutdown(ctx); err != nil { - logger.Fatal().Msgf("Error during server shutdown: %v", err) - } + q.quesmaV2.Stop(ctx) } func (q *dualWriteHttpProxyV2) Ingest() { @@ -149,10 +143,5 @@ func (q *dualWriteHttpProxyV2) Ingest() { q.logManager.Start() q.indexManagement.Start() go q.asyncQueriesEvictor.AsyncQueriesGC() - go func() { - if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Fatal().Msgf("Error starting http server: %v", err) - } - logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort) - }() + q.quesmaV2.Start() }