-
Notifications
You must be signed in to change notification settings - Fork 0
/
MQTTPublisher.lf
230 lines (200 loc) · 9.55 KB
/
MQTTPublisher.lf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/**
* Reactor that publishes strings (or arbitrary byte arrays cast to `char*`) to a specified MQTT
* topic.
*
* This publisher ensures in-order delivery messages to subscribers. If an attempt is made to send a
* message before the delivery of the previous message has completed, then the reaction that sends
* the message (the reaction to an input 'in') will block until the previous delivery has completed.
*
* If `include_timestamp` is true (the default is `false`), then two things happen:
*
* 1. The publisher ensures that the message is null terminated by adding a null terminator if
* needed. This ensures that the message can be treated as a string at the receiving end. 2. The
* publisher appends to the end of the message (after the null terminator) a magic string "LFts"
* followed by the current logical time at which the publishing occurs. If the `relative_timestamp`
* parameter is true (the default is `false`), then the logical time is relative to the start time
* of the program.
*
* This can be useful if the receiving end will be an instance of `MQTTSubscriber` in another Lingua
* Franca program. Note that `include_timestamp` *must* be true if an `MQTTSubcriber` that
* subscribes to this topic has its `use_physical_time` parameter set to false (its default is
* `true`). Otherwise, the subscriber will issue a warning.
*
* @param topic The topic name to which to publish.
* @param address The IP address of the MQTT broker.
* @param include_timestamp If true, then append the current logical time to the message.
* @param relative_timestamp If true, then the timestamp is relative to the start time of the
* program.
* @param timeout Timeout for completion of message sending in milliseconds.
* @see MQTTSubscriber.
*
* @author Ravi Akella
* @author Edward A. Lee
*/
target C {
cmake-include: [
"../include/paho-extension.cmake",
// For encode_int64()
"../include/net_utils.cmake"]
}
preamble {=
#ifndef MQTT_PUBLISHER_H
#define MQTT_PUBLISHER_H
#include "platform.h" // Defines lf_critical_section_enter(), etc.
#include <string.h> // Defines memcpy
#include "MQTTClient.h"
#include "core/federated/network/net_util.h"
// Struct type used to keep track of messages in flight between reactions.
typedef struct inflight_t {
bool message_in_flight;
MQTTClient_deliveryToken delivery_token;
char* message;
} inflight_t;
#endif // MQTT_PUBLISHER_H
=}
reactor MQTTPublisher(
topic: string = "DefaultTopic",
address: string = "tcp://localhost:1883",
include_timestamp: bool = false,
relative_timestamp: bool = false,
timeout: time = 10 sec) {
preamble {=
// Count of instances of this reactor so that unique client IDs are generated.
static size_t _lf_MQTTPublisher_count = 0;
// Connection options for the client.
// Making this global means that all instances of this reactor have
// the same connection options.
MQTTClient_connectOptions pub_connect_options = MQTTClient_connectOptions_initializer;
// Callback invoked once delivery is complete.
void pub_delivered(void *inflight, MQTTClient_deliveryToken dt) {
LF_PRINT_LOG("MQTTPublisher: Message with token value %d delivery confirmed\n", dt);
((inflight_t*)inflight)->message_in_flight = false;
free(((inflight_t*)inflight)->message);
((inflight_t*)inflight)->delivery_token = 0;
((inflight_t*)inflight)->message = NULL;
}
// Callback invoked if the connection is lost.
void pub_connection_lost(void *context, char *cause) {
lf_print_error("\nMQTTPublisher: Connection lost. Cause: %s\n", cause);
}
=}
/**
* Input type char* instead of string is used for dynamically allocated character arrays (as
* opposed to static constant strings).
*/
input in: char*
/** State variable that keeps track of a message in flight. */
state inflight: inflight_t = {= {false, 0, NULL} =}
/** Client ID. This is automatically generated. */
state clientID: char* = {= NULL =}
/** The client object. */
state client: MQTTClient = {= NULL =}
/** The message object. */
state mqtt_msg: MQTTClient_message = {= MQTTClient_message_initializer =}
/** Connect to the broker. Exit if this fails. */
reaction(startup) {=
// In case there are multiple instances of this or the subscriber, enter
// a critical section. The Paho MQTT functions are not thread safe.
lf_critical_section_enter(self->base.environment);
// Create a unique ID.
if (asprintf(&self->clientID, "LF_MQTTPublisher_%zu", _lf_MQTTPublisher_count++) < 0) {
lf_print_error_and_exit("MQTTPublisher: Failed to create client ID.");
}
MQTTClient_create(&self->client, self->address, self->clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
pub_connect_options.keepAliveInterval = 20;
pub_connect_options.cleansession = 1;
// Set up callback functions.
// Second to last argument should be a pointer to a function
// to handle notification of delivery of a message.
// But this reactor isn't sending any messages.
// Second argument is a pointer to context that will be passed to pub_delivered,
// which in this case is a pointer to the inflight state variable.
MQTTClient_setCallbacks(self->client, &self->inflight, pub_connection_lost, NULL, pub_delivered);
// Connect to the broker.
int rc; // response code.
if ((rc = MQTTClient_connect(self->client, &pub_connect_options)) != MQTTCLIENT_SUCCESS) {
lf_print_error_and_exit("MQTTPublisher: Failed to connect to MQTT broker.\n"
"Perhaps one is not running? Return code: %d", rc);
}
lf_critical_section_exit(self->base.environment);
LF_PRINT_LOG("MQTTPublisher: connected to broker.");
=}
/**
* React to an input by sending a message with the value of the input as the payload. If delivery
* has not yet completed for a previously sent message, then wait for it to complete before
* proceeding (blocking this reaction). This copies the message from the input into a buffer, so
* the input can freed upon return from this reaction.
*/
reaction(in) {=
// In case there are multiple instances of this or the subscriber, enter
// a critical section. The Paho MQTT functions are not thread safe.
lf_critical_section_enter(self->base.environment);
if(self->inflight.message_in_flight) {
// Wait for message delivery to be complete.
LF_PRINT_LOG("MQTTPublisher: Waiting for confirmation of publication of previous message");
int rc = MQTTClient_waitForCompletion(
self->client, self->inflight.delivery_token, self->timeout
);
if (rc != MQTTCLIENT_SUCCESS) {
lf_print_error("MQTTPublisher: Message delivery failed with error code %d.\n", rc);
lf_print_error(" Message: %s\n", in->value);
lf_print_error(" On topic '%s' for publisher with ClientID: %s\n", self->topic, self->clientID);
}
}
LF_PRINT_LOG("MQTTPublisher: Publishing message: %s", in->value);
LF_PRINT_LOG("MQTTPublisher: on topic '%s' for publisher with ClientID: %s", self->topic, self->clientID);
// Allocate memory for a copy of the message.
// The default length is just the length of the incoming message.
int length = in->length;
// If a timestamp is to be included, the length is bigger.
if (self->include_timestamp) {
// If the input message is not null terminated, then add a null terminator.
if (in->value[in->length - 1] != '\0') length++;
// Allow space (4 bytes) for the magic string "LFts".
length += 4 + sizeof(instant_t);
}
self->inflight.message = (char*) malloc(sizeof(char) * length);
memcpy(self->inflight.message, in->value, in->length);
// Append null terminator and timestamp, if appropriate.
if (self->include_timestamp) {
// If the input message is not null terminated, then add a null terminator.
if (in->value[in->length - 1] != '\0') {
self->inflight.message[in->length] = '\0';
// Add magic string.
memcpy(&self->inflight.message[in->length + 1], "LFts", 4);
} else {
// Add magic string.
memcpy(&self->inflight.message[in->length], "LFts", 4);
}
// Append the current timestamp to the message.
instant_t timestamp = (self->relative_timestamp) ? lf_time_logical_elapsed() : lf_time_logical();
encode_int64(timestamp,
(unsigned char*)(self->inflight.message + length - sizeof(instant_t))
);
LF_PRINT_LOG("MQTTPublisher: Timestamp (elapsed) of sending message: " PRINTF_TIME,
timestamp - lf_time_start()
);
}
self->mqtt_msg.payload = self->inflight.message;
self->mqtt_msg.payloadlen = length;
// QoS 2 means that the message will be delivered exactly once.
self->mqtt_msg.qos = 2;
// Retained messages are held by the server and sent to future new subscribers.
// Specify that this message should not be retained.
// It will be sent only to subscribers currently subscribed.
self->mqtt_msg.retained = 0;
MQTTClient_publishMessage(self->client, self->topic, &self->mqtt_msg, &self->inflight.delivery_token);
self->inflight.message_in_flight = true;
lf_critical_section_exit(self->base.environment);
// It is not clear why the following is needed, but the message
// does not go out until the next invocation without it.
MQTTClient_yield();
=}
/** Disconnect the client. */
reaction(shutdown) {=
LF_PRINT_LOG("MQTTPublisher: Client ID %s disconnecting.", self->clientID);
if (self->clientID) free(self->clientID);
MQTTClient_disconnect(self->client, 10000);
MQTTClient_destroy(&self->client);
=}
}