Skip to content

Commit

Permalink
Merge pull request #21 from stolostron/upstream
Browse files Browse the repository at this point in the history
merge upstream - using two kafka topics
  • Loading branch information
skeeey authored Dec 24, 2024
2 parents e56129a + a7c48d5 commit b5c9ed3
Show file tree
Hide file tree
Showing 37 changed files with 990 additions and 394 deletions.
22 changes: 6 additions & 16 deletions Containerfile.rhtap
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
FROM brew.registry.redhat.io/rh-osbs/openshift-golang-builder:rhel_8_1.22 AS builder
FROM brew.registry.redhat.io/rh-osbs/openshift-golang-builder:rhel_9_1.22 AS builder

ENV SOURCE_DIR=/maestro
WORKDIR $SOURCE_DIR
COPY . $SOURCE_DIR

ENV GOFLAGS=""
RUN make binary
RUN pwd
ENV GOEXPERIMENT=strictfipsruntime
ENV CGO_ENABLED=1
RUN make binary BUILD_OPTS="-tags strictfipsruntime"

FROM registry.access.redhat.com/ubi9/ubi-minimal:latest

RUN \
microdnf update -y \
&& \
microdnf install -y util-linux \
&& \
microdnf clean all

COPY --from=builder \
/maestro/maestro \
/usr/local/bin/

RUN microdnf update -y && microdnf install -y util-linux && microdnf clean all
COPY --from=builder /maestro/maestro /usr/local/bin/
EXPOSE 8000

ENTRYPOINT ["/usr/local/bin/maestro", "server"]

LABEL name="maestro" \
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ lint:
# Build binaries
# NOTE it may be necessary to use CGO_ENABLED=0 for backwards compatibility with centos7 if not using centos7
binary: check-gopath
${GO} build -tags=kafka ./cmd/maestro
${GO} mod vendor
${GO} build $(BUILD_OPTS) ./cmd/maestro
.PHONY: binary

# Install
Expand Down
33 changes: 24 additions & 9 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (

"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)

Expand All @@ -35,24 +38,40 @@ func runServer(cmd *cobra.Command, args []string) {
klog.Fatalf("Unable to initialize environment: %s", err.Error())
}

healthcheckServer := server.NewHealthCheckServer()

// Create event broadcaster to broadcast resource status update events to subscribers
eventBroadcaster := event.NewEventBroadcaster()

// Create the event server based on the message broker type:
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT, create a Pulse server to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
} else {
klog.Info("Setting up pulse server")
eventServer = server.NewPulseServer(eventBroadcaster)
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
default:
klog.Errorf("Unsupported subscription type: %s", subscriptionType)
}

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
controllersServer := server.NewControllersServer(eventServer)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -70,10 +89,6 @@ func runServer(cmd *cobra.Command, args []string) {
if err := metricsServer.Stop(); err != nil {
klog.Errorf("Failed to stop metrics server, %v", err)
}

if err := healthcheckServer.Stop(); err != nil {
klog.Errorf("Failed to stop healthcheck server, %v", err)
}
}()

// Start the event broadcaster
Expand All @@ -82,7 +97,7 @@ func runServer(cmd *cobra.Command, args []string) {
// Run the servers
go apiserver.Start()
go metricsServer.Start()
go healthcheckServer.Start()
go healthcheckServer.Start(ctx)
go eventServer.Start(ctx)
go controllersServer.Start(ctx)

Expand Down
3 changes: 3 additions & 0 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"

"github.com/openshift-online/maestro/pkg/logger"
Expand All @@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer {
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewEventInstanceDao(&env().Database.SessionFactory),
),
}

Expand Down
Loading

0 comments on commit b5c9ed3

Please sign in to comment.