Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: goroutine leak in publisher on close #198

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

magictucura
Copy link

Hello,

Closing publisher did not close all associated goroutines. The routine listening for block events of connection only got terminated on closing the connection.
The wrapping connection manager holds all blocking channels and only passes down an universal blocking channel now. If a signal reaches this channel it is broadcasted to all blocking channels. This allows telling the connection manager to remove and close the channel if publisher is closed which will terminate the startNotifyBlockedHandler method

Closes #149

So i moved to blocked handling of channels into the connection manager but i'm unsure if it is ok to use a second read write mutex for the blockings channel array.

- closing publisher did not close all associated goroutines. The
routine listening for block events of connection only got terminated
on closing the connection.
The wrapping connection manager holds all blocking channels and only
passes down an universal blocking channel now. If a signal reaches this
channel it is broadcasted to all blocking channels. This allows telling
the connection manager to remove and close the channel if publisher is
closed.

Closes wagslane#149
@magictucura
Copy link
Author

magictucura commented Dec 16, 2024

pretty ugly script for testing, but maybe helpful

package main

import (
	"fmt"
	"github.com/wagslane/go-rabbitmq"
	"golang.org/x/net/context"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"
)

var rabbitConn *rabbitmq.Conn

func main() {
	lifetime, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg := new(sync.WaitGroup)

	rc, err := connect("dot", "test", "fd00:dead:beef::1:1006", 5672)
	if err != nil {
		panic(err)
	}
	rabbitConn = rc

	subLifetime, subLifetimeCancel := context.WithCancel(lifetime)

	info := make(chan string)
	go func() {
		for {
			select {
			case _ = <-info:
				// fmt.Printf("msg:%s routine-count: %d\n", msg, runtime.NumGoroutine())
			}
		}
	}()
	info <- "info routine started"

	for id := range 1000 {
		createAndDoFunnyStuff(subLifetime, info, id)
	}

	fmt.Printf("routines after startup: %d\n", runtime.NumGoroutine())

	go func() {
		info <- "signal routine started"
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
		<-c
		subLifetimeCancel()
	}()

	info <- "startup finished"
	<-subLifetime.Done()
	// printStack()
	fmt.Println("final goroutine count before cancel of lifetime", runtime.NumGoroutine())
	cancel()
	wg.Wait()
	rabbitConn.Close()
	fmt.Println("final goroutine count before shutting down", runtime.NumGoroutine())
}

func createAndDoFunnyStuff(lifetime context.Context, info chan string, id int) {
	exchangeName := fmt.Sprintf("best-exchange-eu-%d", id)
	w, err := createPublisher(exchangeName)
	if err != nil {
		panic(err)
	}
	info <- "writer created"

	c, err := createConsumer(exchangeName, id)
	if err != nil {
		panic(err)
	}
	info <- "reader created"

	go func() {
		info <- "publisher routine started"
		t := time.NewTicker(1 * time.Second)
		count := 0
		for {
			select {
			case <-t.C:
				// fmt.Println("sending event on the way", runtime.NumGoroutine())
				b := []byte("hello world")
				err := w.PublishWithContext(lifetime, b, []string{exchangeName}, rabbitmq.WithPublishOptionsExchange(exchangeName))
				if err != nil {
					panic(err)
				}
				count++
				if count > 3 {
					t.Stop()
					w.Close()
					c.Close()
					info <- "finished producer and consumer"
					return
				}
			case <-lifetime.Done():
				return
			}
		}
	}()
}
func printStack() {
	buf := make([]byte, 1<<16)
	stackSize := runtime.Stack(buf, true)
	fmt.Println(string(buf[0:stackSize]))
}

func createPublisher(exchangeName string) (*rabbitmq.Publisher, error) {
	notificationPublisher, err := rabbitmq.NewPublisher(
		rabbitConn,
		rabbitmq.WithPublisherOptionsExchangeName(exchangeName),
		rabbitmq.WithPublisherOptionsExchangeKind("fanout"),
		rabbitmq.WithPublisherOptionsExchangeDeclare,
	)
	if err != nil {
		return nil, fmt.Errorf("create writer publisher: %w", err)
	}

	return notificationPublisher, nil
}

func createConsumer(exchangeName string, id int) (*rabbitmq.Consumer, error) {
	c, err := rabbitmq.NewConsumer(
		rabbitConn,
		fmt.Sprintf("best-queue-eu-%d", id),
		rabbitmq.WithConsumerOptionsExchangeName(exchangeName),
		rabbitmq.WithConsumerOptionsExchangeKind("fanout"),
		rabbitmq.WithConsumerOptionsQueueAutoDelete,
		rabbitmq.WithConsumerOptionsExchangeDeclare,
		rabbitmq.WithConsumerOptionsBinding(rabbitmq.Binding{
			RoutingKey: exchangeName,
			BindingOptions: rabbitmq.BindingOptions{
				NoWait:  false,
				Args:    nil,
				Declare: true,
			},
		}),
	)
	if err != nil {
		return nil, fmt.Errorf("create notification consumer: %w", err)
	}

	go func() {
		err = c.Run(read)
		if err != nil {
			panic(err)
		}
		// fmt.Println("run is finished it seems, so goroutine will be discarded")
	}()
	return c, nil
}

func read(delivery rabbitmq.Delivery) (action rabbitmq.Action) {
	// fmt.Println("something came in:", string(delivery.Body))
	return rabbitmq.Ack
}

func connect(user, password, host string, port int) (*rabbitmq.Conn, error) {
	url := fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port)
	cp, err := rabbitmq.NewConn(url)
	if err != nil {
		return nil, fmt.Errorf("create connection to rabbitmq: %w", err)
	}
	return cp, nil
}

Wait around 5-6 seconds to let message be send and publisher and consumer to be closed.
After stop you should see the remaining goroutines.
E.g in my case:

without the fix:

final goroutine count before cancel of lifetime 1008
final goroutine count before shutting down 4

with fix:

final goroutine count before cancel of lifetime 9
final goroutine count before shutting down 4

In the script itself are some prints commented, if you need more tracing, e.g. the finaly stack trace after all publisher and consumer got stopped, which shows the hanging goroutines

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

memory leak
1 participant