-
Notifications
You must be signed in to change notification settings - Fork 1
/
mqtt_port.c
146 lines (127 loc) · 4.5 KB
/
mqtt_port.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "task.h"
/* FreeRTOS+TCP includes. */
#include "FreeRTOS_IP.h"
#include "FreeRTOS_Sockets.h"
#include "MQTT/mqtt.h"
/*-----------------------------------------------------------*/
int mqtt_write(struct mqtt_context* mqtt, uint8_t* ptr, int32_t len)
{
/* Send the string to the socket. */
return FreeRTOS_send(*(Socket_t*)mqtt->network_tag, /* The socket being sent to. */
(void*)ptr, /* The data being sent. */
len, /* The length of the data being sent. */
0); /* No flags. */
}
int mqtt_read(struct mqtt_context* mqtt, uint8_t* ptr, int32_t len)
{
int32_t xReceivedBytes = 0;
int xReturned;
while (xReceivedBytes < len)
{
xReturned = FreeRTOS_recv(*(Socket_t*)mqtt->network_tag, /* The socket being received from. */
ptr + xReceivedBytes, /* The buffer into which the received data will be written. */
len - xReceivedBytes, /* The size of the buffer provided to receive the data. */
0); /* No flags. */
configASSERT(xReturned >= 0);
if (xReturned == 0)
{
/* Timed out. We must now exit*/
break;
}
else
{
/* Keep a count of the bytes received so far. */
xReceivedBytes += xReturned;
}
}
return xReceivedBytes;
}
/*
* What follows is an example of how a MQTT packet processor could be built as a middle layer.
* This layer will receive MQTT messages (PUBLISH and others) from the network stream using
* the read function and route the received messages to the appropriate processor.
* While PUBLISH messages are routed based on their Topic according to a lookup table, other
* messages are routed by message type as contained in the header.
*
*/
/* Prototype function pointer for callbacks */
typedef void (*processPacketFn_t)(uint8_t* data, int32_t len);
/* Two example functions to show how processing could happen in upstream modules.
* The function pointers could be stored statically as application level configuration or
* be injected by the upstream modules at runtime by choice of the application.
*/
void topic1Function(uint8_t* data, int32_t len)
{
FreeRTOS_debug_printf(("Topic 1 data : %s\r\n", data));
}
void topic2Function(uint8_t* data, int32_t len)
{
FreeRTOS_debug_printf(("Topic 2 data : %s\r\n", data));
}
/* Create a routing table for topic data (Topics here are NOT Topic filters with wildcards! */
struct processingTable {
char topicName[32]; /* Topic name to route */
processPacketFn_t fn; /* Function for processing this topic */
} processingTable[2] = {
{"MyTopic", topic1Function},
{"OtherTopic", topic2Function}
};
/* Function to process and route packets. This function transforms the stream from TCP into
* a packet that lives in a statically allocated buffer
*/
int mqtt_processPacket(struct mqtt_context* tag, struct mqtt_header* header)
{
int status = MQTT_ERROR;
static uint8_t buffer[128];
if (header->remainingLength > 128)
{
return MQTT_ERROR;
}
// First always clear the buffer out
memset(buffer, 0, sizeof(buffer));
// Process our pubish packet. We could use a lookup table here to route by topic.
if (header->type == MQTT_PACKET_TYPE_PUBLISH)
{
uint16_t topicLength;
// Read the entire packet into the processing buffer
if (mqtt_read(tag, buffer, header->remainingLength) == header->remainingLength)
{
topicLength = (buffer[0] << 8) + buffer[1];
// If we get a valid topic length which does not excceed the packet length
if (topicLength < header->remainingLength - 2)
{
// In case we do not find it
// Route the packet to the right function
for (int i = 0; i < sizeof(processingTable) / sizeof(processingTable[0]); i++)
{
if (strncmp(&buffer[2], processingTable[i].topicName, topicLength) == 0)
{
// We found a match, process it!
status = MQTT_SUCCESS;
processingTable[i].fn(&buffer[topicLength + 2], header->remainingLength - topicLength - 2);
}
}
if (status == MQTT_ERROR)
{
// No match, just print it out
FreeRTOS_debug_printf(("Unprocessed Publish : %s\r\n", &buffer[topicLength + 2]));
}
}
}
}
else if ((header->type == MQTT_PACKET_TYPE_SUBACK) ||
(header->type == MQTT_PACKET_TYPE_UNSUBACK))
{
// Just read the data and ignore. For SUBACK it has packet ID and QOS values, for UNSUBACK just ID
mqtt_read(tag, buffer, header->remainingLength);
status = MQTT_SUCCESS;
}
else // Just dump all other packets for now
{
mqtt_read(tag, buffer, header->remainingLength);
status = MQTT_SUCCESS;
}
return status;
}