diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/go.mod b/go.mod index d151a0b..33a7ba1 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gomarkdown/markdown v0.0.0-20220830015526-01a3c37d6f50 github.com/hashicorp/vault/api v1.7.2 github.com/iancoleman/strcase v0.2.0 + github.com/nats-io/nats.go v1.22.1 github.com/openware/pkg/ika v0.1.1 github.com/openware/pkg/mngapi v0.1.1 github.com/stretchr/testify v1.8.0 @@ -66,10 +67,12 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.11 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-sqlite3 v1.14.12 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect @@ -77,6 +80,10 @@ require ( github.com/mitchellh/reflectwalk v1.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect + github.com/nats-io/nats-server/v2 v2.9.10 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.0.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.19.0 // indirect @@ -88,11 +95,12 @@ require ( github.com/ugorji/go/codec v1.2.7 // indirect github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect + go.uber.org/automaxprocs v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0 // indirect google.golang.org/grpc v1.41.0 // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/go.sum b/go.sum index 62f1efd..5ed09f8 100644 --- a/go.sum +++ b/go.sum @@ -242,6 +242,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -277,6 +279,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= @@ -299,6 +303,16 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.10 h1:LMC46Oi9E6BUx/xBsaCVZgofliAqKQzRPU6eKWkN8jE= +github.com/nats-io/nats-server/v2 v2.9.10/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -390,6 +404,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -405,11 +421,12 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -450,6 +467,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -474,8 +492,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -486,8 +504,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/nats/README.md b/nats/README.md new file mode 100644 index 0000000..2552c64 --- /dev/null +++ b/nats/README.md @@ -0,0 +1,46 @@ +# Openware Nats + +openware common nats package. + +first we need to initialize nats connection + +``` +connectionString = "localhost:4222" +nc, err := nats.InitNats(connectionString) +``` +we need nc for initializing handlers and subscribers + + +### Publisher + +we have publisher for publishing event using either nats or Jetstream + +to intialize nats and publish an event: +``` +pub, err := nats.NewNatsEventPublisher(nc) +// handle error +pub.Publish("foo.bar", []byte("baz")) +``` + +to initialize jetstream and publish an event: +``` +js, err := nats.NewJsEventPublisher(nc) +// handle error +js.Publish("foo.bar", []byte("baz")) +``` +keep in mind that before publishing an event you'll also need to have a stream of the topic. to create a stream: +``` +err := js.CreateNewEventStream("foo", []string{"foo.baz", "foo.bar"}) +// handle error +``` + +### Subscriber +With subscribers we can subscribe to different topics. If we subscribe using queue, subscribers with the same group name will receive an event once. + +to intiialize nats and subscribe to a topic. +we can pass ```<-chan os.Signal``` for subscribing to shutdown event. so at the end it can cleanup +``` +handler := nats.NewNatsHandler(nc, terminationChannel, nats.NewHandlerDefaultConfig()) +msgChan := make(chan *nats.Msg) +handler.SubscribeToQueueUsingChannel("foo.baz", "bar", msgChannel) +``` \ No newline at end of file diff --git a/nats/common.go b/nats/common.go new file mode 100644 index 0000000..6f78c35 --- /dev/null +++ b/nats/common.go @@ -0,0 +1,28 @@ +// Package nats is used for handling nats (or nats jetstream) pub/sub +package nats + +import ( + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +// InitNats initialize nats using connectionSting +func InitNats(connectionString string) (*nats.Conn, error) { + nc, err := nats.Connect(connectionString) + + return nc, err +} + +// InitEmbededNats initialize nats in memory +func InitEmbededNats() (*nats.Conn, error) { + opts := &server.Options{} + ns, err := server.NewServer(opts) + if err != nil { + panic("failed to initialize nats mock server") + } + + ns.Start() + nc, err := nats.Connect(ns.ClientURL()) + + return nc, err +} diff --git a/nats/go.mod b/nats/go.mod new file mode 100644 index 0000000..806de52 --- /dev/null +++ b/nats/go.mod @@ -0,0 +1,22 @@ +module github.com/openware/pkg/nats + +go 1.18 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/klauspost/compress v1.15.11 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect + github.com/nats-io/nats-server/v2 v2.9.10 // indirect + github.com/nats-io/nats.go v1.22.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect + go.uber.org/automaxprocs v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/nats/go.sum b/nats/go.sum new file mode 100644 index 0000000..dcb1557 --- /dev/null +++ b/nats/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.10 h1:LMC46Oi9E6BUx/xBsaCVZgofliAqKQzRPU6eKWkN8jE= +github.com/nats-io/nats-server/v2 v2.9.10/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/nats/handler.go b/nats/handler.go new file mode 100644 index 0000000..e207d97 --- /dev/null +++ b/nats/handler.go @@ -0,0 +1,191 @@ +package nats + +import ( + "os" + "sync" + + "github.com/nats-io/nats.go" +) + +// EventHandler handles subscribing and queue subscribing on nats and jetstream +type EventHandler interface { + SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error + SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error + Subscribe(subject string, cb nats.MsgHandler) error +} + +type eventHandlerBase struct { + subs []*nats.Subscription + mutex sync.Mutex + termination <-chan os.Signal +} + +// handlerConfig config for nats and jetstream +type handlerConfig struct { + autoUnsubscribeOnShutdown bool +} + +// NatsEventHandler nats event handler structure which implements EventHandler interface +type NatsEventHandler struct { + eventHandlerBase + nc *nats.Conn + config *handlerConfig +} + +// JsEventHandler jetstream event handler structure which implements EventHandler interface +type JsEventHandler struct { + eventHandlerBase + js nats.JetStreamContext + config *handlerConfig +} + +func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { + return eventHandlerBase{ + termination: termination, + subs: make([]*nats.Subscription, 0), + mutex: sync.Mutex{}, + } +} + +// NewNatsHandler initializes new nats handler. +func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *NatsEventHandler { + if config == nil { + config = NewHandlerDefaultConfig() + } + + handler := NatsEventHandler{ + nc: nc, + eventHandlerBase: newHandlerBase(termination), + config: config, + } + + go handler.handleShutdown(config.autoUnsubscribeOnShutdown) + + return &handler +} + +// NewJsHandler initializes new jetstream handler. +func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*JsEventHandler, error) { + if config == nil { + config = NewHandlerDefaultConfig() + } + + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + handler := &JsEventHandler{ + js: js, + eventHandlerBase: newHandlerBase(termination), + config: config, + } + + go handler.handleShutdown(config.autoUnsubscribeOnShutdown) + + return handler, nil +} + +// NewHandlerDefaultConfig initialize default config for event handlers +func NewHandlerDefaultConfig() *handlerConfig { + return &handlerConfig{ + autoUnsubscribeOnShutdown: true, + } +} + +func (h *eventHandlerBase) handleShutdown(unsubOnShutdown bool) []error { + if unsubOnShutdown { + for _ = range h.termination { + var errors []error + for _, sub := range h.subs { + err := sub.Unsubscribe() + if err != nil { + errors = append(errors, err) + } + } + + return errors + } + } + + return nil +} + +// GetSubscriptions get list of subscriptions +func (h *eventHandlerBase) GetSubscriptions() []*nats.Subscription { + return h.subs +} + +func (h *eventHandlerBase) pushSub(sub *nats.Subscription) { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.subs = append(h.subs, sub) +} + +// SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel +func (j *JsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { + sub, err := j.js.ChanQueueSubscribe(subject, group, msgChannel, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +// SubscribeToQueue subscribe to queue using a callback. +func (j *JsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { + sub, err := j.js.QueueSubscribe(subject, group, cb, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +// Subscribe subscribe using a callback. +func (j *JsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { + sub, err := j.js.Subscribe(subject, cb, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +// SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel +func (n *NatsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { + sub, err := n.nc.ChanQueueSubscribe(subject, group, msgChannel) + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} + +// SubscribeToQueue subscribe to queue using a callback. +func (n *NatsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { + sub, err := n.nc.QueueSubscribe(subject, group, cb) + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} + +// Subscribe subscribe using a callback. +func (n *NatsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { + sub, err := n.nc.Subscribe(subject, cb) + + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} diff --git a/nats/nats_test.go b/nats/nats_test.go new file mode 100644 index 0000000..84b3191 --- /dev/null +++ b/nats/nats_test.go @@ -0,0 +1,84 @@ +package nats + +import ( + "sync" + "testing" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +// For Checking compatibility +var ( + _ EventHandler = (*NatsEventHandler)(nil) + _ EventHandler = (*JsEventHandler)(nil) + + _ eventPublisherBase = (*publisherBase)(nil) + _ EventPublisher = (*natsEventPublisher)(nil) + _ JsEventPublisher = (*jsEventPublisher)(nil) +) + +func initNatsHandlers() (*natsEventPublisher, *NatsEventHandler) { + nc, _ := InitEmbededNats() + publisher := NewNatsEventPublisher(nc) + handler := NewNatsHandler(nc, nil, nil) + + return publisher, handler +} + +func TestNatsPubSub(t *testing.T) { + pub, handler := initNatsHandlers() + handler.Subscribe("test.*", func(msg *nats.Msg) { + res := string(msg.Data) + assert.Equal(t, "test", res) + }) + + handler.Subscribe("test.*", func(msg *nats.Msg) { + res := string(msg.Data) + assert.Equal(t, "test", res) + }) + + pub.Publish("test.all", []byte("test")) +} + +func TestNatsQueue(t *testing.T) { + pub, handler := initNatsHandlers() + wg := sync.WaitGroup{} + subCount := 0 + handler.SubscribeToQueue("test.*", "grp", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + handler.SubscribeToQueue("test.*", "grp", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + handler.SubscribeToQueue("test.*", "grp2", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + pub.Publish("test.all", []byte("test")) + wg.Add(2) + wg.Wait() + assert.Equal(t, 2, subCount) +} + +func TestNatsChannelQueue(t *testing.T) { + pub, handler := initNatsHandlers() + msgChan := make(chan *nats.Msg) + wg := sync.WaitGroup{} + go func() { + for msg := range msgChan { + assert.Equal(t, "test", string(msg.Data)) + wg.Done() + } + }() + + handler.SubscribeToQueueUsingChannel("test.*", "grp", msgChan) + pub.Publish("test.topic", []byte("test")) + wg.Add(1) + wg.Wait() +} diff --git a/nats/publisher.go b/nats/publisher.go new file mode 100644 index 0000000..a811c7c --- /dev/null +++ b/nats/publisher.go @@ -0,0 +1,150 @@ +package nats + +import ( + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +type eventPublisherBase interface { + Request(subj string, data []byte) (*nats.Msg, error) + RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) + Publish(topic string, payload []byte) error + PublishMultiple(topics []string, payload []byte) []error +} + +// EventPublisher nats event publisher interface +type EventPublisher interface { + eventPublisherBase +} + +// JsEventPublisher jetstream event publisher and event stream manager +type JsEventPublisher interface { + eventPublisherBase + CreateNewEventStream(string, []string) error + DeleteEventStream(streamName string) error +} + +// publisherBase Base publisher stuct. it has base implementation for publishing events +type publisherBase struct { + nc *nats.Conn +} + +type natsEventPublisher struct { + publisherBase +} + +type jsEventPublisher struct { + publisherBase + js nats.JetStreamContext +} + +// NewNatsEventPublisher initialize new nats event publisher +func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher { + dispatcher := natsEventPublisher{ + publisherBase{nc: nc}, + } + + return &dispatcher +} + +// NewJsEventPublisher initialize new jetstream event publisher +func NewJsEventPublisher(nc *nats.Conn) (*jsEventPublisher, error) { + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + dispatcher := jsEventPublisher{ + publisherBase: publisherBase{nc}, + js: js, + } + + return &dispatcher, nil +} + +// Request make a request request to specific subject. (default timeout is set on 3 seconds) +func (p *publisherBase) Request(subject string, data []byte) (*nats.Msg, error) { + // TODO: maybe modify to something else + return p.RequestWithTimeout(subject, data, time.Second*3) +} + +// RequestWithTimeout make a request to specific subject and specify timeout +func (p *publisherBase) RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) { + return p.nc.Request(subject, data, timeout) +} + +// Publish publish an event for specific topic +func (p *publisherBase) Publish(topic string, payload []byte) error { + return p.nc.Publish(topic, payload) +} + +// PublishMultiple publish an event for multiple payload. because each publish may result with error we return error array +func (p *publisherBase) PublishMultiple(topics []string, payload []byte) []error { + wg := sync.WaitGroup{} + var errors []error + for _, topic := range topics { + wg.Add(1) + go func() { + defer wg.Done() + err := p.Publish(topic, payload) + if err != nil { + errors = append(errors, err) + } + }() + } + wg.Wait() + + return errors +} + +// CreateNewEventStream create stream for specific subject for jetstream. if stream already exists nothing happens. +func (j *jsEventPublisher) CreateNewEventStream(streamName string, subjects []string) error { + stream, err := j.js.StreamInfo(streamName) + if err != nil && err != nats.ErrStreamNotFound { + return err + } + + if stream == nil { + _, err := j.js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: subjects, + }) + if err != nil { + return err + } + } + + return nil +} + +// DeleteEventStream delete a stream in jetstream +func (j *jsEventPublisher) DeleteEventStream(streamName string) error { + return j.js.DeleteStream(streamName) +} + +// Publish publish a new event +func (j *jsEventPublisher) Publish(topic string, payload []byte) error { + _, err := j.js.Publish(topic, payload) + return err +} + +// PublishMultiple publish an event for multiple payload. because each publish may result with error we return error array +func (j *jsEventPublisher) PublishMultiple(topics []string, payload []byte) []error { + wg := sync.WaitGroup{} + var errors []error + for _, topic := range topics { + wg.Add(1) + go func() { + defer wg.Done() + _, err := j.js.Publish(topic, payload) + if err != nil { + errors = append(errors, err) + } + }() + } + wg.Wait() + + return errors +}