Add publish callback #263

Draft
wants to merge 2 commits into
base: master

Conversation

shaileshahuja

Attempt 2 to get feedback on the logic.

Primarily, instead of the client creating the channel to receive the publish, I am making it at the autopaho level to ensure the channel can survive reconnects.

TODO:

  1. I wouldn't say I like mixing channels with callback, but I will think about how to merge them
  2. Maybe there's a better way than exposing ProcessPublishResponse
  3. Better naming
  4. Tests

@shaileshahuja shaileshahuja marked this pull request as draft August 14, 2024 08:38
@MattBrittan
Contributor

A few points from a really quick look:

  • Please don't change published method definitions unless it's unavoidable (I added PublishOptions so that things could be added to this without impacting existing users' code). Note: I'm guessing this is your intent; the code seems incomplete.
  • The QueuePublish in PublishViaQueue has not yet been assigned a packet ID (that happens when the publish message is actually sent - remember that more than 65535 messages can be queued).
  • ret = o.AsyncCompleteChan - Looks like you end up with two receives on the same channel.
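The packet ID point can be illustrated with plain arithmetic. An MQTT packet identifier is a 16-bit non-zero value, so it cannot uniquely label every entry in a queue that holds more than 65535 messages. The sketch below uses a hypothetical counter-style allocator (`nextPacketID` is an illustration, not how paho allocates identifiers) to show the reuse:

```go
package main

import "fmt"

// nextPacketID is a hypothetical allocator that walks the 16-bit packet
// identifier space, skipping 0 (which MQTT does not allow as a packet ID).
func nextPacketID(prev uint16) uint16 {
	if prev == 65535 {
		return 1
	}
	return prev + 1
}

func main() {
	const queued = 70000 // more messages than there are packet identifiers

	seen := make(map[uint16]bool)
	id := uint16(0)
	reused := 0
	for i := 0; i < queued; i++ {
		id = nextPacketID(id)
		if seen[id] {
			reused++ // this identifier already labels an earlier message
		}
		seen[id] = true
	}
	fmt.Printf("queued=%d distinct=%d reused=%d\n", queued, len(seen), reused)
}
```

Any scheme that maps callbacks to queued messages therefore needs an identifier other than the packet ID.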

I'm not going to have time to fully review this in its current state (I find it often takes as long to review code as it does to write it, so will await something that you feel is ready for submission or where you have a specific question). You appear to have a decent general approach, but it needs some more work (and tests). Unfortunately with this kind of change I've found that you will find issues as you go (you probably don't understand the entirety of the requirement yet) and tests are essential (hopefully the tests I've already written will provide a decent starting point and highlight some areas where there have been issues in the past).

@shaileshahuja
Author

shaileshahuja commented Aug 16, 2024

Thanks @MattBrittan.

The QueuePublish in PublishViaQueue has not yet been assigned a packet ID (that happens when the publish message is actually sent - remember that more than 65535 messages can be queued).

Thank you for reiterating this point. I also noticed that the queue supports both memory and file persistence, and the enqueue/dequeue process involves serialization and deserialization. I realize that I cannot serialize the callback function, which means it will have to stay in memory. This creates a limitation for such a callback system.
I also need a way to map the callbacks to the queued messages. One idea is to assign a UUID and serialize it along with the message. However, this approach would require us to create a new struct with its own serialization and deserialization logic. Are you open to this approach, or do you think there's a better way to handle this?

type QueuePublish struct {
	uuid       string      // assigned automatically
	QoS        byte
	Retain     bool
	Topic      string
	Properties *PublishProperties
	Payload    []byte
}
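A minimal sketch of the in-memory side of this idea, assuming a string ID is persisted with each message. The names `callbackRegistry`, `register`, `complete` and `newID` are hypothetical and not part of autopaho; the random hex string is a stand-in for a UUID:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

// newID returns a random 128-bit identifier as hex (a stand-in for a UUID).
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// callbackRegistry is a hypothetical in-memory map from entry ID to callback.
// It is lost on restart, so entries restored from disk have no callback.
type callbackRegistry struct {
	mu  sync.Mutex
	cbs map[string]func(error)
}

func newCallbackRegistry() *callbackRegistry {
	return &callbackRegistry{cbs: make(map[string]func(error))}
}

// register stores cb under a fresh ID and returns that ID so that it can be
// persisted together with the queued message.
func (r *callbackRegistry) register(cb func(error)) (string, error) {
	id, err := newID()
	if err != nil {
		return "", err
	}
	r.mu.Lock()
	r.cbs[id] = cb
	r.mu.Unlock()
	return id, nil
}

// complete runs and forgets the callback for id. It reports false when no
// callback is known for that ID.
func (r *callbackRegistry) complete(id string, result error) bool {
	r.mu.Lock()
	cb, ok := r.cbs[id]
	delete(r.cbs, id)
	r.mu.Unlock()
	if ok {
		cb(result)
	}
	return ok
}

func main() {
	reg := newCallbackRegistry()
	id, err := reg.register(func(err error) { fmt.Println("publish finished, err =", err) })
	if err != nil {
		panic(err)
	}
	fmt.Println(reg.complete(id, nil))                     // callback runs once
	fmt.Println(reg.complete(id, errors.New("duplicate"))) // already completed
	fmt.Println(reg.complete("restored-from-disk", nil))   // no callback survives a restart
}
```

The last call shows the limitation described above: a message that was queued by a previous process still has its ID, but nothing in memory to call.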

@MattBrittan
Contributor

MattBrittan commented Aug 17, 2024

I also need a way to map the callbacks to the queued messages. One idea is to assign a UUID and serialize it along with the message. However, this approach would require us to create a new struct with its own serialization and deserialization logic. Are you open to this approach, or do you think there's a better way to handle this?

This is why I did not implement the callbacks at the time persistence was implemented; it gets a bit tricky (and I don't think there is a perfect solution). Assigning a uuid or similar is one approach (but this could be something like

type QueuePublishStore struct {
	uuid       string      // assigned automatically
	*QueuePublish
}

). If we take this approach I'd also suggest adding a version flag as the first byte (existing packets should have a 3 in the top 4 bits) as this will enable non-breaking changes in the future.

However, I wonder if it might make more sense to pass this off to the queue implementation (for example, if passed a unique ID the file store could use the ID to name the file). This would have the advantage of enabling additional functionality in the future (e.g. a "remove message from queue" option) and might be useful if a database queue is added (as the ID could be stored separately from the packet). My thought here is that the queue could optionally implement something like:

// Entry - permits access to a queue entry
// Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again)
// `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requested should be considered invalid)
type EntryWithID interface {
	Reader() (uint64, io.Reader, error) // Provides access to the queue ID, file contents, subsequent calls may return the same reader
	Leave() error               // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek).
	Remove() error              // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation
	Quarantine() error          // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
}

// Queue provides the functionality needed to manage queued messages
type QueueWithID interface {
	// Wait returns a channel that is closed when there is something in the queue (will return a closed channel if the
	// queue is not empty at the time of the call)
	Wait() chan struct{}

	// Enqueue adds an item to the queue.
	Enqueue(uniqueID uint64, p io.Reader) error

	// Peek retrieves the oldest item from the queue without removing it
	// Users must call one of Leave, Remove, or Quarantine when done with the entry, and before calling Peek again.
	// Warning: Peek is not safe for concurrent use (it may return the same Entry leading to unpredictable results)
	Peek() (EntryWithID, error)
}

This could be in addition to the current queue (so maintaining compatibility - the callback would be called with an error if the queue does not support ID handling) or replace the current queues (I doubt many users have implemented custom queues and we are still pre v1 so breaking changes are OK).
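A rough sketch of the "in addition to the current queue" option, under stated assumptions. Because both interfaces name their method `Enqueue` with different parameters, one Go type cannot implement both, so the sketch accepts `any` and uses a type switch. The reduced interfaces `plainEnqueuer` and `idEnqueuer`, the in-memory queues, `errNoIDSupport` and the `enqueue` helper are all hypothetical names:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// errNoIDSupport is a hypothetical error passed to the callback when the
// queue cannot track IDs, so the publish outcome can never be reported.
var errNoIDSupport = errors.New("queue does not support IDs")

// Reduced, hypothetical versions of the existing and proposed queue interfaces.
type plainEnqueuer interface {
	Enqueue(p io.Reader) error
}

type idEnqueuer interface {
	Enqueue(uniqueID uint64, p io.Reader) error
}

// memQueue is an in-memory queue without ID support.
type memQueue struct{ items [][]byte }

func (q *memQueue) Enqueue(p io.Reader) error {
	b, err := io.ReadAll(p)
	if err != nil {
		return err
	}
	q.items = append(q.items, b)
	return nil
}

// memQueueWithID is an in-memory queue that stores each entry under its ID.
type memQueueWithID struct{ items map[uint64][]byte }

func (q *memQueueWithID) Enqueue(id uint64, p io.Reader) error {
	b, err := io.ReadAll(p)
	if err != nil {
		return err
	}
	q.items[id] = b
	return nil
}

// enqueue stores payload in q. With an ID-aware queue the callback is kept in
// pending for later; otherwise it is called immediately with errNoIDSupport.
func enqueue(q any, id uint64, payload []byte, pending map[uint64]func(error), cb func(error)) error {
	switch s := q.(type) {
	case idEnqueuer:
		if err := s.Enqueue(id, bytes.NewReader(payload)); err != nil {
			return err
		}
		if cb != nil {
			pending[id] = cb
		}
		return nil
	case plainEnqueuer:
		if err := s.Enqueue(bytes.NewReader(payload)); err != nil {
			return err
		}
		if cb != nil {
			cb(errNoIDSupport)
		}
		return nil
	default:
		return errors.New("unsupported queue type")
	}
}

func main() {
	pending := make(map[uint64]func(error))
	report := func(err error) { fmt.Println("callback:", err) }

	withID := &memQueueWithID{items: make(map[uint64][]byte)}
	fmt.Println(enqueue(withID, 1, []byte("a"), pending, report), len(pending))

	plain := &memQueue{}
	fmt.Println(enqueue(plain, 2, []byte("b"), pending, report), len(pending))
}
```

Replacing the current queue interface outright would avoid the type switch, at the cost of breaking any custom queue implementations.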

Note: This is not something I have really thought through in detail (I chose a uint64 rather than a string because that would be easier to encode for use in a filename etc. and should be big enough to ensure uniqueness).
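As an illustration of why a uint64 is convenient here, the sketch below encodes an ID as a fixed-width decimal filename and parses it back. The `.msg` extension and both function names are assumptions made for the example, not the naming used by the existing file store:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ext is a hypothetical extension for queue entry files.
const ext = ".msg"

// fileNameForID zero-pads the ID to 20 digits (the width of the largest
// uint64), so that lexical order of filenames matches numeric order of IDs.
func fileNameForID(id uint64) string {
	return fmt.Sprintf("%020d%s", id, ext)
}

// idFromFileName recovers the ID from a name produced by fileNameForID.
func idFromFileName(name string) (uint64, error) {
	base := strings.TrimSuffix(name, ext)
	if base == name {
		return 0, fmt.Errorf("%q does not end in %s", name, ext)
	}
	return strconv.ParseUint(base, 10, 64)
}

func main() {
	name := fileNameForID(42)
	id, err := idFromFileName(name)
	fmt.Println(name, id, err)

	_, err = idFromFileName("notes.txt")
	fmt.Println(err)
}
```

With this layout the ID never has to be written inside the file, so the stored bytes can remain an unmodified PUBLISH packet.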

@shaileshahuja
Author

shaileshahuja commented Aug 18, 2024

If we take this approach I'd also suggest adding a version flag as the first byte (existing packets should have a 3 in the top 4 bits) as this will enable non-breaking changes in the future.

Can you explain more? I could not find any related code where 3 is added in the top 4 bits.

This would have the advantage of enabling additional functionality in the future

I believe this can be done as a follow-up when such functionality is needed. Not doing it now does not close the door for the future, so it is fine to decouple from the current effort.

@MattBrittan
Contributor

Can you explain more? I could not find any related code where 3 is added in the top 4 bits.

The first byte will be a "PUBLISH Fixed Header" (as publish messages are the only thing written to the queue).
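To spell this out: in the MQTT fixed header the control packet type occupies the top four bits of the first byte, and PUBLISH is type 3, so every existing queue entry starts with a byte of the form 0x3?. Type 0 is reserved, which leaves first bytes 0x01 to 0x0F free to act as a version marker. The sketch below shows how a reader might tell the two apart; the versioned layout and the names `detectFormat` and `entryFormat` are assumptions, not an agreed format:

```go
package main

import "fmt"

type entryFormat int

const (
	formatUnknown       entryFormat = iota
	formatLegacyPublish             // raw MQTT PUBLISH packet, as written by existing queues
	formatVersioned                 // hypothetical new layout: first byte is a version number
)

// publishType is the MQTT control packet type for PUBLISH.
const publishType = 3

// maxVersion is an assumed upper bound; versions 1..15 keep the top four bits
// zero, so they cannot be confused with a PUBLISH fixed header.
const maxVersion = 0x0F

// detectFormat inspects the first byte of a stored entry and returns its
// format and, for the hypothetical versioned layout, the version number.
func detectFormat(data []byte) (entryFormat, byte) {
	if len(data) == 0 {
		return formatUnknown, 0
	}
	first := data[0]
	switch {
	case first>>4 == publishType:
		return formatLegacyPublish, 0
	case first >= 1 && first <= maxVersion:
		return formatVersioned, first
	default:
		return formatUnknown, 0
	}
}

func main() {
	f, v := detectFormat([]byte{0x32, 0x05}) // PUBLISH with QoS 1 flags
	fmt.Println(f == formatLegacyPublish, v)

	f, v = detectFormat([]byte{0x01, 0xAA}) // hypothetical version 1 entry
	fmt.Println(f == formatVersioned, v)
}
```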

I believe this can be done as a follow-up when such functionality is needed.

Correct - but it is nice to try to design things so that it is possible to add functionality later without breaking current programs.

@shaileshahuja
Author

shaileshahuja commented Aug 21, 2024

Do you prefer to add QueuePublish as a packet type in packets.go to re-use the ControlPacket struct, or create a separate encoding / decoding logic (while still using the Packet interface though)?

I recommend keeping it separate as it is not sent via the MQTT protocol, and the same format does not apply. We can have more freedom in the format, for example, adding a version number as the first byte instead of trying to write hacky code to make FixedHeader work. It will also avoid adding non-MQTT packets to the ControlPacket methods. The trade-off is that there is some code duplicated from the PUBLISH packet.

@MattBrittan
Contributor

Users should be able to use github.com/eclipse/paho.golang/paho as a base MQTT library (i.e. without autopaho). As such, packets/packets.go should only handle functionality needed to implement the MQTT protocol (handling a queue is an implementation detail of autopaho, so I see it as quite separate). Of course, sometimes modifications will be needed in paho to enable wrappers like autopaho to meet their goals (but any such modifications should not be tied to autopaho).

As mentioned above my preference would be that the location the ID is stored be handled by the queue store (for example the filestore could use the ID as a filename rather than it being stored within the file).

@shaileshahuja
Author

Created #264 to implement the UUID in queue store.


2 participants