diff --git a/README.md b/README.md index 4863497..8d1a47e 100644 --- a/README.md +++ b/README.md @@ -256,13 +256,13 @@ Goals The goals of the Roger, Rabbit package are as follows: -- **Offer a drop-in replacement for streadway/amqp**: APIs may be extended (adding +- **Offer a Drop-in Replacement for streadway/amqp**: APIs may be extended (adding fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but must not break existing code unless absolutely necessary. -- **Add as few additional error paths as possible**: Errors may be *extended* with +- **Add as few Additional Error Paths as Possible**: Errors may be *extended* with additional information concerning disconnect scenarios, but new error type returns - from *Connection or *Channel should be an absolute last resort. + from `*Connection` or `*amqp.Channel` should be an absolute last resort. - **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility via features like middleware, in an effort to reduce the balkanization of amqp client @@ -271,19 +271,47 @@ The goals of the Roger, Rabbit package are as follows: Current Limitations & Warnings ------------------------------ -- **Performance**: Roger, Rabbit's implementation is handled primarily through - middlewares, and a *sync.RWMutex on transports that handles blocking methods on - reconnection events. This increases the overhead on each call, but allows for an - enormous amount of extensibility and robustness, but may be a limiting factor for - applications that need the absolute maximum throughput possible. +- **Performance**: Roger, Rabbit has not been extensively benchmarked against + `streadway/amqp`. To see preliminary benchmarks, take a look at the next section. -- **Transaction Support**: Roger, Rabbit does not currently support AMQP Transactions, +- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions, as the author does not use them. Draft PR's with possible implementations are welcome! - **Reliability**: While the author uses this library in production, it is still early days, and more battle-testing will be needed before this library is promoted to version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features. +Benchmarks +---------- + +Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs +streadway proper. However, initial benchmarks are promising, and show only minimal +impact. For most applications, the overhead cost is likely worth the cost for ease of +development and flexibility. + +Still, if absolute peak throughput is critical to an application, a less general and +more tailored approach may be warranted. + +Benchmarks can be found in `./amqp/benchmark_test.go`. + +Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz + + +| OPERATION | LIB | EXECUTIONS | NS/OP | COMPARISON +| -------------------|------|-------------|------------|------------ +| QueueInspect | sw | 2,838 | 812,594 | -- +| | rr | 2,470 | 813,269 | +0.1% +| Publish | sw | 7,4559 | 28,882 | -- +| | rr | 7,0665 | 30,031 | +4.0% +| Publish & Confirm | sw | 3,4528 | 59,703 | -- +| | rr | 3,5481 | 62,198 | +4.2% + + +The above numbers were calculated by running each benchmark 4 times, then taking the +fastest result for each library. + +The benchmarks were run with the following command: + Acknowledgements ---------------- diff --git a/amqp/benchmarks_test.go b/amqp/benchmarks_test.go new file mode 100644 index 0000000..2f5f39b --- /dev/null +++ b/amqp/benchmarks_test.go @@ -0,0 +1,286 @@ +package amqp_test + +import ( + "github.com/peake100/rogerRabbit-go/amqp" + "github.com/peake100/rogerRabbit-go/amqptest" + streadway "github.com/streadway/amqp" + "sync" + "testing" + "time" +) + +var msgBody = []byte("some message") + +func BenchmarkComparison_QueueInspect_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := channel.QueueInspect(queue.Name) + if err != nil { + b.Fatalf("error getting queue info: %v", err) + } + } +} + +func BenchmarkComparison_QueueInspect_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := channel.QueueInspect(queue.Name) + if err != nil { + b.Fatalf("error getting queue info: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublish_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Fatalf("error publishing message: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublish_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Fatalf("error publishing message: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublishConfirm_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + err := channel.Confirm(false) + if err != nil { + b.Fatalf("error putting channel into confrimation mode") + } + + confirmations := make(chan amqp.BasicConfirmation, 100) + channel.NotifyPublish(confirmations) + + done := new(sync.WaitGroup) + done.Add(2) + + errPublish := make(chan error, 1) + + b.ResetTimer() + go func() { + defer done.Done() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Errorf("error publishing message: %v", err) + errPublish <- err + return + } + } + }() + + go func() { + defer done.Done() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for i := 0; i < b.N; i++ { + timer.Reset(5 * time.Second) + select { + case confirm := <-confirmations: + if !confirm.Ack { + b.Errorf( + "publication nacked for tag %v", confirm.DeliveryTag, + ) + return + } + case <-errPublish: + b.Errorf("error publishing. aborting confirmations") + return + case <-timer.C: + b.Errorf("timeout on confirmation %v", b.N) + return + } + } + }() + + done.Wait() +} + +func BenchmarkComparison_QueuePublishConfirm_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + err := channel.Confirm(false) + if err != nil { + b.Fatalf("error putting channel into confrimation mode") + } + + confirmations := make(chan amqp.Confirmation, 100) + channel.NotifyPublish(confirmations) + + done := new(sync.WaitGroup) + done.Add(2) + + errPublish := make(chan error, 1) + + b.ResetTimer() + go func() { + defer done.Done() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Errorf("error publishing message: %v", err) + errPublish <- err + return + } + } + }() + + go func() { + defer done.Done() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for i := 0; i < b.N; i++ { + timer.Reset(5 * time.Second) + select { + case confirm := <-confirmations: + if !confirm.Ack { + b.Errorf( + "publication nacked for tag %v", confirm.DeliveryTag, + ) + return + } + case <-errPublish: + b.Errorf("error publishing. aborting confirmations") + return + case <-timer.C: + b.Errorf("timeout on confirmation %v", b.N) + return + } + } + }() + + done.Wait() +} + +// dialStreadway gets a streadway Connection +func dialStreadway(b *testing.B) *amqp.BasicChannel { + conn, err := streadway.Dial(amqptest.TestDialAddress) + if err != nil { + b.Fatalf("error dialing connection") + } + b.Cleanup(func() { + conn.Close() + }) + + channel, err := conn.Channel() + if err != nil { + b.Fatalf("error getting channel: %v", err) + } + return channel +} + +func dialRoger(b *testing.B) *amqp.Channel { + conn, err := amqp.Dial(amqptest.TestDialAddress) + if err != nil { + b.Fatalf("error dialing connection") + } + b.Cleanup(func() { + conn.Close() + }) + + channel, err := conn.Channel() + if err != nil { + b.Fatalf("error getting channel: %v", err) + } + return channel +} + +func setupQueue(b *testing.B, channel OrganizesQueues) amqp.Queue { + queue, err := channel.QueueDeclare( + "benchmark_queue_inspect", + false, + true, + false, + false, + nil, + ) + if err != nil { + b.Fatalf("error getting queue: %v", err) + } + + _, err = channel.QueuePurge(queue.Name, false) + if err != nil { + b.Fatalf("error purging queue: %v", err) + } + + // Delete the queue on the way out. + b.Cleanup(func() { + channel.QueueDelete(queue.Name, false, false, false) + }) + + return queue +} + +// publishesAndConfirms is used to run the publish anc confirm test. +type OrganizesQueues interface { + QueueDeclare( + name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table, + ) (queue amqp.Queue, err error) + QueuePurge(name string, noWait bool) (count int, err error) + QueueDelete( + name string, ifUnused, ifEmpty, noWait bool, + ) (count int, err error) +} diff --git a/amqp/channel.go b/amqp/channel.go index 812a729..84c23ed 100644 --- a/amqp/channel.go +++ b/amqp/channel.go @@ -65,7 +65,7 @@ func (channel *Channel) transportType() amqpmiddleware.TransportType { // streadway.Channel as a livesOnce interface func (channel *Channel) underlyingTransport() livesOnce { // Grab the lock and only release it once we have moved the pointer for the current - // channel into a variable. We don't want it switching out from under us as we + // channel into a variable. We don'tb want it switching out from under us as we // return. channel.underlyingChannelLock.Lock() defer channel.underlyingChannelLock.Unlock() @@ -104,7 +104,7 @@ func (channel *Channel) tryReconnect( } var result amqpmiddleware.ResultsChannelReconnect - result, err = channel.handlers.channelReconnect(channel.ctx, ags) + result, err = channel.handlers.channelReconnect(ctx, ags) if err != nil { return } @@ -246,7 +246,7 @@ func (channel *Channel) Flow(active bool) error { /* QueueDeclare declares a queue to hold messages and deliver to consumers. -Declaring creates a queue if it doesn't already exist, or ensures that an +Declaring creates a queue if it doesn'tb already exist, or ensures that an existing queue matches the same parameters. Every queue declared gets a default binding to the empty exchange "" which has @@ -895,7 +895,7 @@ Delivery.Ack on the returned delivery when you have fully processed this delivery. When autoAck is true, the server will automatically acknowledge this message so -you don't have to. But if you are unable to fully process this message before +you don'tb have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued. --- @@ -1156,7 +1156,7 @@ func (channel *Channel) NotifyPublish( // and NotifyConfirmOrOrphaned. func notifyConfirmCloseConfirmChannels(tagChannels ...chan uint64) { mainLoop: - // Iterate over the channels and close them. We'll need to make sure we don't close + // Iterate over the channels and close them. We'll need to make sure we don'tb close // the same channel twice. for i, thisChannel := range tagChannels { // Whenever we get a new channel, compare it against all previously closed @@ -1401,7 +1401,7 @@ func (tester *ChannelTesting) ConnTest() *TransportTesting { blocks := int32(0) return &TransportTesting{ - t: tester.t, + tb: tester.tb, manager: &tester.channel.rogerConn.transportManager, blocks: &blocks, } @@ -1430,25 +1430,25 @@ func (tester *ChannelTesting) GetMiddlewareProvider( ) amqpmiddleware.ProvidesMiddleware { provider, ok := tester.channel.handlers.providers[id] if !ok { - tester.t.Errorf("no channel middleware provider %v", id) - tester.t.FailNow() + tester.tb.Errorf("no channel middleware provider %v", id) + tester.tb.FailNow() } return provider } // Test returns an object with methods for testing the Channel. -func (channel *Channel) Test(t *testing.T) *ChannelTesting { +func (channel *Channel) Test(tb testing.TB) *ChannelTesting { blocks := int32(0) chanTester := &ChannelTesting{ TransportTesting: &TransportTesting{ - t: t, + tb: tb, manager: &channel.transportManager, blocks: &blocks, }, channel: channel, } - t.Cleanup(chanTester.cleanup) + tb.Cleanup(chanTester.cleanup) return chanTester } diff --git a/amqp/channelHandlersBuilder.go b/amqp/channelHandlersBuilder.go index f78fd1b..7493683 100644 --- a/amqp/channelHandlersBuilder.go +++ b/amqp/channelHandlersBuilder.go @@ -449,7 +449,7 @@ func (builder channelHandlerBuilder) createConsume() amqpmiddleware.HandlerConsu noLocal: args.NoLocal, noWait: args.NoWait, args: args.Args, - // Make a buffered channel so we don't cause latency from waiting for queues + // Make a buffered channel so we don'tb cause latency from waiting for queues // to be ready callerDeliveryChan: make(chan datamodels.Delivery, 16), } diff --git a/amqp/channel_test.go b/amqp/channel_test.go index 88dafe4..2b8efb5 100644 --- a/amqp/channel_test.go +++ b/amqp/channel_test.go @@ -1792,7 +1792,7 @@ func (suite *ChannelMethodsSuite) Test0370_NotifyFlow() { flowEvents := make(chan bool, 2) suite.ChannelConsume().NotifyFlow(flowEvents) - // Check that we don't get flow notifications right off the bat + // Check that we don'tb get flow notifications right off the bat select { case <-flowEvents: suite.T().Error("got flow event") diff --git a/amqp/config.go b/amqp/config.go index f567c24..714a955 100644 --- a/amqp/config.go +++ b/amqp/config.go @@ -77,6 +77,6 @@ func DefaultConfig() Config { return Config{ Heartbeat: defaultHeartbeat, Locale: defaultLocale, - DefaultLoggerLevel: zerolog.InfoLevel, + DefaultLoggerLevel: zerolog.ErrorLevel, } } diff --git a/amqp/connection.go b/amqp/connection.go index d41b94b..77b9331 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -62,7 +62,7 @@ func (conn *Connection) transportType() amqpmiddleware.TransportType { // streadway.Connection as a livesOnce interface. func (conn *Connection) underlyingTransport() livesOnce { // Grab the lock and only release it once we have moved the pointer for the current - // connection into a variable. We don't want it switching out from under us as we + // connection into a variable. We don'tb want it switching out from under us as we // return. conn.underlyingConnLock.Lock() defer conn.underlyingConnLock.Unlock() @@ -183,12 +183,12 @@ func (conn *Connection) Channel() (*Channel, error) { // Test returns a ConnectionTesting object with a number of helper methods for testing // Connection objects. -func (conn *Connection) Test(t *testing.T) *ConnectionTesting { +func (conn *Connection) Test(tb testing.TB) *ConnectionTesting { blocks := int32(0) return &ConnectionTesting{ conn: conn, TransportTesting: TransportTesting{ - t: t, + tb: tb, manager: &conn.transportManager, blocks: &blocks, }, diff --git a/amqp/connection_test.go b/amqp/connection_test.go index a1afead..0537c13 100644 --- a/amqp/connection_test.go +++ b/amqp/connection_test.go @@ -394,7 +394,7 @@ func TestConnection_IsClosed(t *testing.T) { } func() { - // grab a lock on the livesOnce so we don't auto-reconnectMiddleware + // grab a lock on the livesOnce so we don'tb auto-reconnectMiddleware connTester.BlockReconnect() // release the lock to let the connection reconnectMiddleware diff --git a/amqp/eventRelayNotifyFlow.go b/amqp/eventRelayNotifyFlow.go index 2cfc75b..4d66d11 100644 --- a/amqp/eventRelayNotifyFlow.go +++ b/amqp/eventRelayNotifyFlow.go @@ -72,7 +72,7 @@ func (relay *notifyFlowRelay) RunRelayLeg(legNum int) (done bool) { } // Turn flow to false on broker disconnection if the roger channel has not been - // closed and the last notification sent was a ``true`` (we don't want to send two + // closed and the last notification sent was a ``true`` (we don'tb want to send two // false values in a row). if relay.ChannelCtx.Err() == nil && relay.lastEvent { relay.handler( diff --git a/amqp/streadwayAliases.go b/amqp/streadwayAliases.go index ab9b5bd..9718e79 100644 --- a/amqp/streadwayAliases.go +++ b/amqp/streadwayAliases.go @@ -88,6 +88,11 @@ type ( // type is desired BasicConnection = streadway.Connection + // BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made + // available to avoid having to import both amqp packages if access to the base + // Confirmation type is desired + BasicConfirmation = streadway.Confirmation + // Blocking notifies the server's TCP flow control of the Connection. When a // server hits a memory or disk alarm it will block all connections until the // resources are reclaimed. Use NotifyBlock on the Connection to receive these diff --git a/amqp/transportManager.go b/amqp/transportManager.go index 2369a56..124678c 100644 --- a/amqp/transportManager.go +++ b/amqp/transportManager.go @@ -45,7 +45,7 @@ type reconnects interface { // TestReconnectSignaler allows us to block until a reconnection occurs during a test. type TestReconnectSignaler struct { // The test we are using. - t *testing.T + tb testing.TB // reconnectSignal will close when a reconnection occurs. reconnectSignal chan struct{} @@ -70,17 +70,16 @@ func (signaler *TestReconnectSignaler) WaitOnReconnect(ctx context.Context) { select { case <-signaler.reconnectSignal: case <-ctx.Done(): - signaler.t.Error( - "context cancelled before reconnection occurred: %w", ctx.Err(), + signaler.tb.Fatalf( + "context cancelled before reconnection occurred: %v", ctx.Err(), ) - signaler.t.FailNow() } } } // TransportTesting provides testing methods for testing Channel and Connection. type TransportTesting struct { - t *testing.T + tb testing.TB manager *transportManager // The number of times a connection has been blocked from being acquired. blocks *int32 @@ -133,7 +132,7 @@ func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler { }() signaler := &TestReconnectSignaler{ - t: tester.t, + tb: tester.tb, reconnectSignal: reconnected, original: tester.manager.transport.underlyingTransport(), manager: tester.manager, @@ -145,8 +144,8 @@ func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler { // DisconnectTransport closes the underlying livesOnce to force a reconnection. func (tester *TransportTesting) DisconnectTransport() { err := tester.manager.transport.underlyingTransport().Close() - if !assert.NoError(tester.t, err, "close underlying livesOnce") { - tester.t.FailNow() + if !assert.NoError(tester.tb, err, "close underlying livesOnce") { + tester.tb.FailNow() } } @@ -261,7 +260,7 @@ func isRepeatErr(err error) bool { return false } -// revive:disable:context-as-argument - we have two contexts here, they can't both be first. +// revive:disable:context-as-argument - we have two contexts here, they can'tb both be first. // retryOperationOnClosedSingle attempts a Connection or Channel channel method a single // time. @@ -288,7 +287,7 @@ func (manager *transportManager) retryOperationOnClosedSingle( // occur at the same time, but blocks the connection from being switched // out until the operations resolve. // - // We don't need to worry about lock contention, as once the livesOnce + // We don'tb need to worry about lock contention, as once the livesOnce // reconnection routine requests the lock, and new read acquisitions will // be blocked until the lock is acquired and released for write. manager.transportLock.RLock() @@ -333,7 +332,7 @@ func (manager *transportManager) retryOperationOnClosed( // // We'll give one immediate retry, but after that start increasing how long // we need to wait before re-attempting. - waitDur := 5 * time.Millisecond * time.Duration(attempt-1) + waitDur := time.Second / 2 * time.Duration(attempt-1) if waitDur > maxWait { waitDur = maxWait } @@ -498,7 +497,7 @@ func (manager *transportManager) IsClosed() bool { // Test methods for the livesOnce func (manager *transportManager) Test(t *testing.T) *TransportTesting { return &TransportTesting{ - t: t, + tb: t, manager: manager, } } diff --git a/amqp/transportManagerHandlersBuilder.go b/amqp/transportManagerHandlersBuilder.go index e134afc..124a197 100644 --- a/amqp/transportManagerHandlersBuilder.go +++ b/amqp/transportManagerHandlersBuilder.go @@ -192,7 +192,7 @@ func (builder transportHandlersBuilder) createClose() amqpmiddleware.HandlerClos manager.notificationSubscriberLock.Lock() defer manager.notificationSubscriberLock.Unlock() - // Close all disconnect and connect subscribers, then clear them. We don't + // Close all disconnect and connect subscribers, then clear them. We don'tb // need to grab the lock for this since the cancelled context will keep any new // subscribers from being added. for _, subscriber := range manager.notificationSubscribersDial { diff --git a/amqp/transportManagerReconnect.go b/amqp/transportManagerReconnect.go index cd6e531..4976f9d 100644 --- a/amqp/transportManagerReconnect.go +++ b/amqp/transportManagerReconnect.go @@ -4,13 +4,16 @@ import ( "context" streadway "github.com/streadway/amqp" "sync/atomic" + "time" ) // reconnectRedialOnce attempts to reconnect the livesOnce a single time. -func (manager *transportManager) reconnectRedialOnce(ctx context.Context) error { +func (manager *transportManager) reconnectRedialOnce(ctx context.Context, attempt int) error { + opCtx := context.WithValue(ctx, "opAttempt", attempt) + // Make the connection. err := manager.transport.tryReconnect( - ctx, atomic.LoadUint64(manager.reconnectCount), + opCtx, atomic.LoadUint64(manager.reconnectCount)+uint64(attempt), ) // Send a notification to all listeners subscribed to dial events. manager.sendDialNotifications(err) @@ -32,17 +35,30 @@ func (manager *transportManager) reconnectRedial( ctx context.Context, retry bool, ) error { // Endlessly redial the broker + attempt := 0 for { // Check to see if our context has been cancelled, and exit if so. if ctx.Err() != nil { return ctx.Err() } - err := manager.reconnectRedialOnce(ctx) + err := manager.reconnectRedialOnce(ctx, attempt) // If no error OR there is an error and retry is false return. if err == nil || (err != nil && !retry) { return err } + + // We don'tb want to saturate the connection with retries if we are having + // a hard time reconnecting. + // + // We'll give one immediate retry, but after that start increasing how long + // we need to wait before re-attempting. + waitDur := time.Second / 2 * time.Duration(attempt-1) + if waitDur > maxWait { + waitDur = maxWait + } + time.Sleep(waitDur) + attempt++ } } @@ -52,7 +68,7 @@ func (manager *transportManager) reconnectListenForClose(closeChan <-chan *strea // Wait for the current connection to close disconnectEvent := <-closeChan - // Lock access to the connection and don't unlock until we have reconnected. + // Lock access to the connection and don'tb unlock until we have reconnected. manager.transportLock.Lock() defer manager.transportLock.Unlock() @@ -73,7 +89,7 @@ func (manager *transportManager) reconnectListenForClose(closeChan <-chan *strea // closure. func (manager *transportManager) reconnect(ctx context.Context, retry bool) error { // This may be called directly by Dial methods. It's okay NOT to use the lock here - // since the caller won't be handed back the Connection or Channel until the initial + // since the caller won'tb be handed back the Connection or Channel until the initial // one is established. // // Once the first connection is established, reconnectListenForClose will grab diff --git a/amqptest/testUtils.go b/amqptest/testUtils.go index fe74b93..63d893e 100644 --- a/amqptest/testUtils.go +++ b/amqptest/testUtils.go @@ -5,7 +5,6 @@ package amqptest import ( "context" "github.com/peake100/rogerRabbit-go/amqp" - "github.com/stretchr/testify/assert" "testing" ) @@ -17,20 +16,21 @@ const ( // GetTestConnection creates a new connection to amqp://localhost:57018, where our // test broker will be listening. // -// t.FailNot() is called on any errors. -func GetTestConnection(t *testing.T) *amqp.Connection { - assert := assert.New(t) +// t.FailNow() is called on any errors. +func GetTestConnection(tb testing.TB) *amqp.Connection { conn, err := amqp.DialCtx(context.Background(), TestDialAddress) - if !assert.NoError(err, "dial connection") { - t.FailNow() + if err != nil { + tb.Errorf("error dialing broker: %v", err) + tb.FailNow() } - if !assert.NotNil(conn, "connection is not nil") { - t.FailNow() + if conn == nil { + tb.Errorf("connection is nil: %v", err) + tb.FailNow() } - t.Cleanup( + tb.Cleanup( func() { _ = conn.Close() }, diff --git a/makefile b/makefile index 058d464..18eb799 100644 --- a/makefile +++ b/makefile @@ -28,6 +28,10 @@ test: -python3 ./zdevelop/make_scripts/py_open_test_reports.py -docker stop rabbittest +.PHONY: bench +bench: + -go test -p 1 -count 4 -bench=. -run=Comparisons -benchtime=2s ./... + .PHONY: lint lint: -revive -config revive.toml ./... diff --git a/zdocs/source/_static/godoc.11.html b/zdocs/source/_static/godoc.11.html index b349f71..bd82bd7 100644 --- a/zdocs/source/_static/godoc.11.html +++ b/zdocs/source/_static/godoc.11.html @@ -145,6 +145,11 @@
+BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made +available to avoid having to import both amqp packages if access to the base +Confirmation type is desired +
+ +type BasicConfirmation = streadway.Confirmation+ + + + + + + + + + + + + + +
▾ Example (Reconnect)
-Channel channelReconnect examples. +
Channel reconnect examples.
@@ -1139,9 +1171,9 @@QueueDeclare declares a queue to hold messages and deliver to consumers. -Declaring creates a queue if it doesn't already exist, or ensures that an +Declaring creates a queue if it doesn'tb already exist, or ensures that an existing queue matches the same parameters.
@@ -2772,7 +2804,7 @@
func (channel *Channel) Test(t *testing.T) *ChannelTesting+
func (channel *Channel) Test(tb testing.TB) *ChannelTesting
Test returns an object with methods for testing the Channel.
@@ -2982,7 +3014,7 @@func (conn *Connection) Test(t *testing.T) *ConnectionTesting+
func (conn *Connection) Test(tb testing.TB) *ConnectionTesting
Test returns a ConnectionTesting object with a number of helper methods for testing Connection objects. @@ -4627,7 +4659,7 @@
func GetTestConnection(t *testing.T) *amqp.Connection+
func GetTestConnection(tb testing.TB) *amqp.Connection
GetTestConnection creates a new connection to amqp://localhost:57018, where our test broker will be listening.
-t.FailNot() is called on any errors. +t.FailNow() is called on any errors.
diff --git a/zdocs/source/index.rst b/zdocs/source/index.rst index 87f5c1d..f079a00 100644 --- a/zdocs/source/index.rst +++ b/zdocs/source/index.rst @@ -12,6 +12,7 @@ Roger, Rabbit is broken into two packages: :maxdepth: 2 :caption: Contents: + ./overview.rst ./amqp.rst ./roger.rst @@ -180,104 +181,6 @@ Demo // Message Published and Confirmed! // Message Published and Confirmed! -Motivations ------------ - -`streadway/amqp`_, the official rabbitMQ driver for go is an excellent library with a -great API, but limited scope. By design, It offers a full implementation of the AMQP -spec, but comes with very few quality-of-life featured beyond that. From it's -documentation: - -.. code-block:: text - - Goals - - Provide a functional interface that closely represents the AMQP 0.9.1 model - targeted to RabbitMQ as a server. This includes the minimum necessary to - interact the semantics of the protocol. - - Things not intended to be supported: - - Auto reconnect and re-synchronization of client and server topologies. - - Reconnection would require understanding the error paths when the topology - cannot be declared on reconnect. This would require a new set of types and code - paths that are best suited at the call-site of this package. AMQP has a dynamic - topology that needs all peers to agree. If this doesn't happen, the behavior - is undefined. Instead of producing a possible interface with undefined - behavior, this package is designed to be simple for the caller to implement the - necessary connection-time topology declaration so that reconnection is trivial - and encapsulated in the caller's application code. - -Without a supplied way to handle reconnections, `bespoke