diff --git a/core-bus-aws-iot-mqtt/include/ggl/core_bus/aws_iot_mqtt.h b/core-bus-aws-iot-mqtt/include/ggl/core_bus/aws_iot_mqtt.h index 7e1a9d76f..e22714c5e 100644 --- a/core-bus-aws-iot-mqtt/include/ggl/core_bus/aws_iot_mqtt.h +++ b/core-bus-aws-iot-mqtt/include/ggl/core_bus/aws_iot_mqtt.h @@ -35,4 +35,16 @@ GglError ggl_aws_iot_mqtt_subscribe_parse_resp( GglObject data, GglBuffer **topic, GglBuffer **payload ); +/// Wrapper for core-bus `aws_iot_mqtt` `connection_status` +GglError ggl_aws_iot_mqtt_connection_status( + GglSubscribeCallback on_response, + GglSubscribeCloseCallback on_close, + void *ctx, + uint32_t *handle +); + +GglError ggl_aws_iot_mqtt_connection_status_parse( + GglObject data, bool *connection_status +); + #endif diff --git a/core-bus-aws-iot-mqtt/src/aws_iot_mqtt.c b/core-bus-aws-iot-mqtt/src/aws_iot_mqtt.c index a14dfece4..a4d7c0a6b 100644 --- a/core-bus-aws-iot-mqtt/src/aws_iot_mqtt.c +++ b/core-bus-aws-iot-mqtt/src/aws_iot_mqtt.c @@ -102,3 +102,45 @@ GglError ggl_aws_iot_mqtt_subscribe_parse_resp( return GGL_ERR_OK; } + +/// Call this API to subscribe to MQTT connection status. To parse the data +/// received from the subscription, call +/// ggl_aws_iot_mqtt_connection_status_parse function which will return a true +/// for connected and a false for not connected. +/// +/// Note that when a subscription is accepted, the current MQTT status is sent +/// to the subscribers. +GglError ggl_aws_iot_mqtt_connection_status( + GglSubscribeCallback on_response, + GglSubscribeCloseCallback on_close, + void *ctx, + uint32_t *handle +) { + // The GGL subscribe API expects a map. Sending a dummy map. + GglMap args = GGL_MAP(); + return ggl_subscribe( + GGL_STR("aws_iot_mqtt"), + GGL_STR("connection_status"), + args, + on_response, + on_close, + ctx, + NULL, + handle + ); +} + +GglError ggl_aws_iot_mqtt_connection_status_parse( + GglObject data, bool *connection_status +) { + if (data.type != GGL_TYPE_BOOLEAN) { + GGL_LOGE( + "MQTT connection status subscription response is not a boolean." + ); + return GGL_ERR_FAILURE; + } + + *connection_status = data.boolean; + + return GGL_ERR_OK; +} diff --git a/core-bus/include/ggl/core_bus/client.h b/core-bus/include/ggl/core_bus/client.h index 4631e869a..e16f4138b 100644 --- a/core-bus/include/ggl/core_bus/client.h +++ b/core-bus/include/ggl/core_bus/client.h @@ -16,7 +16,7 @@ /// Maximum number of core-bus connections. /// Can be configured with `-DGGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS=`. #ifndef GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS -#define GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS 50 +#define GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS 100 #endif /// Send a Core Bus notification (call, but don't wait for response). diff --git a/iotcored/src/bus_server.c b/iotcored/src/bus_server.c index 3316ad333..90608759e 100644 --- a/iotcored/src/bus_server.c +++ b/iotcored/src/bus_server.c @@ -18,11 +18,13 @@ static void rpc_publish(void *ctx, GglMap params, uint32_t handle); static void rpc_subscribe(void *ctx, GglMap params, uint32_t handle); +static void rpc_get_status(void *ctx, GglMap params, uint32_t handle); void iotcored_start_server(IotcoredArgs *args) { GglRpcMethodDesc handlers[] = { { GGL_STR("publish"), false, rpc_publish, NULL }, { GGL_STR("subscribe"), true, rpc_subscribe, NULL }, + { GGL_STR("connection_status"), true, rpc_get_status, NULL }, }; size_t handlers_len = sizeof(handlers) / sizeof(handlers[0]); @@ -178,3 +180,26 @@ static void rpc_subscribe(void *ctx, GglMap params, uint32_t handle) { ggl_sub_accept(handle, sub_close_callback, NULL); } + +static void mqtt_status_sub_close_callback(void *ctx, uint32_t handle) { + (void) ctx; + iotcored_mqtt_status_update_unregister(handle); +} + +static void rpc_get_status(void *ctx, GglMap params, uint32_t handle) { + (void) ctx; + (void) params; + + GglError ret = iotcored_mqtt_status_update_register(handle); + if (ret != GGL_ERR_OK) { + ggl_return_err(handle, ret); + return; + } + + ggl_sub_accept(handle, mqtt_status_sub_close_callback, NULL); + + // Send a status update as soon as a subscription is accepted. + iotcored_mqtt_status_update_send( + GGL_OBJ_BOOL(iotcored_mqtt_connection_status()) + ); +} diff --git a/iotcored/src/mqtt.c b/iotcored/src/mqtt.c index 801b2224a..4476c9811 100644 --- a/iotcored/src/mqtt.c +++ b/iotcored/src/mqtt.c @@ -339,6 +339,9 @@ noreturn static void *mqtt_recv_thread_fn(void *arg) { _Exit(1); } + // Send status update to indicate mqtt (re)connection. + iotcored_mqtt_status_update_send(GGL_OBJ_BOOL(true)); + // Send a fleet status update on reconnection if (reconnect) { // Resubscribe to all subscriptions. @@ -379,6 +382,9 @@ noreturn static void *mqtt_recv_thread_fn(void *arg) { (void) MQTT_Disconnect(ctx); iotcored_tls_cleanup(ctx->transportInterface.pNetworkContext->tls_ctx); + // Send status update to indicate mqtt disconnection. + iotcored_mqtt_status_update_send(GGL_OBJ_BOOL(false)); + GGL_LOGE("Removing all IoT core subscriptions"); // Set reconnect flag. Future connections will send a fleet status @@ -505,6 +511,14 @@ GglError iotcored_mqtt_connect(const IotcoredArgs *args) { return GGL_ERR_OK; } +bool iotcored_mqtt_connection_status(void) { + bool connected = false; + if (MQTT_CheckConnectStatus(&mqtt_ctx) == MQTTStatusConnected) { + connected = true; + } + return connected; +} + GglError iotcored_mqtt_publish(const IotcoredMsg *msg, uint8_t qos) { assert(msg != NULL); diff --git a/iotcored/src/mqtt.h b/iotcored/src/mqtt.h index 7ea414056..ded1d04ff 100644 --- a/iotcored/src/mqtt.h +++ b/iotcored/src/mqtt.h @@ -22,6 +22,8 @@ typedef struct { GglError iotcored_mqtt_connect(const IotcoredArgs *args); +bool iotcored_mqtt_connection_status(void); + GglError iotcored_mqtt_publish(const IotcoredMsg *msg, uint8_t qos); GglError iotcored_mqtt_subscribe( GglBuffer *topic_filters, size_t count, uint8_t qos diff --git a/iotcored/src/subscription_dispatch.c b/iotcored/src/subscription_dispatch.c index 03df854ae..c75bdcc05 100644 --- a/iotcored/src/subscription_dispatch.c +++ b/iotcored/src/subscription_dispatch.c @@ -34,6 +34,9 @@ static uint32_t handles[IOTCORED_MAX_SUBSCRIPTIONS]; static uint8_t topic_qos[IOTCORED_MAX_SUBSCRIPTIONS]; static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; +static uint32_t mqtt_status_handles[IOTCORED_MAX_SUBSCRIPTIONS]; +static pthread_mutex_t mqtt_status_mtx = PTHREAD_MUTEX_INITIALIZER; + static GglBuffer topic_filter_buf(size_t index) { return ggl_buffer_substr( GGL_BUF(sub_topic_filters[index]), 0, topic_filter_len[index] @@ -142,6 +145,37 @@ void iotcored_mqtt_receive(const IotcoredMsg *msg) { } } +GglError iotcored_mqtt_status_update_register(uint32_t handle) { + GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx); + for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) { + if (mqtt_status_handles[i] == 0) { + mqtt_status_handles[i] = handle; + return GGL_ERR_OK; + } + } + return GGL_ERR_NOMEM; +} + +void iotcored_mqtt_status_update_unregister(uint32_t handle) { + GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx); + for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) { + if (mqtt_status_handles[i] == handle) { + mqtt_status_handles[i] = 0; + return; + } + } +} + +void iotcored_mqtt_status_update_send(GglObject status) { + GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx); + + for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) { + if (mqtt_status_handles[i] != 0) { + ggl_respond(mqtt_status_handles[i], status); + } + } +} + void iotcored_re_register_all_subs(void) { GGL_MTX_SCOPE_GUARD(&mtx); diff --git a/iotcored/src/subscription_dispatch.h b/iotcored/src/subscription_dispatch.h index fd6220b20..1af8d04d6 100644 --- a/iotcored/src/subscription_dispatch.h +++ b/iotcored/src/subscription_dispatch.h @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -18,4 +19,10 @@ void iotcored_unregister_subscriptions(uint32_t handle); void iotcored_re_register_all_subs(void); +GglError iotcored_mqtt_status_update_register(uint32_t handle); + +void iotcored_mqtt_status_update_unregister(uint32_t handle); + +void iotcored_mqtt_status_update_send(GglObject status); + #endif