Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Florimond committed Oct 15, 2024
1 parent 9ec673d commit bf5c7a2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 10 deletions.
2 changes: 0 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${fileDirname}",
"env": {},
Expand Down
1 change: 1 addition & 0 deletions emitter.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"license": "PfA8IFFbf_BHbe8gjlq7e2E7fHb26zdEP9OtQuxONRWlmRiaEMVZzVTm1KOGzNVmlXJ6BxGE5ZDWBNT36-QOAQ:3",
"listen": ":8080",
"tls": {
"listen": ":443"
Expand Down
17 changes: 17 additions & 0 deletions internal/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (m *Message) Size() int64 {
return int64(len(m.Payload))
}

// TotalSize returns the total byte size of the message, including the ID and channel.
func (m *Message) TotalSize() int64 {
return int64(len(m.Payload) + len(m.ID) + len(m.Channel))
}

// Time gets the time of the key, adjusted.
func (m *Message) Time() int64 {
return m.ID.Time()
Expand Down Expand Up @@ -145,6 +150,18 @@ func (f *Frame) Limit(n int) {
}
}

// LimitPayloadSize limits the payload size of the frame.
func (f *Frame) LimitPayloadSize(frame Frame, maxPayloadSize int64) {
var sum int64
for i := 0; i < len(frame); i++ {
sum += frame[i].TotalSize()
if sum >= maxPayloadSize {
*f = frame[:i]
return
}
}
}

// Encode encodes the message frame
func (f *Frame) Encode() []byte {

Expand Down
12 changes: 10 additions & 2 deletions internal/network/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ import (
)

const (
maxHeaderSize = 6 // max MQTT header size
MaxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len)
maxHeaderSize = 6 // max MQTT header size

/* Original comment: "[65536] max MQTT message size is impossible to increase as per protocol (uint16 len)"
This is not true according the MQTT 3.1.1 spec, the max size is 256MB.
Official spec: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Also see this article for simplified explanation: https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
However, as noted in most sources, brokers could and should impose a much sensible limit.
65536 is the original limit in Emitter and seems sensible. This number will be used as the payload limit.
*/
MaxMessageSize = 65536
)

// ErrMessageTooLarge occurs when a message encoded/decoded is larger than max MQTT frame.
Expand Down
12 changes: 9 additions & 3 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID messag
}
}

match.Limit(limit)
match.Limit(limit) // Limit the number of messages.
match.LimitPayloadSize(match, mqtt.MaxMessageSize)
return match, nil
}

Expand Down Expand Up @@ -196,7 +197,7 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
it.Next()
}

matchesSize := 0
var matchesSize int64 = 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
Expand All @@ -211,7 +212,12 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
continue
}

if matchesSize += len(msg.Payload) + len(msg.ID) + len(msg.Channel); matchesSize > mqtt.MaxMessageSize {
// MaxMessageSize is the maximum size of the payload of an MQTT message in Emitter. See comment on mqtt.MaxMessageSize.

// STILL BUGGY: messages are ingested based on the size of the payload, not including the size of the topic... REALLY?
// Technically, this algorithms would try to send a message longer than the maximum size of the Payload as defined in mqtt.MaxMessageSize
// This means, through History, you might not be able to retrieve a message whose Payload is still within the maximum size of the MQTT message.
if matchesSize += msg.TotalSize(); matchesSize > mqtt.MaxMessageSize {
break
}

Expand Down
10 changes: 7 additions & 3 deletions internal/service/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package history
import (
"crypto/rand"
"encoding/json"
"fmt"
"testing"

"github.com/emitter-io/emitter/internal/message"
Expand Down Expand Up @@ -171,15 +172,18 @@ func TestSumOfTwoExceedMaxSize(t *testing.T) {
}

// Store 2 messages
randomBytes := make([]byte, int(mqtt.MaxMessageSize/2))
rand.Read(randomBytes)
firstSSID := message.NewID(ssid)
store.Store(&message.Message{
fmt.Println(int(mqtt.MaxMessageSize - len(firstSSID) - len("test/") - 1)) // KEYSIZE???
//randomBytes := make([]byte, int(mqtt.MaxMessageSize-len(firstSSID)-len("a/b/c/")-1)) // BUG: MaxMessageSize represents the maximum size of the payload, but the message is composed of the ID, the channel size and the payload.
randomBytes := make([]byte, int(mqtt.MaxMessageSize))
rand.Read(randomBytes)
err := store.Store(&message.Message{
ID: firstSSID,
Channel: []byte("a/b/c/"),
Payload: randomBytes,
TTL: 30,
})
assert.NoError(t, err)
store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("a/b/c/"),
Expand Down

0 comments on commit bf5c7a2

Please sign in to comment.