Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 13, 2024
1 parent e79568e commit d915c80
Showing 1 changed file with 39 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -18,12 +19,12 @@
* Some of the tests may fail occasionally until https://issues.redhat.com/browse/JGRP-1594 is fixed
* @author Bela Ban
*/
@Test(groups=Global.FUNCTIONAL,singleThreaded=true)
@Test(groups=Global.FUNCTIONAL,singleThreaded=true,dataProvider="configProvider")
public class UNICAST_ConnectionTests {
protected JChannel a, b;
protected Address a_addr, b_addr;
protected MyReceiver r1, r2;
protected Protocol u1, u2;
protected JChannel a, b;
protected Address a_addr, b_addr;
protected MyReceiver<Integer> r1, r2;
protected Protocol u1, u2;
protected static final String CLUSTER="UNICAST_ConnectionTests";


Expand All @@ -36,8 +37,8 @@ static Object[][] configProvider() {
}

protected void setup(Class<? extends Protocol> unicast_class) throws Exception {
r1=new MyReceiver("A");
r2=new MyReceiver("B");
r1=new MyReceiver<Integer>().name("A");
r2=new MyReceiver<Integer>().name("B");
a=createChannel(unicast_class, "A");
a.connect(CLUSTER);
a_addr=a.getAddress();
Expand Down Expand Up @@ -78,7 +79,7 @@ public void testBothChannelsClosing(Class<? extends Protocol> unicast) throws Ex
System.out.println("==== Closing the connections on both sides");
removeConnection(u1, b_addr);
removeConnection(u2, a_addr);
r1.clear(); r2.clear();
r1.reset(); r2.reset();

// causes new connection establishment
sendToEachOtherAndCheck(10);
Expand Down Expand Up @@ -117,7 +118,6 @@ public void testBClosingUnilaterally(Class<? extends Protocol> unicast) throws E
sendAndCheck(a, b_addr, 10, r2);
}

@Test(dataProvider="configProvider")
public void testBRemovingUnilaterally(Class<? extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 10, r2);
Expand All @@ -130,6 +130,18 @@ public void testBRemovingUnilaterally(Class<? extends Protocol> unicast) throws
sendAndCheck(a, b_addr, 10, r2);
}

public void testBRemovingUnilaterallyOOB(Class<? extends Protocol> unicast) throws Exception {
setup(unicast);
sendAndCheck(a, b_addr, 10, r2);

// now remove connection on A unilaterally
System.out.println("==== Removing the connection on B");
removeConnection(u2, a_addr, true);

// then send OOB messages from A to B
sendAndCheck(a, b_addr, true, 10, r2);
}


/**
* Scenario #6 (A closes the connection unilaterally (B keeps it open), then reopens it and sends messages,
Expand Down Expand Up @@ -162,12 +174,10 @@ public void testMultipleConcurrentResets(Class<? extends Protocol> unicast) thro
System.out.println("==== Closing the connection on A");
removeConnection(u1, b_addr);

r2.clear();

r2.reset();
final Protocol ucast=b.getProtocolStack().findProtocol(Util.getUnicastProtocols());

int NUM=10;

final List<Message> msgs=new ArrayList<>(NUM);

for(int i=1; i <= NUM; i++) {
Expand Down Expand Up @@ -198,7 +208,7 @@ public void testMultipleConcurrentResets(Class<? extends Protocol> unicast) thro
for(Thread thread: threads)
thread.join();

List<Integer> list=r2.getMessages();
List<Integer> list=r2.list();
System.out.println("list = " + print(list));

assert list.size() == 1 : "list must have 1 element but has " + list.size() + ": " + print(list);
Expand Down Expand Up @@ -241,8 +251,8 @@ protected void sendToEachOtherAndCheck(int num) throws Exception {
a.send(b_addr, i);
b.send(a_addr, i);
}
List<Integer> l1=r1.getMessages();
List<Integer> l2=r2.getMessages();
List<Integer> l1=r1.list();
List<Integer> l2=r2.list();
for(int i=0; i < 10; i++) {
if(l1.size() == num && l2.size() == num)
break;
Expand All @@ -254,17 +264,21 @@ protected void sendToEachOtherAndCheck(int num) throws Exception {
assert l2.size() == num;
}

protected static void sendAndCheck(JChannel channel, Address dest, int num, MyReceiver receiver) throws Exception {
receiver.clear();
for(int i=1; i <= num; i++)
channel.send(dest, i);
List<Integer> list=receiver.getMessages();
for(int i=0; i < 20; i++) {
if(list.size() == num)
break;
Util.sleep(500);
protected static void sendAndCheck(JChannel channel, Address dest, int num, MyReceiver<Integer> r) throws Exception {
sendAndCheck(channel, dest, false, num, r);
}

protected static void sendAndCheck(JChannel channel, Address dest, boolean oob, int num, MyReceiver<Integer> r) throws Exception {
r.reset();
for(int i=1; i <= num; i++) {
Message msg=new ObjectMessage(dest, i);
if(oob)
msg.setFlag(Message.Flag.OOB);
channel.send(msg);
}
System.out.println("list = " + print(list));
List<Integer> list=r.list();
Util.waitUntilTrue(10000, 500, () -> list.size() == num);
System.out.println("list = " + list);
int size=list.size();
assert size == num : "list has " + size + " elements (expected " + num + "): " + list;
}
Expand All @@ -289,28 +303,6 @@ protected static JChannel createChannel(Class<? extends Protocol> unicast_class,
return new JChannel(new SHARED_LOOPBACK(), unicast).name(name);
}

protected static class MyReceiver implements Receiver {
final String name;
final List<Integer> msgs=new ArrayList<>(20);

public MyReceiver(String name) {
this.name=name;
}

public void receive(Message msg) {
synchronized(msgs) {
msgs.add(msg.getObject());
}
}

public List<Integer> getMessages() {return msgs;}
public void clear() {msgs.clear();}
public int size() {return msgs.size();}

public String toString() {
return name;
}
}

protected static class Drop extends Protocol {
protected volatile boolean drop_next=false;
Expand Down

0 comments on commit d915c80

Please sign in to comment.