-
Notifications
You must be signed in to change notification settings - Fork 12
/
nsdperf.C
executable file
·9906 lines (8619 loc) · 265 KB
/
nsdperf.C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/******************************************************************************
nsdperf - Test network performance, simulating GPFS NSD client / server
See the README file for information about building and running
this program.
Changes in version 1.28:
* Use a global table to hold pending replies. The table is split into
multiple buckets with a mutex for each one. This should improve SMP
performance, and also work more like GPFS.
* For reply table, use a hash table with a free list per bucket instead
of a map to avoid the need for shared memory allocation for entry
elements.
* On x86_64, use time stamp counter, if possible, to measure message
transmission times. This should have less overhead than making a
system call to fetch the time of day.
* Use global variable to stop tester threads so that they don't need to
look at the time.
* Use atomic ops to increment and decrement connection hold counter.
Don't increment counter when creating a RcvMsg or ReplyEntry object
since the connection won't go away while these exist.
* Do zero-filling properly when printing timestamps.
* Avoid aliasing complaints when compiling with -O3.
* Show version in server startup log message.
Changes in version 1.27:
* Add support for more than one RDMA port when using Connection
Manager. Instead of using the IP address given on client or server
command when making connections, scan all interfaces and use the
ones whose IPv6 link-local addresses match the RDMA port interface
identifier. Only use interfaces that have a real (not link-local)
IP address assigned.
* Use only one RDMA context, completion channel, protection domain,
and memory registration per device instead of one per port.
* Don't bind to a specific port number when listening for Connection
Manager requests. Use whatever available port is assigned. Remove
the cmport command.
* Increase the default number of message worker threads from 10 to 32.
* Change "subnet number" to "fabric number" in RDMA port specification.
* Add RDMA device name and port to some messages.
* Improve the formatting of timestamps in debug output.
Changes in version 1.26:
* Allow RDMA connecting to work when multiple fabrics are present.
For now, this is done by supplying a subnet number in the port
definition. RDMA connections will only be made between ports
whose subnet numbers match. Eventually this should be fixed so
that the subnet identity is determined automatically.
* Instead of using just one RDMA completion queue per device, use
one for each target node.
Changes in version 1.25:
* Add "sinline" command to enable use of inline data in RDMA send.
Changes in version 1.24:
* Allow more than one RDMA port to be used. The "-r" command line
option can be used to select which ports to use.
* New "maxrdma" command to specify maximum number of RDMA ports to use.
* Send changes in debug level (from "debug" command) to remote hosts.
Changes in version 1.23:
* Add "usecm" command to enable use of Connection Manager to establish
RDMA connections.
* Add "cmport" command for specifying Connection Manager port number.
* Show connection number along with destination host name.
Changes in version 1.22:
* Increase the maximum number of tester threads to 4096 and the
maximum number of parallel connections to 8191.
* Update help messages.
Changes in version 1.20:
* Command line option for specifying the number of receiver threads is
now "-t" rather than "-r".
* Add "-r" command line option for specifying the RDMA device and port.
* On Linux systems, use epoll to wait for socket data.
Changes in version 1.16:
* Add a command to verify data message contents.
* Add new RDMA options for sending control messages over the RDMA
interface and sending data inline with control messages.
Changes in version 1.14:
* Add support for RDMA connections using verbs library.
* Serialize writing of log message to stdout.
* Include message rate in output statistics.
* Require a space after "/" when used as a command delimiter.
Changes in version 1.13:
* Fixed ordering of in-use waiter queue that caused excessively
long waits.
* Use a separate pending reply table and message id for each
connection to avoid contention for global locks.
* Use nanosecond resolution for times.
******************************************************************************/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#define _LINUX_SOURCE_COMPAT
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <map>
#include <new>
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <fcntl.h>
#include <limits.h>
#include <math.h>
#include <netdb.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/uio.h>
#include <sys/errno.h>
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#ifdef _AIX
#include <libperfstat.h>
#endif
#ifdef RDMA
#include <ifaddrs.h>
#include <infiniband/verbs.h>
#include <syscall.h>
/* Omni-Path 8K MTU Support */
#ifndef IBV_MTU_8192
#define IBV_MTU_8192 ((enum ibv_mtu)6)
#endif
#include <net/if.h>
#include <rdma/rdma_cma.h>
/* Maximum number of ibv_send_wr for RDMA read and write that can be
chained and posted to WQ */
#define MAX_RDMA_SEND_WR 32
static int max_send_wr = MAX_RDMA_SEND_WR;
#endif
#ifdef AF_INET6
#define IPV6_SUPPORT
#endif
#ifdef __linux
#define USE_EPOLL
#endif
#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef __sparc
#define strerror_r(_e,_b,_s) strerror(_e)
#define INADDR_NONE INADDR_BROADCAST
#include <sys/filio.h>
#endif
using namespace std;
// Sized integer aliases. The names assume the usual 8/16/32/64-bit widths
// for char/short/int/long long; each signed alias sits next to its
// unsigned counterpart.
typedef unsigned char UChar;
typedef short Int16;
typedef unsigned short UInt16;
typedef int Int32;
typedef unsigned int UInt32;
typedef long long Int64;
typedef unsigned long long UInt64;

// Timestamp or interval expressed in nanoseconds (same type as Int64).
typedef Int64 HTime;
// Portable socket aliases. With USE_WINSOCK defined, these names are
// presumably supplied by the GPFS interface library that wraps the Windows
// Socket Library; otherwise they map straight onto POSIX types.
#if !defined(USE_WINSOCK)
typedef int Sock;                      // A socket is a plain file descriptor
typedef pollfd PollSock;               // Event descriptor handed to poll()
static const Sock INVALID_SOCK = -1;   // Conventional "no socket" value
#endif
// Cache-line alignment helper. __CACHE_LINE_ALIGNED__ expands to a GCC
// alignment attribute only when the compiler is GCC-compatible and
// CACHE_LINE_SIZE is non-zero; in every other case it expands to nothing.
#define CACHE_LINE_SIZE 128
#if defined(__GNUC__) && (CACHE_LINE_SIZE != 0)
#define __CACHE_LINE_ALIGNED__ __attribute__ ((aligned (CACHE_LINE_SIZE)))
#else
#define __CACHE_LINE_ALIGNED__
#endif
// Bounds and default for the number of scatter/gather entries per verbs send.
#define MIN_VERBS_SEND_SGE     (1)
#define MAX_VERBS_SEND_SGE     (128)
#define DEFAULT_VERBS_SEND_SGE (27)
// Default for VerbsRdmaMaxSendBytes: 262144 bytes (256 KiB).
#define DEF_SCATTER_BYTES      (262144)

// Process-wide verbs tuning values.
struct GlobalVerbs_t
{
  int VerbsRdmaMaxSendBytes;   // Starts at DEF_SCATTER_BYTES
  int VerbsMaxSendSge;         // Starts at DEFAULT_VERBS_SEND_SGE
};
typedef struct GlobalVerbs_t GlobalVerbs_t;

// The single instance, aligned to a cache line where the compiler allows it.
GlobalVerbs_t GlobalVerbs __CACHE_LINE_ALIGNED__ =
  { DEF_SCATTER_BYTES, DEFAULT_VERBS_SEND_SGE };
// ---- Identity -------------------------------------------------------------
typedef UInt32 MsgId;                          // Message ID type
static const string version = "1.28";          // Program version
static const int NSDPERF_PORT = 6668;          // Default port to use

// ---- Thread counts --------------------------------------------------------
static const int MSG_WORKERS = 32;             // Default message worker threads
static const unsigned int TESTER_THREADS = 4;  // Default tester threads
static const unsigned int MAX_TESTERS = 4096;  // Upper bound on tester threads

// ---- Buffer sizes ---------------------------------------------------------
// Data buffer size: default, minimum and maximum.
static const unsigned int DEF_BUFFSIZE = 4U << 20;    // 4 MiB
static const unsigned int MIN_BUFFSIZE = 4U << 10;    // 4 KiB
static const unsigned int MAX_BUFFSIZE = 16U << 20;   // 16 MiB
// Largest data payload carried inside a control message.
static const unsigned int MAX_RPCSIZE = 64;
// Upper limit for TCP send/receive socket buffers (100 MiB).
static const int MAX_SOCKSIZE = 100 << 20;

// ---- Connection limits ----------------------------------------------------
static const int MAX_POLLFD_NUM = 8192;              // Max FDs handed to poll
static const int MAXRDMA_UNLIMITED = INT_MAX;        // "maxrdma" value meaning no limit
static const int MAX_PARALLEL = MAX_POLLFD_NUM - 1;  // Max parallel connections

// ---- Message format -------------------------------------------------------
// Header size: room for four UInt32 fields plus six HTime fields.
static const unsigned int MSG_HDRSIZE = 4 * sizeof(UInt32) + 6 * sizeof(HTime);
static const UInt32 MSG_MAGIC = 0x1F2E3D4CU;              // Header magic number
static const UInt64 SCRAMBLE = 0x0305070b0d111317ULL;     // Test-data scrambler
static const long long MAX_HTIME = 0x7FFFFFFFFFFFFFFFLL;  // Largest HTime value

// For ioctl arguments: a constant "1" (enable) whose address can be passed.
static const int on = 1;

// Flags for sendmsg(). Where the platform defines MSG_NOSIGNAL it is used so
// that sending on a connection the peer has closed does not raise SIGPIPE
// (per POSIX send(), EPIPE is still returned). Some systems lack the flag.
#ifndef MSG_NOSIGNAL
static const int SENDMSG_FLAGS = 0;
#else
static const int SENDMSG_FLAGS = MSG_NOSIGNAL;
#endif
// Forward declarations, grouped by class-key (class first, then struct).
class MsgRecord;
class MsgWorker;
class PollWait;
class RdmaConn;
class RdmaReceiver;
class Receiver;
class Tester;
class Thread;
struct RcvMsg;
struct RdmaPort;
struct Target;
struct TestReq;
// Error macros. "msg" is an ostream insertion chain, for example
//   Warn("bad value " << v);
// Warn/Warnm print a warning and continue. Error/Errorm print and exit with
// EXIT_FAILURE. Error/Errorm never release logMutex; since the process is
// exiting, that appears deliberate (it keeps other threads from interleaving
// output).
// The "m" variants append the text for errno. errno is saved first, before
// msg is evaluated or anything is written, because either step may change
// it. It is restored just before perror() so the reported error is the one
// the caller observed.
#define Warn(msg) do \
  { pthread_mutex_lock(&logMutex); \
    cerr << progname << ": Warning: " << msg << endl; \
    pthread_mutex_unlock(&logMutex); } while (false)
#define Warnm(msg) do \
  { int warnmSavedErrno_ = errno; \
    pthread_mutex_lock(&logMutex); \
    cerr << progname << ": Warning: " << msg << ": "; \
    errno = warnmSavedErrno_; perror(""); \
    pthread_mutex_unlock(&logMutex); } while (false)
#define Error(msg) do \
  { pthread_mutex_lock(&logMutex); \
    cerr << progname << ": " << msg << endl; \
    exit(EXIT_FAILURE); } while (false)
#define Errorm(msg) do \
  { int errormSavedErrno_ = errno; \
    pthread_mutex_lock(&logMutex); \
    cerr << progname << ": " << msg << ": "; \
    errno = errormSavedErrno_; perror(""); \
    exit(EXIT_FAILURE); } while (false)
// Console log output macros (stdout, serialized by logMutex).
// Logt prints only when debugLevel >= lev, prefixed by the current time.
// Logm appends the text for errno, saved before msg is evaluated. The saved
// value uses a macro-private name so that a caller variable called "e"
// inside msg still refers to the caller's variable.
#define Log(msg) do \
  { thLock(&logMutex); cout << msg << endl; thUnlock(&logMutex); } \
  while (false)
#define Logt(lev, msg) do \
  { if (debugLevel >= (lev)) \
    { thLock(&logMutex); \
      cout << httostr(getTime()) << " " << msg << endl; \
      thUnlock(&logMutex); } \
  } while (false)
#define Logm(msg) do \
  { int logmSavedErrno_ = errno; thLock(&logMutex); \
    cout << msg << ": " << geterr(logmSavedErrno_) << endl; \
    thUnlock(&logMutex); } while (false)
// Error codes returned by routines such as IpAddr::parse and
// TcpConn::sendMessage. The values are written out explicitly and equal the
// 0-based numbering the compiler would otherwise assign.
enum Errno
{
  E_OK = 0,
  E_INVAL = 1,
  E_NOENT = 2,
  E_WOULDBLOCK = 3,
  E_CONNRESET = 4,
  E_BADMSG = 5,
  E_BROKEN = 6,
  E_SENDFAILED = 7,
  E_REPLY = 8,
  E_CONNFAILED = 9
};

// Address family held by an IpAddr.
enum Atype
{
  AT_IPV4 = 0,
  AT_IPV6 = 1
};
// Structure for holding an IPv4 or IPv6 address
// fam selects the family and a[] holds the raw address bytes (16 bytes is
// enough for IPv6). How an IPv4 address is laid out inside a[] is decided
// by the member functions defined elsewhere in the file.
struct IpAddr
{
Atype fam; // Address family (AT_IPV4 or AT_IPV6)
UChar a[16]; // Raw address bytes
IpAddr() { setNone(); } // A new IpAddr starts in the "none" state
// Only ordering and inequality are provided (there is no operator==);
// callers test equality with !=.
bool operator<(const IpAddr &iaddr) const;
bool operator!=(const IpAddr &iaddr) const;
void setNone();
void setAny();
Errno parse(const string hostname);
void loadSockaddr(const sockaddr *saddrP);
string toString() const;
sockaddr *toSockaddr(UInt16 port, sockaddr_storage *sockBuffP) const;
socklen_t getSocklen() const;
// Serialized size in bytes: one UInt32 (presumably the family code -- TODO
// confirm against DataBuff::putIpAddr) plus the 16 bytes of a[].
static int getSize() { return sizeof(UInt32) + sizeof(UChar[16]); /* a */ }
// Socket-API family for this address. IPV6_SUPPORT is defined near the top
// of the file only when AF_INET6 exists; without it every address reports
// AF_INET.
#ifdef IPV6_SUPPORT
int getFamily() const { return (fam == AT_IPV6) ? AF_INET6 : AF_INET; }
#else
int getFamily() const { return AF_INET; }
#endif
bool isLinkLocal() const;
bool isNone() const;
};
// An RDMA memory address (a character pointer) stored as a 64-bit integer so
// that it can be passed between nodes whatever the local pointer width is.
// Conversions are implicit in both directions: into RdmaAddr through the
// char* / UInt64 constructors, and back out through the conversion operators.
// The conversion operators are const because they only read addr; this lets
// const RdmaAddr objects (and const references) convert as well.
struct RdmaAddr
{
  UInt64 addr;   // The address value

  RdmaAddr() : addr(0) {}
  RdmaAddr(UInt64 a) : addr(a) {}
  RdmaAddr(char *p) : addr(reinterpret_cast<UInt64>(p)) {}

  operator char *() const { return reinterpret_cast<char *>(addr); }
  operator UInt64() const { return addr; }
};
// Structure to record time line of each message
struct TimeLine
{
HTime rdStartStamp;
HTime rdFinStamp;
HTime msgSendStamp; // Time stamp when message is sent on sender side
HTime msgRecvStamp; // Time stamp when message is received and recognized on receiver side
HTime replySendStamp; // Time stamp when message is sent on sender side
HTime replyRecvStamp; // Time stamp when message is received and recognized on receiver side
TimeLine()
{
rdStartStamp = 0;
rdFinStamp = 0;
msgSendStamp = 0;
msgRecvStamp = 0;
replySendStamp = 0;
replyRecvStamp = 0;
}
~TimeLine() {}
HTime getNetworkDelay()
{
return (rdFinStamp - rdStartStamp + replyRecvStamp - msgSendStamp - \
(replySendStamp - msgRecvStamp));
}
};
// Abstract base class for threads. Derived class must supply the routine
// body.
// The derived class implements threadBody(). init() and startup() are
// defined elsewhere in the file (presumably startup() creates the pthread
// that runs threadBody() -- TODO confirm).
class Thread
{
pthread_t th; // Handle of the underlying pthread, returned by getThread()
protected:
bool running; // Starts false; presumably set once the thread is running
public:
// NOTE(review): th(0) assumes pthread_t is an arithmetic type (true on
// Linux and AIX). It would not compile where pthread_t is a struct.
Thread() : th(0), running(false) {}
virtual ~Thread() {}
void init();
void startup();
pthread_t getThread() const { return th; }
virtual int threadBody() = 0; // The thread routine, supplied by the derived class
};
// Thread states
// NOTE(review): the names suggest running -> asked to die -> dead; confirm
// against the code that sets them.
enum thState { tsRun, tsDie, tsDead };
// Data buffer that allows putting and getting items for sending in messages.
//
// The put*/get* methods work at an internal position (buffpos), which
// resetBuff() rewinds to 0. The destructor frees buffP only when alloc > 0,
// i.e. only storage that this object allocated itself.
//
// Every constructor initializes every member before doing anything else.
// That keeps the auxiliary (inline RDMA) fields well defined even when
// setAux() is never called, and means newBuff()/initBuff() never see an
// indeterminate buffP.
//
// NOTE(review): there is no user-defined copy constructor or assignment
// operator. Copying a DataBuff that owns its storage (alloc > 0) would make
// both copies delete[] the same buffer, so do not copy owning buffers.
class DataBuff
{
  char *buffP;            // The data buffer
  unsigned int alloc;     // Actual allocated length (0 => buffP not freed here)
  unsigned int bufflen;   // Length currently assigned to buffer
  unsigned int buffpos;   // Position for put/get
  char *auxBuffP;         // Extra buffer used for inline RDMA
  unsigned int auxlen;    // Length of extra buffer

public:
  // Empty buffer with no storage.
  DataBuff()
    : buffP(NULL), alloc(0), bufflen(0), buffpos(0), auxBuffP(NULL), auxlen(0)
  {}

  // Buffer of len bytes, set up by newBuff().
  DataBuff(unsigned int len)
    : buffP(NULL), alloc(0), bufflen(0), buffpos(0), auxBuffP(NULL), auxlen(0)
  { newBuff(len); }

  // Buffer over caller-supplied data, set up by initBuff(). alloc starts at
  // 0, so the destructor presumably leaves dP to its owner.
  DataBuff(char *dP, unsigned int len)
    : buffP(NULL), alloc(0), bufflen(0), buffpos(0), auxBuffP(NULL), auxlen(0)
  { initBuff(dP, len); }

  ~DataBuff() { if (alloc > 0) delete [] buffP; }

  void newBuff(unsigned int len);
  void initBuff(char *dataP, unsigned int datalen);
  void fillBuff(UInt64 seed);
  bool verifyBuff(UInt64 seed);
  void resetBuff() { buffpos = 0; }
  char *getBuffP() { return buffP; }
  unsigned int getBufflen() const { return bufflen; }

  // Attach (without taking ownership) an extra buffer for inline RDMA.
  void setAux(char *dataP, unsigned int datalen)
  { auxBuffP = dataP; auxlen = datalen; }
  char *getAuxBuffP() { return auxBuffP; }
  unsigned int getAuxlen() { return auxlen; }

  // Serialization: append at buffpos.
  void putUInt16(UInt16 i);
  void putInt32(Int32 i);
  void putUInt32(UInt32 i);
  void putUInt64(UInt64 i);
  void putRdmaAddr(RdmaAddr a);
  void putHTime(HTime t);
  void putIpAddr(IpAddr iaddr);
  void putString(string s);
  void putTimeLine(TimeLine *timeline);

  // Deserialization: read at buffpos.
  UInt16 getUInt16();
  Int32 getInt32();
  UInt32 getUInt32();
  UInt64 getUInt64();
  RdmaAddr getRdmaAddr();
  HTime getHTime();
  IpAddr getIpAddr();
  string getString();
  TimeLine* getTimeLine();
};
// Gather histogram of response times
class Histogram
{
map<HTime, UInt32> buckets;
int nEvents;
HTime totalTime;
public:
Histogram() { nEvents = 0; totalTime = 0; }
void addEntry(HTime t);
void addHist(const Histogram *hP);
void printHist(ostream &os) const;
void putBuff(DataBuff *dbP) const;
void getBuff(DataBuff *dbP);
int getNevents() const { return nEvents; }
unsigned int calcLen() const
{ return sizeof(UInt64) + 2 * sizeof(UInt32) +
buckets.size() * (sizeof(UInt64) + sizeof(UInt32)); }
double average() const;
double median() const;
double standardDeviation() const;
double minTime() const;
double maxTime() const;
UInt32 maxBucket() const;
};
// Compare two histogram bucket entries by bucket value
struct BucketCmp
{
bool operator()(pair<HTime, UInt32> p1, pair<HTime, UInt32> p2) const
{ return p1.second < p2.second; }
};
// Receive state of a connection (meanings inferred from the names).
enum RState
{
  rcv_idle = 0,     // Nothing in progress
  rcv_header = 1,   // Receiving a message header
  rcv_data = 2      // Receiving message data
};

// Message type codes. The numeric values are written out and equal the
// consecutive 0-based values the compiler would assign implicitly; mtLast
// remains the highest code.
enum MType
{
  mtUnknown = 0,         // Unknown
  mtReply = 1,           // Reply message
  mtReplyErr = 2,        // Error reply message
  mtVersion = 3,         // Query version
  mtWrite = 4,           // Accept buffer of test data
  mtRead = 5,            // Send back a buffer of test data
  mtNwrite = 6,          // NSD style write: target asks for test data
  mtGetdata = 7,         // Fetch data in response to NSD write request
  mtKill = 8,            // Tell destination node to exit
  mtConnect = 9,         // Connect to specified servers
  mtReset = 10,          // Close any existing server connections
  mtRdmaDone = 11,       // Close RDMA connections to servers at end of test
  mtRdmaConn = 12,       // Set up RDMA connection
  mtRdmaGetBuffs = 13,   // Return addresses of RDMA buffers to use for test
  mtRdmaDisconnCM = 14,  // Tell connection manager to disconnect
  mtRdmaDisconn = 15,    // Tear down RDMA connection
  mtRdmaCleanup = 16,    // Delete RDMA connection object
  mtRdmaWrite = 17,      // Accept data that has been written through RDMA
  mtParms = 18,          // Set test parameters
  mtAlloc = 19,          // Allocate memory buffers
  mtFree = 20,           // Free buffers allocated by mtAlloc
  mtTest = 21,           // Run performance test to all servers
  mtStatus = 22,         // Return node status
  mtStatOn = 23,         // Turn on test statistics gathering
  mtStatOff = 24,        // Turn off test statistics gathering
  mtIdlePct = 25,        // Get idle CPU percentage from last test
  mtLast = 26            // Highest message type number
};
// An entry in the queue of waiters for exclusive use of a socket for sending
// (TcpConn keeps these in a priority_queue ordered by InuseCmp).
struct InuseWaiter
{
// Wake this when the socket is available
pthread_cond_t iwCond;
// The following fields are used to determine priority between waiters
MType mt; // Original msgType (not mtReply or mtReplyErr)
MsgId msgId; // Message identifier (earlier messages are lower)
unsigned int datalen; // Length of the message
InuseWaiter(); // Default constructor - not defined (so accidental use fails at link time)
InuseWaiter(MType tmt, MsgId tmsgId, unsigned int tdatalen);
};
// Function object to compare two InuseWaiter entries to determine which
// has higher priority
// std::priority_queue keeps at top() the element that compares greatest, so
// this must return true when *w1P has LOWER priority than *w2P.
struct InuseCmp
{ bool operator()(const InuseWaiter *w1P, const InuseWaiter *w2P) const; };
// An RDMA port specification (device name, port number, fabric number), as
// given by the RDMAPORTS command option.
struct RdmaPortName
{
  string devName;   // RDMA device name
  int rport;        // Port number; a negative value matches any port
  int fabnum;       // Fabric number; ignored by operator<

  RdmaPortName() : rport(-1), fabnum(0) {}
  RdmaPortName(string dev, int p, int f) : devName(dev), rport(p), fabnum(f) {}

  // Ordering for storing port names in sets. Devices compare by name. Within
  // one device, two explicit port numbers compare numerically, while a
  // wildcard (negative) port is treated as equivalent to every port. The
  // fabric number is ignored because a port name must be on only one fabric.
  // NOTE(review): a wildcard is "equal" to both port 1 and port 2, yet those
  // two are not equal to each other. Mixing wildcards and explicit ports in
  // one set therefore breaks strict weak ordering.
  bool operator<(const RdmaPortName &other) const
  {
    if (devName == other.devName)
    {
      const bool bothExplicit = (rport >= 0) && (other.rport >= 0);
      return bothExplicit && rport < other.rport;
    }
    return devName < other.devName;
  }
};
// Description of one RDMA port. These objects are exchanged among nodes so
// that each node knows where to connect.
struct RdmaPortInfo
{
  string piName;      // Device name
  int piPort;         // Port number within device
  int piFabnum;       // Fabric number
  UInt64 piPortIf;    // Interface ID
  IpAddr piAddr;      // IP address (CM only)
  UInt16 piCmPort;    // Port number (CM only)

  RdmaPortInfo() : piPort(0), piFabnum(0), piPortIf(0), piCmPort(0) {}
  RdmaPortInfo(const RdmaPort *rportP);
  unsigned int calcPortInfoLen() const;
  void putBuff(DataBuff *dbP) const;
  void getBuff(DataBuff *dbP);
  string toString() const;

  // Ordering for sets: by device name, then by port number when both ports
  // are non-negative (a negative port compares equal to any port). The
  // fabric number is ignored because a port name must be on only one fabric.
  bool operator<(const RdmaPortInfo &other) const
  {
    if (piName == other.piName)
      return piPort >= 0 && other.piPort >= 0 && piPort < other.piPort;
    return piName < other.piName;
  }

  // Sort predicate for port lists: fabric number, then device name, then
  // port number.
  static bool comp(const RdmaPortInfo *p1P, const RdmaPortInfo *p2P)
  {
    if (p1P->piFabnum == p2P->piFabnum)
    {
      if (p1P->piName == p2P->piName)
        return p1P->piPort < p2P->piPort;
      return p1P->piName < p2P->piName;
    }
    return p1P->piFabnum < p2P->piFabnum;
  }
};
// TCP connection
// One socket to a peer. Senders take turns via the inuse flag; a sender that
// finds it busy presumably queues an InuseWaiter in "waiters", which is
// ordered by InuseCmp (TODO confirm against sendit). With RDMA built in,
// the object also owns the RDMA connections to the same peer.
class TcpConn
{
pthread_mutex_t connMutex;
pthread_cond_t connCond;
priority_queue<InuseWaiter *, vector<InuseWaiter *>, InuseCmp> waiters; // Senders waiting for inuse
int refCount; // Hold count; presumably adjusted by holdConn()/releaseConn()
Sock tcSock;
IpAddr dest;
bool inuse; // Serializer for senders
bool broken; // True if socket was shut down due to error
int cnum; // TCP connection number
Histogram connHist; // Histogram of response times from last test
Histogram connLat; // Histogram of latency times from last test
static int nextCnum; // Next connection number (protected by globalMutex)
#ifdef RDMA
int lastConnNdx; // Hint about which connection was used last
int nRconns; // Count of entries in rconnTab
vector<RdmaConn *>rconnTab; // RDMA connections
list<RdmaAddr>remoteBuffs; // RDMA buffers on remote side for write tests
list<char *>givenBuffs; // RDMA buffers that we have given out
#endif
// Receiver thread state
RState recvState;
char *recvP; // Receive pointer
int recvlen; // Length of data received so far
int recvmax; // Size of receive buffer
RcvMsg *recvMsgP; // Message will be received here
public:
TcpConn(); // Default constructor - not defined
TcpConn(Sock tsock, IpAddr tdest);
~TcpConn();
string destName() const;
Histogram *getHistP() { return &connHist; }
Histogram *getLatP() { return &connLat; }
bool isBroken() const { return broken; }
int getCnum() const { return cnum; }
Sock getSock() const { return tcSock; }
IpAddr getDest() const { return dest; }
MsgId assignMsgId();
void holdConn();
void releaseConn();
void connShutdown();
void receiveDone();
Errno receiverEvent();
void gotMsg(RcvMsg *rmsgP);
void getSourceAddr(MsgId msgId, char **srcAddrPP, unsigned int *srcLenP);
Errno recvMessage();
// NOTE(review): the default argument "new TimeLine()" is evaluated on every
// call that omits timeLine, so each such call heap-allocates a TimeLine.
// Who deletes it is not visible here. Confirm that sendMessage/sendit (or
// their callees) free it; otherwise each defaulted call leaks.
Errno sendMessage(MType mt, DataBuff *dbP, MsgRecord *mrP,
PollWait *pwaitP = NULL, TimeLine *timeLine = new TimeLine());
Errno sendit(MType mt, MType origmt, MsgId msgId, DataBuff *mdbP,
MsgRecord *mrP, PollWait *pwaitP, TimeLine *timeLine = new TimeLine());
#ifdef RDMA
int getNRconns() const { return nRconns; }
string rdmaClientConnect(const set<RdmaPortInfo> *remotePortsP);
void rdmaServerConnect(RcvMsg *rmsgP);
void rdmaSendCMDiscReq();
void rdmaRecvCMDiscReq(RcvMsg *rmsgP);
void rdmaDisconnect();
void rdmaCleanup();
void rdmaWrite(DataBuff *testBuffP, RdmaAddr raddr, UInt32 rlen,
PollWait *pwaitP);
void rdmaRead(RdmaAddr raddr, UInt32 rlen, char *dataP, PollWait *pwaitP);
void rdmaSend(DataBuff *dbP, PollWait *pwaitP);
void rdmaGiven(char *buffP) { givenBuffs.push_back(buffP); } // Record a buffer handed to the peer
RdmaAddr getRemoteBuff();
void freeRemoteBuff(RdmaAddr rBuff);
RdmaConn *chooseRconnP();
#endif
};
#ifdef RDMA
// Unique key used to find the TcpConn object that an incoming RDMA
// connection manager event belongs to.
struct ConnKey
{
  int cnum;       // TCP connection number
  IpAddr iaddr;   // IP address
  IpAddr saddr;   // Second address in the key (presumably the source side -- TODO confirm)

  ConnKey() : cnum(0) {}
  ConnKey(int c, const IpAddr &ia, const IpAddr &sa) : cnum(c), iaddr(ia), saddr(sa) {}

  // Lexicographic order on (cnum, iaddr, saddr). IpAddr offers only
  // operator< and operator!=, so iaddr equality is tested with !=.
  bool operator<(const ConnKey &other) const
  {
    if (cnum == other.cnum)
    {
      if (iaddr != other.iaddr)
        return iaddr < other.iaddr;
      return saddr < other.saddr;
    }
    return cnum < other.cnum;
  }
};
// Helpers (defined elsewhere) that map verbs status/opcode enums to text.
static const char *ibv_wc_status_str_nsdperf(enum ibv_wc_status status);
static const char *ibv_wr_opcode_str(enum ibv_wr_opcode opcode);
// RDMA connection
// One RDMA queue pair to a peer, owned by a TcpConn (connP) and stored at
// index rconnNdx of that TcpConn's rconnTab.
class RdmaConn
{
pthread_mutex_t cmMutex; // For waiting on CM events
pthread_cond_t cmCond;
rdma_cm_event *cmEventP; // To pass event between handler and waiter
int cmWaiting; // Count of threads waiting for a CM event
bool cmBroken; // True if forcibly disconnecting
rdma_cm_id *cmId; // Communications identifier for CM
ibv_qp *qp; // Queue pair
unsigned int maxInline; // Maximum bytes that can be sent inline
UInt32 rkey; // Remote memory key
UInt32 llid, rlid; // Local and remote lid
Int32 remoteNdx; // Index into rconnTab on remote node
list<PollWait *>pwList; // Receive requests
TcpConn *connP; // TCP connection that owns this object
int rconnNdx; // Index into rconnTab for this connection
RdmaPort *rdmaPortP; // The RDMA port
RdmaPortInfo remotePinfo; // Information about the port at the other end
pthread_mutex_t bytesMutex; // Mutex for bytesPending
UInt64 bytesPending; // Bytes in flight (protected by bytesMutex)
public:
RdmaConn(TcpConn *tconnP, int ndx);
~RdmaConn();
Int32 rdGetRemoteNdx() const { return remoteNdx; }
rdma_cm_id *rdWaitForCMConn(const ConnKey *ckeyP,
int *responder_resourcesP,
int *initiator_depthP);
void rdCMListen();
string rdConnInfo() const;
string rdPrepClient(TcpConn *connP, RdmaPort *rportP,
const RdmaPortInfo *destPortInfoP);
string rdPrepServer(RcvMsg *rmsgP, TcpConn *connP, DataBuff *dbP);
void rdPrepPost(TcpConn *connP);
void rdPostRecv(PollWait *pwaitP);
void rdConnect(UInt32 qpnum, UInt32 maxQpRd, const char *whoP);
void rdDisconnectCM(string name);
void rdDisconnect();
void rdCleanup();
void rdWrite(DataBuff *testBuffP, RdmaAddr raddr, UInt32 rlen, PollWait *pwaitP);
void rdRead(RdmaAddr raddr, UInt32 rlen, char *dataP, PollWait *pwaitP);
void rdSend(DataBuff *dbP, PollWait *pwaitP);
void rdRecv(PollWait *pwaitP, unsigned int len);
void rdHandleCMEvent(rdma_cm_event *eventP);
string rdCheckCMEvent(const string func, enum rdma_cm_event_type expectedEv,
enum rdma_cm_event_type errEv);
void rdAddBytes(UInt64 nBytes);
void rdSubBytes(UInt64 nBytes);
// NOTE(review): reads bytesPending without taking bytesMutex, even though
// the field is documented as protected by it. The value may be stale, and a
// 64-bit read could tear on 32-bit hosts. Confirm whether callers already
// hold the mutex before changing this.
UInt64 rdGetBytesPending() const { return bytesPending; };
};
#endif // RDMA
// Target node for test
// A remote host taking part in a test, plus the per-test progress flags
// used by the test command.
struct Target
{
string hostname;
IpAddr iaddr; // IP address
TcpConn *connP; // TCP connection (null if not connected)
set<RdmaPortInfo> remPinfo; // RDMA ports on remote node
bool isClient; // True if this is a client node
bool active; // For detecting inactive Targets
bool didAlloc, didConnect; // Used by test command to track progress
Target(); // Default constructor - not defined
Target(const Target &m); // Copy constructor - not defined (copying is a link error)
Target(const string thostname, const IpAddr tiaddr) :
hostname(thostname), iaddr(tiaddr), connP(NULL), isClient(false),
active(true), didAlloc(false), didConnect(false) {}
~Target();
string makeConnection();
int calcConnectionCount() const;
// NOTE(review): as with TcpConn::sendMessage, the default argument
// "new TimeLine()" allocates on every call that omits timeline. Confirm
// that sendm (or its callees) frees it.
RcvMsg *sendm(MType mt, DataBuff *dbP = NULL, PollWait *pwaitP = NULL,
char *srcAddrP = NULL, unsigned int srcLen = 0, TimeLine *timeline = new TimeLine());
string name() const;
};
// Wrapper used to order Target objects for round-robin scheduling.
struct SortedTarget
{
  int tindex;       // Primary sort key
  Target *targP;    // Target being scheduled; its iaddr breaks ties
  SortedTarget(int ndx, Target *tP) : tindex(ndx), targP(tP) {}

  // Sort predicate: ascending by tindex, then by the target's IP address.
  static bool comp(const SortedTarget t1, const SortedTarget t2)
  {
    if (t1.tindex == t2.tindex)
      return t1.targP->iaddr < t2.targP->iaddr;
    return t1.tindex < t2.tindex;
  }
};
// Structure to hold received messages
// One incoming message: its header bytes, its data buffer, the originating
// connection, plus the message handlers that dispatch() selects among.
// NOTE(review): the TcpConn* constructor allocates timeLine with new, but
// ~RcvMsg() does not delete it. Unless the owner frees timeLine elsewhere
// (not visible here), every RcvMsg leaks one TimeLine. Confirm before adding
// a delete, to avoid a double free.
// NOTE(review): memset is declared in <cstring>/<string.h>, which the
// file's include list does not name explicitly; it presumably arrives
// transitively.
struct RcvMsg
{
TcpConn *connP; // The connection that this message came from
char hdr[MSG_HDRSIZE]; // Buffer for receiving message header
DataBuff msgBuff; // Buffer for message data
MsgId msgId; // Message identifier
MType msgType; // Message type
TimeLine *timeLine; // Record time line for each message
int rconnNdx; // Which RDMA connection the message came in on
string errText; // Error message, or empty if no error
RcvMsg(); // Default constructor - not defined
RcvMsg(const RcvMsg &m); // Copy constructor - not defined (also prevents copying msgBuff)
RcvMsg(TcpConn *tconnP)
{
connP = tconnP;
memset(hdr, 0, MSG_HDRSIZE);
msgId = 0;
msgType = mtUnknown;
rconnNdx = -1; // -1: not associated with an RDMA connection
timeLine = new TimeLine();
}
~RcvMsg() {}
char *msgBuffP() { return msgBuff.getBuffP(); }
unsigned int msgLen() const { return msgBuff.getBufflen(); }
void sendReply(DataBuff *dbP, string errText = "", PollWait *pwaitP = NULL);
bool showError();
bool startAdminReq();
void endAdminReq();
void dispatch(MsgWorker *mwP);
// Message handlers
void handleVersion();
void handleKill();
void handleWrite(MsgWorker *mwP);
void handleRdmaWrite(MsgWorker *mwP);
void handleRead(MsgWorker *mwP);
void handleNwrite(MsgWorker *mwP);
void handleGetdata(MsgWorker *mwP);
void handleConnect();
void handleReset();
void handleRdmaDone();
void handleRdmaConn();
void handleRdmaGetBuffs();
void handleRdmaDisconnCM();
void handleRdmaDisconn();
void handleRdmaCleanup();
void handleParms();
void handleAlloc();
void handleFree();
void handleTest();
void handleStatus();
void handleIdlePct();
};
// This object is used to wait for pending replies
// Flow inferred from the member names (TODO confirm against definitions):
// addMsg() records an outstanding message id in waitTab, gotReply() files a
// reply in "replies", waitForReplies()/checkReplies() wait for or test for
// completion, and nextReply() hands replies back to the caller.
class MsgRecord
{
pthread_mutex_t waitMutex;
pthread_cond_t waitCond;
set<MsgId> waitTab; // IDs of pending messages
list<RcvMsg *> replies; // Reply data
public:
char *srcAddrP; // Data buffer for GetData requests
unsigned int srcLen; // Length of the above buffer
MsgRecord(char *tsrcAddrP = NULL, unsigned int tsrcLen = 0);
~MsgRecord();
void addMsg(MsgId msgId);
void waitForReplies();
bool checkReplies();
void gotReply(RcvMsg *rmsgP);
RcvMsg *nextReply();
};
// An object for waiting on RDMA I/O requests
class PollWait
{
#ifdef RDMA
pthread_mutex_t pwMutex;
pthread_cond_t pwCond;
bool complete;
public:
char *srvBuffP;
char *cliBuffP;
UInt64 opId;
UInt32 buffLen;
ibv_wr_opcode opcode;
enum ibv_wc_status status;
int tid;
char *mbufP; // Registered message buffer (optional)
RdmaConn *rconnP; // Connection that message came in on
// (only used for receive requests)
PollWait() { init(); }
PollWait(RdmaConn *rP);