Skip to content

Commit

Permalink
[release-v1.14] Clientpool fixes (knative-extensions#1135)
Browse files Browse the repository at this point in the history
* fix: virtual secrets created from netspec have name and namespace (knative-extensions#4001)

* fix: virtual secrets created from netspec have name and namespace

Signed-off-by: Calum Murray <[email protected]>

* cleanup: return a secret instead of data, name, namespace

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

* fix: do not double increment clientpool wg (knative-extensions#4005)

* fix: do not double increment clientpool wg

Signed-off-by: Calum Murray <[email protected]>

* test: verify that client is successfully closed

Signed-off-by: Calum Murray <[email protected]>

* fix: test has no data race

Signed-off-by: Calum Murray <[email protected]>

* address review comments

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Jul 26, 2024
1 parent 8f9916d commit 884bdb1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 13 deletions.
14 changes: 11 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, sec
type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
return nil, err
}

client.incrementCallers()
return client, nil
}

func (cp *ClientPool) getClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (*client, error) {
// (bootstrapServers, secret) uniquely identifies a sarama client config with the options we allow users to configure
key := makeClusterAdminKey(bootstrapServers, secret)

Expand All @@ -76,7 +86,6 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string,
// if a corresponding connection already exists, lets use it
if val, ok := cp.cache.Get(key); ok && val.hasCorrectSecretVersion(secret) {
logger.Debug("successfully got a client from the clientpool")
val.incrementCallers()
return val, nil
}
logger.Debug("failed to get an existing client, going to create one")
Expand Down Expand Up @@ -108,13 +117,12 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string,

logger.Debug("Closing client")

if err := value.client.Close(); !errors.Is(err, sarama.ErrClosedClient) {
if err := value.client.Close(); err != nil && !errors.Is(err, sarama.ErrClosedClient) {
logger.Errorw("Failed to close client", zap.Error(err))
}
}()
})

val.incrementCallers()
return val, nil
}

Expand Down
43 changes: 43 additions & 0 deletions control-plane/pkg/kafka/clientpool/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
Expand Down Expand Up @@ -117,6 +118,48 @@ func TestGetClusterAdmin(t *testing.T) {
cancel()
}

func TestClientCloses(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)

cache := prober.NewLocalExpiringCache[clientKey, *client, struct{}](ctx, time.Second*1)
clientClosed := atomic.NewBool(false)
adminClosed := atomic.NewBool(false)

clients := &ClientPool{
cache: cache,
newSaramaClient: func(_ []string, _ *sarama.Config) (sarama.Client, error) {
return &kafkatesting.MockKafkaClient{OnClose: func() {
clientClosed.Toggle()
}}, nil
},
newClusterAdminFromClient: func(c sarama.Client) (sarama.ClusterAdmin, error) {
return &kafkatesting.MockKafkaClusterAdmin{ExpectedTopics: []string{"topic1"}, OnClose: func() {
c.Close()
adminClosed.Toggle()
}}, nil
},
}

client1, err := clients.GetClient(ctx, []string{"localhost:9092"}, nil)
assert.NoError(t, err)

clusterAdmin, err := clients.GetClusterAdmin(ctx, []string{"localhost:9092"}, nil)
assert.NoError(t, err)

clusterAdmin.Close()
client1.Close()

time.Sleep(time.Second * 2)

// the client should have been closed successfully now
assert.True(t, clientClosed.Load())
assert.True(t, adminClosed.Load())

cancel()
}

func TestMakeClientKey(t *testing.T) {
key1 := makeClusterAdminKey([]string{"localhost:9090", "localhost:9091", "localhost:9092"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}})
key2 := makeClusterAdminKey([]string{"localhost:9092", "localhost:9091", "localhost:9090"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}})
Expand Down
4 changes: 1 addition & 3 deletions control-plane/pkg/kafka/clientpool/clusteradmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func clusterAdminFromClient(saramaClient sarama.Client, makeClusterAdmin kafka.N
return nil, err
}

c.incrementCallers()

return &clusterAdmin{
client: c,
clusterAdmin: ca,
Expand Down Expand Up @@ -291,5 +289,5 @@ func (a *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstan
}

func (a *clusterAdmin) Close() error {
return a.client.Close()
return a.clusterAdmin.Close()
}
5 changes: 5 additions & 0 deletions control-plane/pkg/kafka/testing/admin_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type MockKafkaClusterAdmin struct {

ErrorOnDeleteConsumerGroup error

OnClose func()

T *testing.T
}

Expand Down Expand Up @@ -301,5 +303,8 @@ func (m *MockKafkaClusterAdmin) RemoveMemberFromConsumerGroup(groupId string, gr

func (m *MockKafkaClusterAdmin) Close() error {
m.ExpectedClose = true
if m.OnClose != nil {
m.OnClose()
}
return m.ExpectedCloseError
}
4 changes: 4 additions & 0 deletions control-plane/pkg/kafka/testing/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type MockKafkaClient struct {
ShouldFailRefreshMetadata bool
ShouldFailRefreshBrokers bool
ShouldFailBrokenPipe bool
OnClose func()
}

var _ sarama.Client = &MockKafkaClient{}
Expand Down Expand Up @@ -195,6 +196,9 @@ func (m MockKafkaClient) LeastLoadedBroker() *sarama.Broker {

func (m *MockKafkaClient) Close() error {
m.IsClosed = true
if m.OnClose != nil {
m.OnClose()
}
return m.CloseError
}

Expand Down
31 changes: 26 additions & 5 deletions control-plane/pkg/security/secrets_provider_net_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package security
import (
"context"
"fmt"
"sort"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"
bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"

Expand Down Expand Up @@ -53,21 +56,21 @@ func ResolveAuthContextFromNetSpec(lister corelisters.SecretLister, namespace st
return nil, err
}
}
references, virtualSecretData := toContract(securityFields)
references, virtualSecret := toContract(securityFields)
multiSecretReference := &contract.MultiSecretReference{
Protocol: getProtocolContractFromNetSpec(netSpec),
References: references,
}
virtualSecretData[ProtocolKey] = []byte(getProtocolFromNetSpec(netSpec))
virtualSecret.Data[ProtocolKey] = []byte(getProtocolFromNetSpec(netSpec))

authContext := &NetSpecAuthContext{
VirtualSecret: &corev1.Secret{Data: virtualSecretData},
VirtualSecret: &virtualSecret,
MultiSecretReference: multiSecretReference,
}
return authContext, nil
}

func toContract(securityFields []*securityField) ([]*contract.SecretReference, map[string][]byte) {
func toContract(securityFields []*securityField) ([]*contract.SecretReference, corev1.Secret) {
virtualSecretData := make(map[string][]byte)
bySecretName := make(map[string][]securityField)
for _, f := range securityFields {
Expand All @@ -79,6 +82,8 @@ func toContract(securityFields []*securityField) ([]*contract.SecretReference, m
}

refs := make([]*contract.SecretReference, 0, 6 /* max number of secrets */)
names := make([]string, 0, 6)
namespaces := make([]string, 0, 6)
for secretName, securityFields := range bySecretName {
keyFieldReferences := make([]*contract.KeyFieldReference, 0, len(securityFields))
for _, f := range securityFields {
Expand All @@ -97,8 +102,16 @@ func toContract(securityFields []*securityField) ([]*contract.SecretReference, m
},
KeyFieldReferences: keyFieldReferences,
})
names = append(names, any.secret.Name)
namespaces = append(namespaces, any.secret.Namespace)
}
return refs, corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: stableConcat(names),
Namespace: stableConcat(namespaces),
},
Data: virtualSecretData,
}
return refs, virtualSecretData
}

type securityField struct {
Expand Down Expand Up @@ -160,3 +173,11 @@ func resolveSecret(lister corelisters.SecretLister, ns string, ref *corev1.Secre
}
return value, secret, nil
}

func stableConcat(elements []string) string {
sort.SliceStable(elements, func(i, j int) bool {
return elements[i] < elements[j]
})

return strings.Join(elements, "")
}
24 changes: 22 additions & 2 deletions control-plane/pkg/security/secrets_provider_net_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
SaslMechanismKey: SaslScramSha512,
SaslUserKey: "key",
SaslPasswordKey: "key",
}},
}, ObjectMeta: metav1.ObjectMeta{
Name: "cacertcertkeypasswordtypeuser",
Namespace: "nsnsnsnsnsns",
},
},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SASL_SSL,
References: []*contract.SecretReference{
Expand Down Expand Up @@ -249,7 +253,11 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
SaslMechanismKey: SaslScramSha256,
SaslUserKey: "key",
SaslPasswordKey: "key",
}},
}, ObjectMeta: metav1.ObjectMeta{
Name: "cacertcertkeypasswordtypeuser",
Namespace: "nsnsnsnsnsns",
},
},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SASL_SSL,
References: []*contract.SecretReference{
Expand Down Expand Up @@ -339,6 +347,9 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
CaCertificateKey: "key",
UserCertificate: "key",
UserKey: "key",
}, ObjectMeta: metav1.ObjectMeta{
Name: "cacertcertkey",
Namespace: "nsnsns",
}},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SSL,
Expand Down Expand Up @@ -400,6 +411,9 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
ProtocolKey: ProtocolSSL,
UserCertificate: "key",
UserKey: "key",
}, ObjectMeta: metav1.ObjectMeta{
Name: "certkey",
Namespace: "nsns",
}},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SSL,
Expand Down Expand Up @@ -466,6 +480,9 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
SaslMechanismKey: SaslScramSha256,
SaslUserKey: "key",
SaslPasswordKey: "key",
}, ObjectMeta: metav1.ObjectMeta{
Name: "passwordtypeuser",
Namespace: "nsnsns",
}},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SASL_PLAINTEXT,
Expand Down Expand Up @@ -538,6 +555,9 @@ func TestResolveAuthContextFromNetSpec(t *testing.T) {
SaslMechanismKey: SaslScramSha512,
SaslUserKey: "key",
SaslPasswordKey: "key",
}, ObjectMeta: metav1.ObjectMeta{
Name: "passwordtypeuser",
Namespace: "nsnsns",
}},
MultiSecretReference: &contract.MultiSecretReference{
Protocol: contract.Protocol_SASL_PLAINTEXT,
Expand Down

0 comments on commit 884bdb1

Please sign in to comment.