Add MQTT dispatcher #220
OMG, watching intently :)
datastore/mqtt/mqtt.go
Outdated
var tokens []pahomqtt.Token

// Publish each field to the relevant topic on the MQTT broker
for key, value := range convertedPayload {
Instead of sending each payload attribute individually, why not send it as a single message and keep the implementation consistent with the other datastores?
For MQTT topic structuring, it is important to maintain clarity and scalability, especially when dealing with dynamic datasets such as fleet telemetry and event logs.
Sending individual metrics over MQTT rather than combining them in a single topic has several advantages:
- Granular control: Individual topics allow for more precise subscription and filtering. Clients can subscribe only to the specific metrics they need, reducing unnecessary network traffic and processing. A Power Exchange (https://pex.energy) will likely subscribe to SoC fields, and a Home Assistant (https://www.home-assistant.io/) might subscribe to location data.
- Easier parsing and debugging: With separate topics, the payload structure for each metric can be simpler and more consistent. This makes parsing and processing easier on the receiving end.
- Scalability: As the system grows, it's easier to add new metrics without modifying existing payload structures or breaking existing subscribers.
- Retention policies: MQTT brokers often allow setting different retention policies per topic, giving you more control over how long each metric's data is stored.
Here is a screenshot from MQTT Explorer demonstrating a clear and scalable topic structure:
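In code terms, the per-field layout described in this comment could look roughly like the sketch below. It assumes the "<base>/<vin>/v/<field>" topic format used in the PR's mqtt_payload.go; the base name, VIN, and field names are made up for illustration, and nothing here talks to a real broker.

```go
package main

import (
	"fmt"
	"sort"
)

// topicFor builds the per-field topic name, following the
// "<base>/<vin>/v/<field>" layout used in the PR.
func topicFor(base, vin, field string) string {
	return fmt.Sprintf("%s/%s/v/%s", base, vin, field)
}

// topicsFor maps every field of one record to its own topic.
func topicsFor(base, vin string, fields map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(fields))
	for field, value := range fields {
		out[topicFor(base, vin, field)] = value
	}
	return out
}

func main() {
	// Hypothetical record: base, VIN, and field names are illustrative only.
	topics := topicsFor("telemetry", "VIN123", map[string]interface{}{
		"Soc":      80.5,
		"Location": "52.1,4.3",
	})

	names := make([]string, 0, len(topics))
	for name := range topics {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random; sort for stable output
	for _, name := range names {
		fmt.Printf("%s -> %v\n", name, topics[name])
	}
}
```

With this layout, a subscriber that only cares about state of charge could use a topic filter such as telemetry/+/v/Soc, where + is MQTT's single-level wildcard.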
}

// Only process reliable ACK if no errors were reported
if !publishError {
Yeah, this is tricky with the things you have correctly set up. You are asking the vehicle to resend the whole record if some of the uploads to MQTT failed. I think we should just attempt to send the whole payload.
See my comment above which explains why the MQTT topic structure is important. Topics are not the same as raw packets from the car.
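The trade-off discussed in this thread (one record fans out into several publishes, but the reliable ACK covers the whole record) can be sketched as follows. This is a minimal illustration, not the PR's code: publishFunc, failOn, and dispatch are hypothetical names, and the publisher is a stub rather than a Paho client.

```go
package main

import (
	"errors"
	"fmt"
)

// publishFunc is a hypothetical stand-in for a broker publish call.
type publishFunc func(topic string, payload []byte) error

// failOn returns a stub publisher that fails only for the given topic.
func failOn(badTopic string) publishFunc {
	return func(topic string, _ []byte) error {
		if topic == badTopic {
			return errors.New("broker unavailable")
		}
		return nil
	}
}

// publishAll attempts every message and joins any failures into one error.
// A nil result means every publish succeeded.
func publishAll(publish publishFunc, messages map[string][]byte) error {
	var errs []error
	for topic, payload := range messages {
		if err := publish(topic, payload); err != nil {
			errs = append(errs, fmt.Errorf("publish %s: %w", topic, err))
		}
	}
	return errors.Join(errs...)
}

// dispatch acknowledges the record only when every publish succeeded.
// It reports whether the ack was sent.
func dispatch(publish publishFunc, messages map[string][]byte, ack func()) bool {
	if err := publishAll(publish, messages); err != nil {
		return false // no ack: the sender is expected to retransmit the record
	}
	ack()
	return true
}

func main() {
	messages := map[string][]byte{
		"telemetry/VIN123/v/Soc":  []byte(`{"value":80.5}`),
		"telemetry/VIN123/v/Gear": []byte(`{"value":"D"}`),
	}
	acked := dispatch(failOn("telemetry/VIN123/v/Soc"), messages, func() { fmt.Println("ack sent") })
	fmt.Println("acked:", acked)
}
```

The consequence the reviewer points out is visible here: a single failed topic withholds the ACK, so fields that were already published will be published again when the record is resent.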
The code has been updated to address the issues mentioned in the comments above.
Add detailed MQTT README.md explaining architecture, design choices, and configuration
config/config.go
Outdated
@@ -312,6 +316,17 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
	producers[telemetry.ZMQ] = zmqProducer
}

if _, ok := requiredDispatchers[telemetry.MQTT]; ok {
	if c.MQTT == nil {
		return nil, errors.New("Expected MQTT to be configured")
Go error strings usually start with a lowercase letter:
errors.New("expected MQTT to be configured")
All other related errors in the same method use a capital letter:
"Expected ZMQ/Kinesis/PubSub/Kafka/MQTT to be configured"
Maybe it is better if someone creates a separate pull request to fix all error messages?
(and then maybe also have a look at capital letters used in errors.py)
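For context on the convention itself: Go's code review guidelines recommend uncapitalized error strings because errors are usually printed after other context. A small sketch of why that reads better once the error is wrapped; configureProducers here is a simplified, hypothetical stand-in and not the method from config.go.

```go
package main

import (
	"errors"
	"fmt"
)

// errMQTTNotConfigured uses the lowercase form suggested in the review.
var errMQTTNotConfigured = errors.New("expected MQTT to be configured")

// configureProducers is a hypothetical caller that wraps the error with context.
func configureProducers(mqttConfigured bool) error {
	if !mqttConfigured {
		return fmt.Errorf("configure producers: %w", errMQTTNotConfigured)
	}
	return nil
}

func main() {
	err := configureProducers(false)
	fmt.Println(err) // configure producers: expected MQTT to be configured
	fmt.Println(errors.Is(err, errMQTTNotConfigured))
}
```

A capitalized message would end up in the middle of the wrapped string, which is the main reason linters flag it.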
SetKeepAlive(time.Duration(config.KeepAlive) * time.Second)

client := PahoNewClient(opts)
client.Connect()
do we need to do any error handling here?
ConnectRetry is set. So, the client.Connect() might fail, but that's not a problem because the Paho client will keep trying to connect.
This will make sure a telemetry container can start even if a configured MQTT broker is not online.
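To make the startup behaviour concrete, here is a rough, library-free sketch of the idea behind connect-with-retry: the service starts right away while connection attempts continue in the background. This is not Paho's implementation; connectWithRetry and flakyDialer are hypothetical helpers, and the dial function stands in for a real broker connection.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls dial until it succeeds or maxAttempts (>= 1) is
// reached, sleeping for interval between attempts. It returns the number of
// attempts made.
func connectWithRetry(dial func() error, maxAttempts int, interval time.Duration) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(interval)
		}
	}
	return maxAttempts, fmt.Errorf("broker unreachable after %d attempts: %w", maxAttempts, err)
}

// flakyDialer returns a hypothetical dial function that fails the given number
// of times before succeeding, standing in for a broker that comes online late.
func flakyDialer(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	done := make(chan struct{})
	// Retry in the background so that startup is not blocked by the broker.
	go func() {
		defer close(done)
		attempts, err := connectWithRetry(flakyDialer(2), 5, 10*time.Millisecond)
		fmt.Println("attempts:", attempts, "err:", err)
	}()
	fmt.Println("service started")
	<-done
}
```

Even with retries enabled, logging the outcome of the first attempt can still be useful for operators, which may be what the reviewer's question was getting at.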
@erwin314 is there something I can help with to get this fixed?
Thanks for reaching out and offering to help! The code was working, but it's been in pull-request mode for a while now. To move forward, perhaps @agbpatro, @aaronpkahn, @ThomasAlxDmy, or @patrickdemers6 could provide feedback on whether this code has a chance of being merged. If there's potential, I’ll update it so it passes all tests again. If merging isn’t feasible, I’m happy to create a dedicated fork focused solely on MQTT.
Hi everyone, I've updated the code, and it should now pass all the tests. However, I'm still unsure if investing my time in this is worthwhile, as it's unclear whether it has a potential for being merged.
I have one suggestion: currently you return a JSON object for each topic, and I would rather see the value directly. Currently:
Better:
Why do you put the value inside the key value?
Thank you, this is awesome. We should definitely get it in.
Thank you for the suggestion and for taking the time to review my pull request! I appreciate your feedback.

If the system evolves to include additional features or richer data, using JSON ensures that existing consumers remain compatible as long as they continue extracting the known keys. In contrast, returning direct values can make maintaining backward compatibility more difficult.

By consistently returning a JSON object, we standardize the data format across all topics, regardless of the type or complexity of the value. This approach allows consumers to parse messages using uniform logic, reducing the likelihood of errors. If some topics return raw values while others return structured data, client-side code must accommodate both cases, leading to increased complexity. Additionally, JSON maintains the data type of the value, helping to avoid misinterpretation. For example,

While it’s true that consumers need to perform an extra step to extract the value, this processing is typically negligible in practice. Parsing JSON is a lightweight operation, supported natively by nearly all programming languages and platforms. On the other hand, returning raw values requires consumers to handle data differently for each topic, which increases the overall complexity of the client-side implementation.

That said, I’m open to further discussion if you feel strongly about this point or have alternative suggestions that might achieve the same benefits with less overhead.
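The backward-compatibility argument above can be illustrated with a small consumer sketch. It assumes the {"value": ...} wrapper from the PR; the "unit" key is purely hypothetical and only shows that a consumer reading the known key keeps working when keys are added.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reading models only the key a consumer knows about today.
type reading struct {
	Value json.RawMessage `json:"value"`
}

// extractValue returns the raw JSON of the "value" key, ignoring other keys.
func extractValue(payload []byte) (string, error) {
	var r reading
	if err := json.Unmarshal(payload, &r); err != nil {
		return "", err
	}
	return string(r.Value), nil
}

func main() {
	// "unit" is a hypothetical key added later; it is not part of the PR.
	for _, payload := range []string{
		`{"value": 80.5}`,
		`{"value": 80.5, "unit": "%"}`,
	} {
		v, err := extractValue([]byte(payload))
		fmt.Println(v, err)
	}
}
```

Go's encoding/json ignores unknown object keys by default, so both payloads yield the same value for this consumer.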
Thanks for the positive feedback, glad to hear there’s interest in getting this merged! Could you provide some guidance on the steps needed to move this forward?
Thanks for taking the time to answer.
Well, actually that is a secondary problem... It is a Boolean, and because you use a transformer from the logger it is cast back to a string, see here. I agree with your explanation that datatypes should be consistent. But there is actually no need to always wrap the value in a JSON object with a "value" key. Example in Python:
>>> import json
>>> response='{"value": true}'
>>> json.loads(response)
{'value': True}
>>> response='true'
>>> json.loads(response)
True
As you can see, both are correctly parsed as a boolean, so there is no need to wrap the value in a JSON object. Just to make sure you see what I mean, let's do it again, but this time with a string instead of a boolean:
>>> import json
>>> response='{"value": "true"}'
>>> json.loads(response)
{'value': 'true'}
>>> response='"true"'
>>> json.loads(response)
'true'
So a string is also correctly parsed as a string. I would suggest writing your JSON as follows; that will write the JSON correctly, with the correct types (bool, string, int, float, ...):
diff --git a/datastore/mqtt/mqtt_payload.go b/datastore/mqtt/mqtt_payload.go
index 3692c3e..595a9b9 100644
--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -19,7 +19,7 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
 			continue
 		}
 		mqttTopicName := fmt.Sprintf("%s/%s/v/%s", p.config.TopicBase, rec.Vin, key)
-		jsonValue, err := json.Marshal(map[string]interface{}{"value": value})
+		jsonValue, err := json.Marshal(value)
 		if err != nil {
 			return tokens, fmt.Errorf("failed to marshal JSON for MQTT topic %s: %v", mqttTopicName, err)
 		}
Here is a screenshot of how that looks in the MQTT browser:
So to summarize:
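For reference, the same comparison can be made in Go, the language of the PR. This is a minimal sketch using only encoding/json; encodeBare and encodeWrapped are hypothetical helper names that mirror the two sides of the diff above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeBare marshals the value directly, as proposed in the diff.
func encodeBare(value interface{}) (string, error) {
	b, err := json.Marshal(value)
	return string(b), err
}

// encodeWrapped marshals the value inside a {"value": ...} object.
func encodeWrapped(value interface{}) (string, error) {
	b, err := json.Marshal(map[string]interface{}{"value": value})
	return string(b), err
}

func main() {
	for _, v := range []interface{}{true, "true", 42, 80.5} {
		bare, _ := encodeBare(v)
		wrapped, _ := encodeWrapped(v)
		fmt.Printf("%-8s %s\n", bare, wrapped)
	}
}
```

Both encodings are valid JSON and both preserve the type: a boolean stays true, and a string stays "true" with its quotes.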
@erwin314 it seems you are right! It seems Tesla is sending it as a string, so that's correct. I played with the raw data and saw that it is indeed working as intended (but I think that's a broken implementation on Tesla's side, since everything seems to be a string). Another small thing, what do you think of this:
--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -15,11 +15,11 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
 	var tokens []pahomqtt.Token
 	convertedPayload := transformers.PayloadToMap(payload, false, p.logger)
 	for key, value := range convertedPayload {
-		if key == "Vin" || key == "CreatedAt" {
+		if key == "Vin" || key == "CreatedAt" || value == "<invalid>" {
My reasoning is: if the value is invalid, then don't publish it...
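Pulled out into a standalone function, the proposed filter could look like the sketch below. shouldPublish and invalidMarker are hypothetical names introduced for illustration; the condition itself is the one from the diff.

```go
package main

import "fmt"

// invalidMarker is the placeholder string discussed in the comment above.
const invalidMarker = "<invalid>"

// shouldPublish reports whether a field should get its own MQTT message.
func shouldPublish(key string, value interface{}) bool {
	if key == "Vin" || key == "CreatedAt" {
		return false // these two keys are already skipped by the existing code
	}
	// Comparing an interface value with a string is false for non-string
	// values, so numbers and booleans always pass this check.
	return value != invalidMarker
}

func main() {
	fields := []struct {
		key   string
		value interface{}
	}{
		{"Vin", "VIN123"},
		{"Soc", "80.5"},
		{"Gear", "<invalid>"},
	}
	for _, f := range fields {
		fmt.Printf("%s: publish=%v\n", f.key, shouldPublish(f.key, f.value))
	}
}
```

One thing to weigh before adopting this: subscribers would no longer see that a field became invalid, and with retained messages the last valid value would stay visible on the topic.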
var err error

switch payload := payload.(type) {
case *protos.Payload:
Description
This PR adds an MQTT dispatcher. Sending Tesla EV fleet telemetry (e.g., SoC) to an MQTT broker provides real-time, lightweight, and efficient data streaming. MQTT's publish-subscribe model scales well for growing fleets, enabling seamless integration with IoT platforms and data analytics systems for improved monitoring and management.