From 3dcec62cc847ef7835a305a3d8802d775f75d5a8 Mon Sep 17 00:00:00 2001 From: "Charel Baum (external expert on behalf of DB Netz AG)" Date: Wed, 4 Oct 2023 16:29:28 +0200 Subject: [PATCH] fix(kafka/cluster): fill connectionDetails with bootstrap brokers Signed-off-by: Charel Baum (external expert on behalf of DB Netz AG) --- pkg/controller/kafka/cluster/setup.go | 43 ++++++++++++++++++--------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/controller/kafka/cluster/setup.go b/pkg/controller/kafka/cluster/setup.go index d5bbd8feca..2e9924e0e2 100644 --- a/pkg/controller/kafka/cluster/setup.go +++ b/pkg/controller/kafka/cluster/setup.go @@ -40,6 +40,7 @@ import ( ) const ( + errGetBootstrapBrokers = "cannot get BootstrapBrokers of Cluster in AWS" errUpdateBrokerType = "cannot update BrokerType of Cluster in AWS" errUpdateBrokerStorage = "cannot update BrokerStorage of Cluster in AWS" errUpdateBrokerCount = "cannot update BrokerCount of Cluster in AWS" @@ -57,14 +58,14 @@ func SetupCluster(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(svcapitypes.ClusterGroupKind) opts := []option{ func(e *external) { + u := &updater{client: e.client} e.preObserve = preObserve - e.postObserve = postObserve + e.postObserve = u.postObserve e.preDelete = preDelete e.postDelete = postDelete e.preCreate = preCreate e.postCreate = postCreate e.lateInitialize = LateInitialize - u := &updater{client: e.client} e.update = u.update e.isUpToDate = isUpToDate }, @@ -121,13 +122,37 @@ func preObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.Describe return nil } -func postObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.DescribeClusterOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) { +func (u *updater) postObserve(ctx context.Context, cr *svcapitypes.Cluster, obj *svcsdk.DescribeClusterOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) { if err != nil { return managed.ExternalObservation{}, err } + + obs.ConnectionDetails = managed.ConnectionDetails{ + // see: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html + // no endpoint informations available in DescribeClusterOutput only endpoints for zookeeperPlain/Tls + "zookeeperEndpointPlain": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString)), + "zookeeperEndpointTls": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls)), + } + switch awsclients.StringValue(obj.ClusterInfo.State) { case string(svcapitypes.ClusterState_ACTIVE): cr.SetConditions(xpv1.Available()) + + // see: https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html + // retrieve cluster bootstrap brokers (endpoints) + // not possible in every cluster state (e.g. "You can't get bootstrap broker nodes for a cluster in DELETING state.") + endpoints, err := u.client.GetBootstrapBrokersWithContext(ctx, &svcsdk.GetBootstrapBrokersInput{ClusterArn: awsclients.String(meta.GetExternalName(cr))}) + if err != nil { + return obs, awsclients.Wrap(err, errGetBootstrapBrokers) + } + obs.ConnectionDetails["clusterEndpointPlain"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerString)) + obs.ConnectionDetails["clusterEndpointTls"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringTls)) + obs.ConnectionDetails["clusterEndpointIAM"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringSaslIam)) + obs.ConnectionDetails["clusterEndpointScram"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringSaslScram)) + obs.ConnectionDetails["clusterEndpointTlsPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicTls)) + obs.ConnectionDetails["clusterEndpointIAMPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicSaslIam)) + obs.ConnectionDetails["clusterEndpointScramPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicSaslScram)) + case string(svcapitypes.ClusterState_CREATING): cr.SetConditions(xpv1.Creating()) case string(svcapitypes.ClusterState_FAILED), string(svcapitypes.ClusterState_MAINTENANCE), string(svcapitypes.ClusterState_UPDATING): @@ -136,16 +161,6 @@ func postObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.Describ cr.SetConditions(xpv1.Deleting()) } - obs.ConnectionDetails = managed.ConnectionDetails{ - // see: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html - // no endpoint informations available in DescribeClusterOutput only endpoints for zookeeperPlain/Tls - "zookeeperEndpointPlain": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString)), - "zookeeperEndpointTls": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls)), - "clusterEndpointPlain": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9092")), - "clusterEndpointTls": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9094")), - "clusterEndpointIAM": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9098")), - } - return obs, nil } @@ -860,7 +875,7 @@ func generateClientAuthentication(wanted *svcapitypes.ClientAuthentication) *svc if wanted.SASL.IAM != nil { iam := &svcsdk.Iam{} if wanted.SASL.IAM.Enabled != nil { - iam.Enabled = wanted.TLS.Enabled + iam.Enabled = wanted.SASL.IAM.Enabled } sasl.Iam = iam }