Skip to content

Commit

Permalink
Implement message move limit (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
mercury2269 authored Feb 24, 2020
1 parent 009d240 commit 3f75882
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 31 deletions.
21 changes: 14 additions & 7 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ Useful when you need to move deadletter queue messages back into the original qu

## Features

* Reliable delivery. Messages are only deleted from the source queue if they
were successfully enqueued to the destination.
* Reliable delivery. Messages are only deleted from the source queue if they were enqueued to the destination.
* Messages are sent and received in batches for faster processing.
* Progress indicator.
* User friendly info and error messages.
* Queue name resolution. For ease of use, you only need to provide a queue name and not the full `arn` address.
* Message Attributes are copied over.
* Support for FIFO queues. MessageGroupId and MessageDeduplicationId are copied over to the destination messages.
* Optional flag to limit the number of messages to move.

## Installation

Expand Down Expand Up @@ -104,12 +104,14 @@ sqsmover --help
usage: sqsmover --source=SOURCE --destination=DESTINATION [<flags>]
Flags:
--help Show context-sensitive help (also try
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
-s, --source=SOURCE Source queue to move messages from
-d, --destination=DESTINATION Destination queue to move messages to
-r, --region="us-west-2" AWS Region for source and destination queues
-p, --profile="" Use a specific profile from your credential file.
-s, --source=SOURCE Source queue name to move messages from.
-d, --destination=DESTINATION Destination queue name to move messages to.
-r, --region="us-west-2" AWS region for source and destination queues.
-p, --profile="" Use a specific profile from AWS credentials file.
-l, --limit=0 Limits number of messages moved. No limit is set by default.
-v, --version Show application version.
```

Examples:
Expand All @@ -130,3 +132,8 @@ Profile will default to `Default`, you can also override it with `--profile` fla
sqsmover --source=my_source_queue_name --destination=my_destination_queuename --profile=user
```

Limit number of moved messages to 10
```
sqsmover -s my_source_queue_name -d my_destination_queuename -l 10
```

58 changes: 34 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var (
)

var (
sourceQueue = kingpin.Flag("source", "Source queue to move messages from").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "Destination queue to move messages to").Short('d').Required().String()
region = kingpin.Flag("region", "AWS Region for source and destination queues").Short('r').Default("us-west-2").String()
profile = kingpin.Flag("profile", "Use a specific profile from your credential file.").Short('p').Default("").String()
sourceQueue = kingpin.Flag("source", "Source queue name to move messages from.").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "Destination queue name to move messages to.").Short('d').Required().String()
region = kingpin.Flag("region", "AWS region for source and destination queues.").Short('r').Default("us-west-2").String()
profile = kingpin.Flag("profile", "Use a specific profile from AWS credentials file.").Short('p').Default("").String()
limit = kingpin.Flag("limit", "Limits number of messages moved. No limit is set by default.").Short('l').Default("0").Int()
)


func main() {
log.SetHandler(cli.Default)

Expand Down Expand Up @@ -60,23 +60,23 @@ func main() {

svc := sqs.New(sess)

err, sourceQueueUrl := resolveQueueUrl(svc, *sourceQueue)
sourceQueueUrl, err := resolveQueueUrl(svc, *sourceQueue)

if err != nil {
logAwsError("Failed to resolve source queue", err)
return
}

log.Info(color.New(color.FgCyan).Sprintf("Source queue url: %s", sourceQueueUrl))
log.Info(color.New(color.FgCyan).Sprintf("Source queue URL: %s", sourceQueueUrl))

err, destinationQueueUrl := resolveQueueUrl(svc, *destinationQueue)
destinationQueueUrl, err := resolveQueueUrl(svc, *destinationQueue)

if err != nil {
logAwsError("Failed to resolve destination queue", err)
return
}

log.Info(color.New(color.FgCyan).Sprintf("Destination queue url: %s", destinationQueueUrl))
log.Info(color.New(color.FgCyan).Sprintf("Destination queue URL: %s", destinationQueueUrl))

queueAttributes, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
QueueUrl: aws.String(sourceQueueUrl),
Expand All @@ -85,29 +85,33 @@ func main() {

numberOfMessages, _ := strconv.Atoi(*queueAttributes.Attributes["ApproximateNumberOfMessages"])

log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %s",
*queueAttributes.Attributes["ApproximateNumberOfMessages"]))
log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %d", numberOfMessages))

if numberOfMessages == 0 {
log.Info("Looks like nothing to move. Done.")
return
}

if *limit > 0 && numberOfMessages > *limit {
numberOfMessages = *limit
log.Info(color.New(color.FgCyan).Sprintf("Limit is set, will only move %d messages", numberOfMessages))
}

moveMessages(sourceQueueUrl, destinationQueueUrl, svc, numberOfMessages)

}

func resolveQueueUrl(svc *sqs.SQS, queueName string) (error, string) {
func resolveQueueUrl(svc *sqs.SQS, queueName string) (string, error) {
params := &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
}
resp, err := svc.GetQueueUrl(params)

if err != nil {
return err, ""
return "", err
}

return nil, *resp.QueueUrl
return *resp.QueueUrl, nil
}

func logAwsError(message string, err error) {
Expand Down Expand Up @@ -153,7 +157,7 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs
return result
}

func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, numberOfMessages int) {
func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, totalMessages int) {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sourceQueueUrl),
VisibilityTimeout: aws.Int64(2),
Expand All @@ -171,7 +175,7 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
term.HideCursor()
defer term.ShowCursor()

b := progress.NewInt(numberOfMessages)
b := progress.NewInt(totalMessages)
b.Width = 40
b.StartDelimiter = color.New(color.FgCyan).Sprint("|")
b.EndDelimiter = color.New(color.FgCyan).Sprint("|")
Expand All @@ -186,9 +190,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
for {
resp, err := svc.ReceiveMessage(params)

if len(resp.Messages) == 0 {
if len(resp.Messages) == 0 || messagesProcessed == totalMessages {
fmt.Println()
log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(numberOfMessages)))
log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(totalMessages)))
return
}

Expand All @@ -197,9 +201,15 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

messagesToCopy := resp.Messages

if len(resp.Messages)+messagesProcessed > totalMessages {
messagesToCopy = resp.Messages[0 : totalMessages-messagesProcessed]
}

batch := &sqs.SendMessageBatchInput{
QueueUrl: aws.String(destinationQueueUrl),
Entries: convertToEntries(resp.Messages),
Entries: convertToEntries(messagesToCopy),
}

sendResp, err := svc.SendMessageBatch(batch)
Expand All @@ -214,9 +224,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

if len(sendResp.Successful) == len(resp.Messages) {
if len(sendResp.Successful) == len(messagesToCopy) {
deleteMessageBatch := &sqs.DeleteMessageBatchInput{
Entries: convertSuccessfulMessageToBatchRequestEntry(resp.Messages),
Entries: convertSuccessfulMessageToBatchRequestEntry(messagesToCopy),
QueueUrl: aws.String(sourceQueueUrl),
}

Expand All @@ -232,11 +242,11 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

messagesProcessed += len(resp.Messages)
messagesProcessed += len(messagesToCopy)
}

// Increase the total if the approximation was under - avoids exception
if messagesProcessed > numberOfMessages {
if messagesProcessed > totalMessages {
b.Total = float64(messagesProcessed)
}

Expand All @@ -257,4 +267,4 @@ func buildVersion(version, commit, date, builtBy string) string {
result = fmt.Sprintf("%s\nbuilt by: %s", result, builtBy)
}
return result
}
}

0 comments on commit 3f75882

Please sign in to comment.