Skip to content

Commit

Permalink
Utilizing all v2 api calls for quesma creation
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed Dec 20, 2024
1 parent c1a9b05 commit d6d10c8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 23 deletions.
2 changes: 1 addition & 1 deletion quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscove
if cfg.TransparentProxy {
return quesma.NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false)
} else {
const quesma_v2 = false
const quesma_v2 = true
return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry, quesma_v2)
}
}
33 changes: 11 additions & 22 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package quesma

import (
"context"
"errors"
"net/http"
"quesma/ab_testing"
"quesma/clickhouse"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.
}

type dualWriteHttpProxyV2 struct {
routingHttpServer *http.Server
quesmaV2 quesma_api.QuesmaBuilder
indexManagement elasticsearch.IndexManagement
logManager *clickhouse.LogManager
publicPort util.Port
Expand Down Expand Up @@ -91,33 +90,30 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic

queryPipeline := quesma_api.NewPipeline()
queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector)
quesmaBuilder.AddPipeline(ingestPipeline)
// TODO the order of pipelines is important
// that's due current bug/limitation in quesma.Build() method
// listener in the case of the same tcp port has to be shared
quesmaBuilder.AddPipeline(queryPipeline)
quesmaBuilder.AddPipeline(ingestPipeline)

_, err := quesmaBuilder.Build()
quesmaV2, err := quesmaBuilder.Build()
if err != nil {
logger.Fatal().Msgf("Error building Quesma: %v", err)
}
var limitedHandler http.Handler
if config.DisableAuth {
elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
limitedHandler = elasticHttpIngestFrontendConnector
} else {
elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch))
elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch))
limitedHandler = elasticHttpIngestFrontendConnector
}

return &dualWriteHttpProxyV2{
schemaRegistry: registry,
schemaLoader: schemaLoader,
routingHttpServer: &http.Server{
Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)),
Handler: limitedHandler,
},
schemaRegistry: registry,
schemaLoader: schemaLoader,
quesmaV2: quesmaV2,
indexManagement: indexManager,
logManager: logManager,
publicPort: config.PublicTcpPort,
Expand All @@ -139,20 +135,13 @@ func (q *dualWriteHttpProxyV2) Close(ctx context.Context) {
if q.asyncQueriesEvictor != nil {
q.asyncQueriesEvictor.Close()
}
if err := q.routingHttpServer.Shutdown(ctx); err != nil {
logger.Fatal().Msgf("Error during server shutdown: %v", err)
}
q.quesmaV2.Stop(ctx)
}

func (q *dualWriteHttpProxyV2) Ingest() {
q.schemaLoader.ReloadTableDefinitions()
q.logManager.Start()
q.indexManagement.Start()
go q.asyncQueriesEvictor.AsyncQueriesGC()
go func() {
if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatal().Msgf("Error starting http server: %v", err)
}
logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort)
}()
q.quesmaV2.Start()
}

0 comments on commit d6d10c8

Please sign in to comment.