Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #106 from nats-io/improve_error
Browse files Browse the repository at this point in the history
improve error handling
  • Loading branch information
ripienaar authored Feb 26, 2020
2 parents c3ed8be + b748e57 commit 2ea4c1b
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 92 deletions.
17 changes: 12 additions & 5 deletions internal/jsch/README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
## Overview

This is a helper library for managing and interacting with JetStream we are exploring a few options for how such a library will look and to determine what will go into core NATS clients and what will be left as an external library.
This is a helper library for managing and interacting with JetStream.

This library is not an official blessed way for interacting with JetStream yet but as a set of examples of all the capabilities this is valuable and it's for us a starting point to learning what patterns work well
This library provides API access to all the abilities of the `nats` CLI utility.

## Setup

As some parts of this library will go into the core NATS client and others not we are not sure how it will get a NATS connection, for now we did a basic - but ugly - thing just to delay answering that.

```go
nc, _ := nats.Connect("localhost")

nc, _ := nats.Connect("localhost:4222", nats.UserCredentials("user.creds"))
jsch.SetConnection(nc)
```

This will then use the NATS Connection you supply for all future interaction with JetStream
or

```go
err := jsch.Connect("localhost:4222", nats.UserCredentials("user.creds"))
```

This will then use the NATS Connection you supply for all future interaction with JetStream.

If you want access to this connection you can use `jsch.Connection()`

## Streams
### Creating Streams
Expand Down
18 changes: 1 addition & 17 deletions internal/jsch/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,11 @@ func createDurableConsumer(request server.CreateConsumerRequest) (name string, e
return "", err
}

response, err := nrequest(fmt.Sprintf(server.JetStreamCreateConsumerT, request.Stream, request.Config.Durable), jreq, timeout)
_, err = nrequest(fmt.Sprintf(server.JetStreamCreateConsumerT, request.Stream, request.Config.Durable), jreq, timeout)
if err != nil {
return "", err
}

if IsErrorResponse(response) {
return "", fmt.Errorf(string(response.Data))
}

return request.Config.Durable, nil
}

Expand All @@ -111,10 +107,6 @@ func createEphemeralConsumer(request server.CreateConsumerRequest) (name string,
return "", err
}

if IsErrorResponse(response) {
return "", fmt.Errorf(string(response.Data))
}

parts := strings.Split(string(response.Data), " ")
if len(parts) != 2 {
return "", fmt.Errorf("invalid ephemeral OK response from server: %q", response.Data)
Expand Down Expand Up @@ -187,10 +179,6 @@ func loadConsumerInfo(s string, c string) (info server.ConsumerInfo, err error)
return info, err
}

if IsErrorResponse(response) {
return info, fmt.Errorf(string(response.Data))
}

info = server.ConsumerInfo{}
err = json.Unmarshal(response.Data, &info)
if err != nil {
Expand Down Expand Up @@ -501,10 +489,6 @@ func (c *Consumer) Delete() (err error) {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

if IsOKResponse(response) {
return nil
}
Expand Down
39 changes: 21 additions & 18 deletions internal/jsch/jsch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var timeout = 5 * time.Second
var nc *nats.Conn
var mu sync.Mutex

// Connect connects to NATS and configures it to use the connection in future interaction
// Connect connects to NATS and configures it to use the connection in future interactions with JetStream
func Connect(servers string, opts ...nats.Option) (err error) {
mu.Lock()
defer mu.Unlock()
Expand Down Expand Up @@ -78,6 +78,15 @@ func IsErrorResponse(m *nats.Msg) bool {
return strings.HasPrefix(string(m.Data), server.ErrPrefix)
}

// ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil
func ParseErrorResponse(m *nats.Msg) error {
if !IsErrorResponse(m) {
return nil
}

return fmt.Errorf(strings.TrimSuffix(strings.TrimPrefix(strings.TrimPrefix(string(m.Data), server.ErrPrefix), " '"), "'"))
}

// IsOKResponse checks if the message holds a standard JetStream error
func IsOKResponse(m *nats.Msg) bool {
return strings.HasPrefix(string(m.Data), server.OK)
Expand Down Expand Up @@ -138,10 +147,6 @@ func JetStreamAccountInfo() (info server.JetStreamAccountStats, err error) {
return info, err
}

if IsErrorResponse(response) {
return info, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &info)
if err != nil {
return info, err
Expand All @@ -159,10 +164,6 @@ func StreamNames() (streams []string, err error) {
return streams, err
}

if IsErrorResponse(response) {
return streams, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &streams)
if err != nil {
return streams, err
Expand All @@ -182,10 +183,6 @@ func StreamTemplateNames() (templates []string, err error) {
return templates, err
}

if IsErrorResponse(response) {
return templates, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &templates)
if err != nil {
return templates, err
Expand All @@ -205,10 +202,6 @@ func ConsumerNames(stream string) (consumers []string, err error) {
return consumers, err
}

if IsErrorResponse(response) {
return consumers, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &consumers)
if err != nil {
return consumers, err
Expand Down Expand Up @@ -268,6 +261,11 @@ func Flush() error {
return nc.Flush()
}

// Connection is the active NATS connection being used
func Connection() *nats.Conn {
return nconn()
}

func nconn() *nats.Conn {
mu.Lock()
defer mu.Unlock()
Expand All @@ -281,5 +279,10 @@ func nrequest(subj string, data []byte, timeout time.Duration) (*nats.Msg, error
return nil, fmt.Errorf("nats connection is not set, use SetConnection()")
}

return nc.Request(subj, data, timeout)
res, err := nc.Request(subj, data, timeout)
if err != nil {
return nil, err
}

return res, ParseErrorResponse(res)
}
15 changes: 14 additions & 1 deletion internal/jsch/jsch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,24 @@ func TestIsErrorResponse(t *testing.T) {
t.Fatalf("OK is Error")
}

if !jsch.IsErrorResponse(&nats.Msg{Data: []byte("-ERR error")}) {
if !jsch.IsErrorResponse(&nats.Msg{Data: []byte("-ERR 'error'")}) {
t.Fatalf("ERR is not Error")
}
}

func TestParseErrorResponse(t *testing.T) {
checkErr(t, jsch.ParseErrorResponse(&nats.Msg{Data: []byte("+OK")}), "expected nil got error")

err := jsch.ParseErrorResponse(&nats.Msg{Data: []byte("-ERR 'test error")})
if err == nil {
t.Fatalf("expected an error got nil")
}

if err.Error() != "test error" {
t.Fatalf("expected 'test error' got '%v'", err)
}
}

func TestIsOKResponse(t *testing.T) {
if !jsch.IsOKResponse(&nats.Msg{Data: []byte("+OK")}) {
t.Fatalf("OK is Error")
Expand Down
42 changes: 5 additions & 37 deletions internal/jsch/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,11 @@ func NewStreamFromDefault(name string, dflt server.StreamConfig, opts ...StreamO
return nil, err
}

response, err := nrequest(fmt.Sprintf(server.JetStreamCreateStreamT, name), jreq, timeout)
_, err = nrequest(fmt.Sprintf(server.JetStreamCreateStreamT, name), jreq, timeout)
if err != nil {
return nil, err
}

if IsErrorResponse(response) {
return nil, fmt.Errorf(string(response.Data))
}

return LoadStream(name)
}

Expand Down Expand Up @@ -149,10 +145,6 @@ func loadStreamInfo(stream string) (info *server.StreamInfo, err error) {
return nil, err
}

if IsErrorResponse(response) {
return nil, fmt.Errorf(string(response.Data))
}

info = &server.StreamInfo{}
err = json.Unmarshal(response.Data, info)
if err != nil {
Expand Down Expand Up @@ -265,15 +257,11 @@ func (s *Stream) UpdateConfiguration(cfg server.StreamConfig, opts ...StreamOpti
return err
}

response, err := nrequest(fmt.Sprintf(server.JetStreamUpdateStreamT, s.Name()), jcfg, timeout)
_, err = nrequest(fmt.Sprintf(server.JetStreamUpdateStreamT, s.Name()), jcfg, timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return s.Reset()
}

Expand Down Expand Up @@ -314,10 +302,6 @@ func (s *Stream) ConsumerNames() (names []string, err error) {
return names, err
}

if IsErrorResponse(response) {
return names, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &names)
if err != nil {
return names, err
Expand Down Expand Up @@ -363,29 +347,21 @@ func (s *Stream) State() (stats server.StreamState, err error) {

// Delete deletes the Stream, after this the Stream object should be disposed
func (s *Stream) Delete() error {
response, err := nrequest(fmt.Sprintf(server.JetStreamDeleteStreamT, s.Name()), nil, timeout)
_, err := nrequest(fmt.Sprintf(server.JetStreamDeleteStreamT, s.Name()), nil, timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return nil
}

// Purge deletes all messages from the Stream
func (s *Stream) Purge() error {
response, err := nrequest(fmt.Sprintf(server.JetStreamPurgeStreamT, s.Name()), nil, timeout)
_, err := nrequest(fmt.Sprintf(server.JetStreamPurgeStreamT, s.Name()), nil, timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return nil
}

Expand All @@ -396,10 +372,6 @@ func (s *Stream) LoadMessage(seq int) (msg server.StoredMsg, err error) {
return server.StoredMsg{}, err
}

if IsErrorResponse(response) {
return server.StoredMsg{}, fmt.Errorf(string(response.Data))
}

msg = server.StoredMsg{}
err = json.Unmarshal(response.Data, &msg)
if err != nil {
Expand All @@ -411,15 +383,11 @@ func (s *Stream) LoadMessage(seq int) (msg server.StoredMsg, err error) {

// DeleteMessage deletes a specific message from the Stream by overwriting it with random data
func (s *Stream) DeleteMessage(seq int) (err error) {
response, err := nrequest(fmt.Sprintf(server.JetStreamDeleteMsgT, s.Name()), []byte(strconv.Itoa(seq)), timeout)
_, err = nrequest(fmt.Sprintf(server.JetStreamDeleteMsgT, s.Name()), []byte(strconv.Itoa(seq)), timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return nil
}

Expand Down
16 changes: 2 additions & 14 deletions internal/jsch/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@ func NewStreamTemplate(name string, maxStreams uint32, config server.StreamConfi
return nil, err
}

response, err := nrequest(fmt.Sprintf(server.JetStreamCreateTemplateT, name), jreq, timeout)
_, err = nrequest(fmt.Sprintf(server.JetStreamCreateTemplateT, name), jreq, timeout)
if err != nil {
return nil, err
}

if IsErrorResponse(response) {
return nil, fmt.Errorf(string(response.Data))
}

return LoadStreamTemplate(name)
}

Expand Down Expand Up @@ -69,10 +65,6 @@ func loadConfigForStreamTemplate(template *StreamTemplate) (err error) {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

info := server.StreamTemplateInfo{}
err = json.Unmarshal(response.Data, &info)
if err != nil {
Expand All @@ -87,15 +79,11 @@ func loadConfigForStreamTemplate(template *StreamTemplate) (err error) {

// Delete deletes the StreamTemplate, after this the StreamTemplate object should be disposed
func (t *StreamTemplate) Delete() error {
response, err := nrequest(fmt.Sprintf(server.JetStreamDeleteTemplateT, t.Name()), nil, timeout)
_, err := nrequest(fmt.Sprintf(server.JetStreamDeleteTemplateT, t.Name()), nil, timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return nil
}

Expand Down

0 comments on commit 2ea4c1b

Please sign in to comment.