Skip to content

Commit

Permalink
fix(kafka/cluster): fill connectionDetails with bootstrap brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
Charel Baum (external expert on behalf of DB Netz AG) committed Oct 4, 2023
1 parent c7b3aab commit d5bc114
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions pkg/controller/kafka/cluster/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
)

const (
errBootstrapBrokers = "cannot get BootstrapBrokers of Cluster in AWS"
errUpdateBrokerType = "cannot update BrokerType of Cluster in AWS"
errUpdateBrokerStorage = "cannot update BrokerStorage of Cluster in AWS"
errUpdateBrokerCount = "cannot update BrokerCount of Cluster in AWS"
Expand All @@ -57,14 +58,14 @@ func SetupCluster(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(svcapitypes.ClusterGroupKind)
opts := []option{
func(e *external) {
u := &updater{client: e.client}
e.preObserve = preObserve
e.postObserve = postObserve
e.postObserve = u.postObserve
e.preDelete = preDelete
e.postDelete = postDelete
e.preCreate = preCreate
e.postCreate = postCreate
e.lateInitialize = LateInitialize
u := &updater{client: e.client}
e.update = u.update
e.isUpToDate = isUpToDate
},
Expand Down Expand Up @@ -121,13 +122,37 @@ func preObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.Describe
return nil
}

func postObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.DescribeClusterOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) {
func (u *updater) postObserve(ctx context.Context, cr *svcapitypes.Cluster, obj *svcsdk.DescribeClusterOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) {
if err != nil {
return managed.ExternalObservation{}, err
}

obs.ConnectionDetails = managed.ConnectionDetails{
// see: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html
// no endpoint informations available in DescribeClusterOutput only endpoints for zookeeperPlain/Tls
"zookeeperEndpointPlain": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString)),
"zookeeperEndpointTls": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls)),
}

switch awsclients.StringValue(obj.ClusterInfo.State) {
case string(svcapitypes.ClusterState_ACTIVE):
cr.SetConditions(xpv1.Available())

// see: https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html
// retrieve cluster bootstrap brokers (endpoints)
// not possible in every cluster state (e.g. "You can't get bootstrap broker nodes for a cluster in DELETING state.")
endpoints, err := u.client.GetBootstrapBrokersWithContext(ctx, &svcsdk.GetBootstrapBrokersInput{ClusterArn: awsclients.String(meta.GetExternalName(cr))})
if err != nil {
return obs, awsclients.Wrap(err, errBootstrapBrokers)
}
obs.ConnectionDetails["clusterEndpointPlain"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerString))
obs.ConnectionDetails["clusterEndpointTls"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringTls))
obs.ConnectionDetails["clusterEndpointIAM"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringSaslIam))
obs.ConnectionDetails["clusterEndpointScram"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringSaslScram))
obs.ConnectionDetails["clusterEndpointTlsPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicTls))
obs.ConnectionDetails["clusterEndpointIAMPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicSaslIam))
obs.ConnectionDetails["clusterEndpointScramPublic"] = []byte(awsclients.StringValue(endpoints.BootstrapBrokerStringPublicSaslScram))

case string(svcapitypes.ClusterState_CREATING):
cr.SetConditions(xpv1.Creating())
case string(svcapitypes.ClusterState_FAILED), string(svcapitypes.ClusterState_MAINTENANCE), string(svcapitypes.ClusterState_UPDATING):
Expand All @@ -136,16 +161,6 @@ func postObserve(_ context.Context, cr *svcapitypes.Cluster, obj *svcsdk.Describ
cr.SetConditions(xpv1.Deleting())
}

obs.ConnectionDetails = managed.ConnectionDetails{
// see: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html
// no endpoint informations available in DescribeClusterOutput only endpoints for zookeeperPlain/Tls
"zookeeperEndpointPlain": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString)),
"zookeeperEndpointTls": []byte(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls)),
"clusterEndpointPlain": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9092")),
"clusterEndpointTls": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9094")),
"clusterEndpointIAM": []byte(strings.ReplaceAll(awsclients.StringValue(obj.ClusterInfo.ZookeeperConnectString), "2181", "9098")),
}

return obs, nil
}

Expand Down Expand Up @@ -860,7 +875,7 @@ func generateClientAuthentication(wanted *svcapitypes.ClientAuthentication) *svc
if wanted.SASL.IAM != nil {
iam := &svcsdk.Iam{}
if wanted.SASL.IAM.Enabled != nil {
iam.Enabled = wanted.TLS.Enabled
iam.Enabled = wanted.SASL.IAM.Enabled
}
sasl.Iam = iam
}
Expand Down

0 comments on commit d5bc114

Please sign in to comment.