Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Partition Group Selection on ClusterView Event [API-2259] #915

Merged
merged 20 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Hazelcast.Net.Testing/NoOpSubsetMembers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ internal class NoOpSubsetMembers : ISubsetClusterMembers

public IReadOnlyList<Guid> GetSubsetMemberIds()
=> throw new NotImplementedException();
HashSet<Guid> ISubsetClusterMembers.GetSubsetMemberIds()
=> throw new NotImplementedException();
public void SetSubsetMembers(MemberGroups newGroup)
{
throw new NotImplementedException();
}
public MemberGroups CurrentGroups
{
get;
}
public void RemoveSubsetMember(Guid memberId)
{
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,52 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Core;
using Hazelcast.Networking;
using Hazelcast.Testing;
using Hazelcast.Testing.Conditions;
using Hazelcast.Testing.Remote;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using Thrift.Protocol;
namespace Hazelcast.Tests.Clustering
{
[Category("enterprise")]
[ServerCondition("5.5")]
[Timeout(30_000)]
[Timeout(60_000)]
public class MemberPartitionGroupServerTests : MultiMembersRemoteTestBase
{
protected override string RcClusterConfiguration => Resources.ClusterPGEnabled;

[OneTimeSetUp]
public async Task TearDown()
[SetUp]
public async Task Setup()
{
await CreateCluster();
}
[TearDown]
public async Task TearDown()
{
await MembersOneTimeTearDown();
}

[TestCase(RoutingStrategy.PartitionGroups)]
public async Task TestMultiMemberRoutingWorks(RoutingStrategy routingStrategy)
{
await AssertEx.SucceedsEventually(() => Assert.That(RcMembers.Count, Is.EqualTo(3)), 30_000, 500);
HConsole.Configure(c => c.ConfigureDefaults(this));

var address1 = "127.0.0.1:5701";
var address2 = "127.0.0.1:5702";
var address3 = "127.0.0.1:5703";

var addreses = new string[] { address1, address2, address3 };

// create a client with the given routing strategy
var client1 = await CreateClient(routingStrategy, address1);
var client2 = await CreateClient(routingStrategy, address2);
var client3 = await CreateClient(routingStrategy, address3);
var client1 = await CreateClient(routingStrategy, new string[] { address1 }, "client1");
var client2 = await CreateClient(routingStrategy, new string[] { address2 }, "client2");
var client3 = await CreateClient(routingStrategy, new string[] { address3 }, "client3");

AssertClientOnlySees(client1, address1);
// wait until cluster forms of 3 members
await AssertEx.SucceedsEventually(() => AssertClientOnlySees(client1, address1), 15_000, 500);
AssertClientOnlySees(client2, address2);
AssertClientOnlySees(client3, address3);

Expand All @@ -68,7 +79,7 @@ await AssertEx.SucceedsEventually(() =>
{
AssertClientOnlySees(client1, address1);
}, 10_000, 500, "Client1 did not see the correct members");

AssertClientOnlySees(client2, address2);
AssertClientOnlySees(client3, address3);

Expand All @@ -87,15 +98,15 @@ await AssertEx.SucceedsEventually(()
},
15_000, 500);

Member member3;
Member member;
var nAddress3 = NetworkAddress.Parse(address3);
while (true)
{
member3 = await AddMember();
var created = new NetworkAddress(member3.Host, member3.Port);
member = await AddMember();
var created = new NetworkAddress(member.Host, member.Port);
if (created == nAddress3) break;

await RemoveMember(member3.Uuid);
await RemoveMember(member.Uuid);
}

await AssertEx.SucceedsEventually(()
Expand All @@ -105,7 +116,7 @@ await AssertEx.SucceedsEventually(()
// Check if client1 is connected to the new member
// cannot use the old member id since we can only either kill or create member
Assert.That(client3.Cluster.Connections.Count, Is.EqualTo(1));
Assert.That(client3.Members.Select(p => p.Member.ConnectAddress), Contains.Item(nAddress3));
Assert.That(client3.Members.Where(p => p.IsConnected).Select(p => p.Member.ConnectAddress.ToString()), Contains.Item(address3));

AssertClientOnlySees(client1, address1);
AssertClientOnlySees(client2, address2);
Expand All @@ -119,10 +130,11 @@ public async Task TestClientCollectsClusterEvents(RoutingModes routingMode)
{
var address1 = "127.0.0.1:5701";
var address2 = "127.0.0.1:5702";
var addresses = new string[] { address1, address2 };
var keyCount = 3 * 271;

// create a client with the given routing strategy for catching the events.
var client1 = await CreateClient(RoutingStrategy.PartitionGroups, address1, routingMode);
var client1 = await CreateClient(RoutingStrategy.PartitionGroups, addresses, "client1", routingMode);
var client1Count = 0;
var mapName = $"map_{routingMode}";
var map1 = await client1.GetMapAsync<int, int>(mapName);
Expand All @@ -137,7 +149,7 @@ await map1.SubscribeAsync((eventHandler) =>


// create a dummy client for creating events
var client2 = await CreateClient(RoutingStrategy.PartitionGroups, address2, RoutingModes.SingleMember);
var client2 = await CreateClient(RoutingStrategy.PartitionGroups, addresses, "client2", RoutingModes.SingleMember);

var map2 = await client2.GetMapAsync<int, int>(map1.Name);

Expand All @@ -162,19 +174,23 @@ private void AssertClientOnlySees(HazelcastClient client, string address, int cl
var members = client.Cluster.Members;
var memberId = Guid.Parse(member.Uuid);

Assert.That(members.GetMembers().Count(), Is.EqualTo(clusterSize));
Assert.That(members.GetMembers().Count(), Is.EqualTo(clusterSize), "Current cluster size " + RcMembers.Count);
Assert.That(client.Cluster.Connections.Count, Is.EqualTo(1));
Assert.True(client.Cluster.Connections.Contains(memberId), "Member is not connected");
Assert.That(members.SubsetClusterMembers.GetSubsetMemberIds().Count(), Is.EqualTo(1));
Assert.That(members.SubsetClusterMembers.GetSubsetMemberIds(), Contains.Item(memberId));
}
private async Task<HazelcastClient> CreateClient(RoutingStrategy routingStrategy, string address, RoutingModes routingMode = RoutingModes.MultiMember)
private async Task<HazelcastClient> CreateClient(RoutingStrategy routingStrategy, string[] address, string clientName, RoutingModes routingMode = RoutingModes.MultiMember)
{
var options = new HazelcastOptionsBuilder()
.With(args =>
{
args.ClientName = $"Client:{address}";
args.Networking.Addresses.Add(address);
for (int i = 0; i < address.Length; i++)
{

args.Networking.Addresses.Add(address[i]);
}
args.ClientName = clientName;
args.ClusterName = RcCluster.Id;
args.Networking.RoutingMode.Mode = routingMode;
args.Networking.RoutingMode.Strategy = routingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Linq;
using System.Threading.Tasks;
using Hazelcast.Networking;
using Hazelcast.Testing;
using Hazelcast.Testing.Conditions;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
namespace Hazelcast.Tests.Clustering
{
[Category("enterprise,nightly")]
[ServerCondition("5.5")]
[Timeout(60_000)]
public class MemberPartitionGroupServerTestsNightly : MultiMembersRemoteTestBase
{
protected override string RcClusterConfiguration => Resources.ClusterPGEnabled;

[SetUp]
public async Task Setup()
{
await CreateCluster();
}
[TearDown]
public async Task TearDown()
{
await MembersOneTimeTearDown();
}

[TestCase(RoutingStrategy.PartitionGroups)]
public async Task TestMultiMemberRoutingConnectsNextGroupWhenDisconnected(RoutingStrategy routingStrategy)
{
var address1 = "127.0.0.1:5701";
var address2 = "127.0.0.1:5702";
var address3 = "127.0.0.1:5703";

var addreses = new string[] { address1, address2, address3 };

// create a client with the given routing strategy
var client = await CreateClient(routingStrategy, addreses, "client1");

// it should connect to first address
Assert.That(client.Cluster.Connections.Count, Is.EqualTo(1));

var connectedAddress = client.Members.First(p => p.IsConnected).Member.ConnectAddress.ToString();

AssertClientOnlySees(client, connectedAddress);

var effectiveMembers = client.Cluster.Members.GetMembersForConnection();
Assert.That(effectiveMembers.Count(), Is.EqualTo(1));
Assert.That(effectiveMembers.Select(p => p.ConnectAddress.ToString()), Contains.Item(connectedAddress));
// Kill the connected member so that client can go to next group
var connectedMember = RcMembers.Values.Where(m => connectedAddress.Equals($"{m.Host}:{m.Port}")).Select(m => m.Uuid).First();
await RemoveMember(connectedMember);

await AssertEx.SucceedsEventually(() => Assert.That(client.State, Is.EqualTo(ClientState.Disconnected)), 60_000, 500);

await AssertEx.SucceedsEventually(() =>
{
Assert.That(client.Cluster.Connections.Count, Is.EqualTo(1));
Assert.That(client.State, Is.EqualTo(ClientState.Connected));
var reConnectedAddress = client.Members.First(p => p.IsConnected).Member.ConnectAddress.ToString();
effectiveMembers = client.Cluster.Members.GetMembersForConnection();

Assert.That(reConnectedAddress, Is.Not.EqualTo(connectedAddress));
Assert.That(client.Cluster.Connections.Count, Is.EqualTo(1));
Assert.That(client.Members.Where(p => p.IsConnected).Select(p => p.Member.ConnectAddress.ToString()), Contains.Item(reConnectedAddress));
Assert.That(effectiveMembers.Count(), Is.EqualTo(1));
Assert.That(effectiveMembers.Select(p => p.ConnectAddress.ToString()), Contains.Item(reConnectedAddress));
}, 60_000, 500);

}

private void AssertClientOnlySees(HazelcastClient client, string address, int clusterSize = 3)
{
var member = RcMembers.Values.FirstOrDefault(m => address.Equals($"{m.Host}:{m.Port}"));

Assert.IsNotNull(member);

var members = client.Cluster.Members;
var memberId = Guid.Parse(member.Uuid);

Assert.That(members.GetMembers().Count(), Is.EqualTo(clusterSize), "Current cluster size " + RcMembers.Count);
Assert.That(client.Cluster.Connections.Count, Is.EqualTo(1));
Assert.True(client.Cluster.Connections.Contains(memberId), "Member is not connected");
Assert.That(members.SubsetClusterMembers.GetSubsetMemberIds().Count(), Is.EqualTo(1));
Assert.That(members.SubsetClusterMembers.GetSubsetMemberIds(), Contains.Item(memberId));
}
private async Task<HazelcastClient> CreateClient(RoutingStrategy routingStrategy, string[] address, string clientName, RoutingModes routingMode = RoutingModes.MultiMember)
{
var options = new HazelcastOptionsBuilder()
.With(args =>
{
for (int i = 0; i < address.Length; i++)
{

args.Networking.Addresses.Add(address[i]);
}
args.ClientName = clientName;
args.ClusterName = RcCluster.Id;
args.Networking.RoutingMode.Mode = routingMode;
args.Networking.RoutingMode.Strategy = routingStrategy;
args.Networking.ReconnectMode = ReconnectMode.ReconnectSync;

args.AddSubscriber(on => on.StateChanged((client, eventArgs) =>
{
Console.WriteLine(eventArgs.State);
}));

args.LoggerFactory.Creator = () => Microsoft.Extensions.Logging.LoggerFactory.Create(
conf => conf.AddConsole().SetMinimumLevel(LogLevel.Debug));

})
.Build();

var client = (HazelcastClient) await HazelcastClientFactory.StartNewClientAsync(options);
return client;
}
private async Task CreateCluster(int size = 3)
{
for (int i = 0; i < size; i++)
{
await AddMember();
}
}
}
}
Loading
Loading