diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java index 852a420..5d33119 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java @@ -10,6 +10,7 @@ public class Constants { public static final String AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER = "aws.greengrass.telemetry.NucleusEmitter"; public static final String PUBSUB_PUBLISH_CONFIG_NAME = "pubSubPublish"; + public static final String PUBSUB_TOPIC_CONFIG_NAME = "pubSubTopic"; public static final String MQTT_TOPIC_CONFIG_NAME = "mqttTopic"; public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME = "telemetryPublishIntervalMs"; public static final String DEFAULT_TELEMETRY_PUBSUB_TOPIC = "$local/greengrass/telemetry"; @@ -20,9 +21,9 @@ public class Constants { public static final long MIN_TELEMETRY_PUBLISH_INTERVAL_MS = 500; public static final String PUBSUB_PUBLISH_SUCCESS_LOG = "Published local pub/sub message on topic " - + "'$local/greengrass/telemetry'"; + + "'{}'"; public static final String PUBSUB_PUBLISH_FAILURE_LOG = "Failed to publish local pub/sub message on topic " - + "'$local/greengrass/telemetry'"; + + "'{}'"; public static final String TELEMETRY_PUBLISH_SCHEDULED = "Scheduling telemetry publish"; public static final String TELEMETRY_PUBLISH_STOPPING = "Stopping telemetry publish"; public static final String PUBSUB_PUBLISH_STARTING = "Starting local pub/sub publishing"; @@ -40,6 +41,9 @@ public class Constants { + " configuration."; public static final String PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubPublish config option" + " {}. Please make sure this is set to a valid boolean value"; + + public static final String PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubTopic config option {}." + + " Please make sure this is set to a valid topic string value"; public static final String MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the mqttTopic config option {}." + " Please make sure this is set to a valid topic string value"; public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG = "Could not parse the " diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java index aa57f7e..27ee30f 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java @@ -35,7 +35,6 @@ import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_UPDATE_ERROR_LOG; -import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.INVALID_PUBLISH_THRESHOLD_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.JSON_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MIN_TELEMETRY_PUBLISH_INTERVAL_MS; @@ -132,10 +131,10 @@ public void startup() { scheduleTelemetryPublish(); } - private void publishTelemetry(boolean pubSubPublish, boolean mqttPublish, String mqttTopic) { + private void publishTelemetry(boolean pubSubPublish, String pubSubTopic, boolean mqttPublish, String mqttTopic) { String jsonString = retrieveMetricsJson(jsonMapper); if (pubSubPublish) { - this.pubSubPublisher.publishMessage(jsonString, DEFAULT_TELEMETRY_PUBSUB_TOPIC); + this.pubSubPublisher.publishMessage(jsonString, pubSubTopic); } if (mqttPublish) { this.mqttPublisher.publishMessage(jsonString, mqttTopic); @@ -162,12 +161,14 @@ private void scheduleTelemetryPublish() { } logger.debug(TELEMETRY_PUBLISH_SCHEDULED); telemetryPublishFuture = ses.scheduleAtFixedRate( - () -> publishTelemetry(newPubPublish,!Utils.isEmpty(newMqttTopic), newMqttTopic), 0, + () -> publishTelemetry(newPubPublish, configuration.getPubsubTopic(), + !Utils.isEmpty(newMqttTopic), newMqttTopic), + 0, newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS); } logger.info(STARTUP_CONFIGURATION_LOG, newPubPublish, - DEFAULT_TELEMETRY_PUBSUB_TOPIC, newMqttTopic, newTelemetryPublishIntervalMs); + configuration.getPubsubTopic(), newMqttTopic, newTelemetryPublishIntervalMs); } protected String retrieveMetricsJson(ObjectMapper jsonMapper) { diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfiguration.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfiguration.java index 7f2fead..489005e 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfiguration.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfiguration.java @@ -15,10 +15,13 @@ import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_INVALID_OPTION_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG; @@ -30,6 +33,9 @@ public class NucleusEmitterConfiguration { //Only local pub/sub is enabled by default @Builder.Default boolean pubsubPublish = true; + @Builder.Default + String pubsubTopic = DEFAULT_TELEMETRY_PUBSUB_TOPIC; + @Builder.Default String mqttTopic = ""; @Builder.Default @@ -39,15 +45,14 @@ public class NucleusEmitterConfiguration { * Get the Nucleus Emitter configuration from the POJO map. * @param pojo POJO Topics object. * @param logger Greengrass logger. - * @return the Nucleus Emitter configuration. + * @return the Nucleus Emitter configuration. */ public static NucleusEmitterConfiguration fromPojo(Map pojo, Logger logger) { if (pojo.isEmpty()) { return null; } - long telemetryPublishIntervalMs = DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS; - boolean pubsubPublish = true; - String mqttTopic = ""; + NucleusEmitterConfigurationBuilder config = NucleusEmitterConfiguration.builder(); + for (Map.Entry entry : pojo.entrySet()) { switch (entry.getKey()) { case PUBSUB_PUBLISH_CONFIG_NAME: @@ -58,7 +63,7 @@ public static NucleusEmitterConfiguration fromPojo(Map pojo, Log logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue()); return null; } - pubsubPublish = parsedBoolean; + config.pubsubPublish(parsedBoolean); break; } else { logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue()); @@ -66,11 +71,12 @@ public static NucleusEmitterConfiguration fromPojo(Map pojo, Log } case TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME: if (entry.getValue() instanceof Number || entry.getValue() instanceof String) { - telemetryPublishIntervalMs = Coerce.toLong(entry.getValue()); + long telemetryPublishIntervalMs = Coerce.toLong(entry.getValue()); if (telemetryPublishIntervalMs == 0L) { //If value is 0 or non-numeric String logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue()); return null; } + config.telemetryPublishIntervalMs(telemetryPublishIntervalMs); break; } else { //If not a Number or String logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue()); @@ -78,22 +84,26 @@ public static NucleusEmitterConfiguration fromPojo(Map pojo, Log } case MQTT_TOPIC_CONFIG_NAME: if (entry.getValue() instanceof String) { - mqttTopic = Coerce.toString(entry.getValue()); + config.mqttTopic(Coerce.toString(entry.getValue())); break; } else { logger.error(MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue()); return null; } + case PUBSUB_TOPIC_CONFIG_NAME: + if (entry.getValue() instanceof String) { + config.pubsubTopic(Coerce.toString(entry.getValue())); + break; + } else { + logger.error(PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue()); + return null; + } default: logger.error(CONFIG_INVALID_OPTION_ERROR_LOG, entry.getKey()); return null; } } - return NucleusEmitterConfiguration.builder() - .pubsubPublish(pubsubPublish) - .mqttTopic(mqttTopic) - .telemetryPublishIntervalMs(telemetryPublishIntervalMs) - .build(); + return config.build(); } -} \ No newline at end of file +} diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/publisher/PubSubPublisher.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/publisher/PubSubPublisher.java index 52ce0c6..5c93f5e 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/publisher/PubSubPublisher.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/publisher/PubSubPublisher.java @@ -36,9 +36,9 @@ public void publishMessage(String message, String topic) { try { this.pubSubIPCEventStreamAgent.publish(topic, message.getBytes(StandardCharsets.UTF_8), AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER); - logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG); + logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG, topic); } catch (InvalidArgumentsError e) { - logger.error(PUBSUB_PUBLISH_FAILURE_LOG, e); + logger.error(PUBSUB_PUBLISH_FAILURE_LOG, topic, e); } } } diff --git a/src/test/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfigurationTest.java b/src/test/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfigurationTest.java index 050166e..9a02446 100644 --- a/src/test/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfigurationTest.java +++ b/src/test/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitterConfigurationTest.java @@ -23,6 +23,7 @@ import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.NucleusEmitterConfiguration.fromPojo; @@ -58,6 +59,14 @@ void GIVEN_valid_string_config_options_THEN_parses_correctly() { assertEquals(defaultConfiguration, generatedConfiguration); } + @Test + void GIVEN_valid_nondefault_string_config_options_THEN_parses_correctly() { + Map pojo = new TreeMap<>(); + pojo.put(PUBSUB_TOPIC_CONFIG_NAME,"pubsub"); + NucleusEmitterConfiguration generatedConfiguration = fromPojo(pojo, logger); + assertEquals("pubsub", generatedConfiguration.getPubsubTopic()); + } + @Test void GIVEN_invalid_pubSubPublish_option_THEN_fails() { Map pojo = new TreeMap<>();