Skip to content

Commit

Permalink
-D for removal of retained messages
Browse files Browse the repository at this point in the history
* Refactoring cEchoErr(), cRemoceUnwantedChars()
* Implement option -D for removal of retained messages
  • Loading branch information
sheilbronn committed Apr 10, 2023
1 parent 30e828e commit b98e16b
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions mqtt-grep-color
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ then
# I'm on BusyBox
### grepexp="^ *$|[\}{ ]|\[\]|\[\{|\}\]" # { must be escaped
# colorizes }{, etc. if no other grep expression given; space must be there!
[ "$stdbufcmd_tr" ] || echo "$scriptname: Warning: Install stdbuf for better line buffering! (Package coreutils-stdbuf on OpenWrt)" 1>&2
[ "$stdbufcmd_tr" ] || cEchoErr "Warning: Install stdbuf for better line buffering! (Package coreutils-stdbuf on OpenWrt)"
else
# Not on BusyBox
### grepexp="^ *$|[\}{ ]|\[\]|\[{|\}\]" # { must NOT be escaped
# grepexp="$grepexp| PNG"
GREPOPTS="--text --color=always --line-buffered"
[ "$stdbufcmd_tr" ] || echo "$scriptname: Warning: Install stdbuf for better line buffering!" 1>&2
[ "$stdbufcmd_tr" ] || cEchoErr "Warning: Install stdbuf for better line buffering!"
fi

command -v mosquitto_sub > /dev/null || { echo "$0: Error: Install mosquitto_sub first, e.g. from package mosquitto-client-nossl" 1>&2 ; exit 1 ; }
command -v gawk > /dev/null || { echo "$0: Error: Install gawk first, e.g. from package gawk" 1>&2 ; exit 1 ; }
cCmv() { command -v "$@" > /dev/null ; } # find out whether a command exists

cCmv mosquitto_sub || { cEchoErr "Error: Install mosquitto_sub first, e.g. from package mosquitto-client-nossl" ; exit 1 ; }
cCmv gawk || { cEchoErr "Error: Install gawk first, e.g. from package gawk" ; exit 1 ; }

mqtthost=""
noretained="-v" # use "-v" to make sure $noretained is not an empty option
Expand All @@ -52,18 +54,15 @@ awkDeltaTime=0 # default
scriptname="${0##*/}"
jsonppcmd="cat"

remove_unwanted_chars() {
tr -d '()"^%$ \r\000-\011\013-\037'
}

cCmv() { command -v "$@" >$fNul ;} # find out whether a command exists
cRemoveUnwantedChars() { tr -d ')("^%$ \r\000-\011\013-\037' ; }
cEchoErr() { echo "$scriptname: $*" 1>&2 ; }

### process command line options ###

while getopts "?e:h:t:T:f:p:W:Rl:C:SrsconPxv-" opt
while getopts "?e:h:t:T:f:p:W:RDl:C:SrsconPxv-" opt
do
case "$opt" in
\?) echo "Usage: $scriptname -e expr -h host -x -v -t topic1 -t topic2 -o -C maxcnt -p port -P -- othermosquittoopts" 1>&2
\?) cEchoErr "Usage: $scriptname -e expr -h host -x -v -t topic1 -t topic2 -o -C maxcnt -p port -P -- othermosquittoopts"
exit 1
;;
e) grepexp="$OPTARG"
Expand All @@ -77,14 +76,17 @@ do
hivemq) mqtthost="broker.hivemq.com" ;;
emqx) mqtthost="broker.emqx.io" ;;
dash) mqtthost="broker.mqttdashboard.com" ;; # https://moxd.io/2015/10/17/public-mqtt-brokers/
*) mqtthost="$( echo "$OPTARG" | remove_unwanted_chars )" ;; # clean up for sec purposes
*) mqtthost="$( echo "$OPTARG" | cRemoveUnwantedChars )" ;; # clean up for sec purposes
esac
[ "$bVerbose" ] && { host $mqtthost || nslookup $mqtthost ; ping -4 -c 1 $mqtthost ; ping -6 -c 1 $mqtthost ; }
# ... used for checking: for h in test eclipse iot mosca hivemq emqx dash ; do echo ==== $h ==== ; mqtt-grep-color -W 3 -C 1 -v -h $h ; done
;;
t) topics="$topics -t $( echo "$OPTARG" | sed -e 's:/$:/\#:' )" # as a service, add \# to any trailing /
;;
T) topics="$topics -T $( echo "$OPTARG" | sed -e 's:/$:/\#:' )"
t|T) if [ "$OPTARG" != "${OPTARG%/}" ] ; then # implicitly add the MQTT wildcard hashtag to any trailing "/"
topics="$topics -$opt $OPTARG#" # topics="$topics -t $( echo "$t" | sed -e 's:/$:/\#:' )"
else
[ ! "$OPTARG" != "${OPTARG%#}" ] && cEchoErr "Warning: Option -$opt \"$OPTARG\" has no trailing / or \#, i.e. is not a wildcard topic"
topics="$topics -$opt $OPTARG"
fi
;;
f) inputfile="$OPTARG"
;;
Expand All @@ -94,45 +96,45 @@ do
;;
R) noretained="-R"
;;
l) awkMinTopicChars="$( echo "$OPTARG" | remove_unwanted_chars )" # minimum length of outputted topic string (padded with spaces)
D) bRemoveRetained=1 # will continue with streaming after removing the retained messages...
;;
C) maxcount="-C $( echo "$OPTARG" | remove_unwanted_chars )" # clean up for sec purposes
l) awkMinTopicChars="$( echo "$OPTARG" | cRemoveUnwantedChars )" # minimum length of outputted topic string (padded with spaces)
;;
C) maxcount="-C $( echo "$OPTARG" | cRemoveUnwantedChars )" # clean up for sec purposes
;;
n) noColor="1"
;;
c) awkAdditionalSkipLines="1"
c) awkAdditionalSkipLines=1
;;
P) if command -v json_pp ; then
jsonppcmd=json_pp
else
echo "$scriptname: Warning: json_pp not found..." 1>&2
fi
P) cCmv json_pp && jsonppcmd=json_pp || cEchoErr "Warning: json_pp not found..."
;;
r) awkRemoveKeyQuotes="1"
r) awkRemoveKeyQuotes=1
;;
s) sSuppressAttrs="$sSuppressAttrs $( echo "$OPTARG" | tr -d -c "A-Za-z0-9_")" # attrs that will be tried to be eliminated
s) sSuppressAttrs="$sSuppressAttrs $( echo "$OPTARG" | tr -d -c "A-Za-z0-9_")" # JSON attrs that will be tried to be eliminated
;;
S) awkTimeFormat="%s"
awkTimeLen=3 # only the last three digits (len of %s is 11)
awkDeltaTime=1
;;
o) awkOptimizeJSON="1"
o) awkOptimizeJSON=1
;;
x) set -x # turn on shell debugging from here on
;;
v) bVerbose="yes" # still unused...
;;
-) echo "-- not supported."
exit 1
-) echo "option -$opt not supported. Exiting." ; exit 1
;;
esac
done

shift "$((OPTIND-1))" # Discard options processed by getopts, any remaining options will be passed to mosquitto_sub

[ -z "$topics" ] && topics="-t #" # use '#' as MQTT topic if nothing else given
if [ -z "$topics" ] ; then
[ "$bRemoveRetained" ] && { cEchoErr "Retained topics to be removed must be given explicitly with option -t. Exiting." ; exit 1 ; }
topics="-t #" # use '#' as default MQTT topic if nothing else given
fi

# These are the following steps for processing the MQTT stream:
# These are the basic steps for processing the MQTT stream:
# 1. remove any non-printable stuff from the data,
# 2. color the MQTT topic red,
# 3. put the time in front and color it yellow,
Expand All @@ -144,7 +146,9 @@ if [ -n "$inputfile" ] ; then
cat "$inputfile"
else
_ms_options="${mqtthost:+-h $mqtthost} ${port:+-p $port} -v $topics $noretained $maxcount $messageTimeOut $*"
[ "$bVerbose" ] && echo "$scriptname: m_s options: $_ms_options" 1>&2
[ "$bRemoveRetained" ] && mosquitto_sub $_ms_options -W 1 --retained-only --remove-retained 2>&1 |
awk -v topics="$topics" '/^Timed out/ { next } { count++ ; print } BEGIN { print "=== Removing retained messages with topics: " topics " ===" ; count=0 } END { print "=== Removed " count " retained messages. ===" }' # grep -v "^Timed out$"
[ "$bVerbose" ] && cEchoErr "mosquitto_sub options: $_ms_options"
mosquitto_sub $_ms_options
fi |
gawk -b -v timeLen="$awkTimeLen" -v timeFormat="$awkTimeFormat" -v deltaTime="$awkDeltaTime" -v verbose="$bVerbose" -v removeKeyQuotes="$awkRemoveKeyQuotes" \
Expand Down

0 comments on commit b98e16b

Please sign in to comment.