Skip to content

Commit

Permalink
Pinning and Unpinning of channel messages (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
texuf authored Jun 27, 2024
1 parent a145ae4 commit 5ba3a27
Show file tree
Hide file tree
Showing 15 changed files with 1,623 additions and 965 deletions.
13 changes: 13 additions & 0 deletions core/node/events/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ func update_Snapshot_Channel(iSnapshot *Snapshot, channelPayload *ChannelPayload
return RiverError(Err_INVALID_ARGUMENT, "cannot update blockheader with inception event")
case *ChannelPayload_Message:
return nil
case *ChannelPayload_Pin_:
snapshot.ChannelContent.Pins = append(snapshot.ChannelContent.Pins, content.Pin)
return nil
case *ChannelPayload_Unpin_:
snapPins := snapshot.ChannelContent.Pins
for i, pin := range snapPins {
if bytes.Equal(pin.EventId, content.Unpin.EventId) {
snapPins = append(snapPins[:i], snapshot.ChannelContent.Pins[i+1:]...)
break
}
}
snapshot.ChannelContent.Pins = snapPins
return nil
default:
return RiverError(Err_INVALID_ARGUMENT, "unknown channel payload type %T", content)
}
Expand Down
39 changes: 39 additions & 0 deletions core/node/events/stream_viewstate_channel.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package events

import (
"bytes"

. "github.com/river-build/river/core/node/base"
. "github.com/river-build/river/core/node/protocol"
)

type ChannelStreamView interface {
JoinableStreamView
GetChannelInception() (*ChannelPayload_Inception, error)
GetPinnedMessages() ([]*ChannelPayload_Pin, error)
}

var _ ChannelStreamView = (*streamViewImpl)(nil)
Expand All @@ -21,3 +24,39 @@ func (r *streamViewImpl) GetChannelInception() (*ChannelPayload_Inception, error
return nil, RiverError(Err_WRONG_STREAM_TYPE, "Expected channel stream", "streamId", i.GetStreamId())
}
}

func (r *streamViewImpl) GetPinnedMessages() ([]*ChannelPayload_Pin, error) {
s := r.snapshot.Content
channelSnapshot := s.(*Snapshot_ChannelContent)
// make a copy of the pins
pins := make([]*ChannelPayload_Pin, len(channelSnapshot.ChannelContent.Pins))
copy(pins, channelSnapshot.ChannelContent.Pins)

updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *StreamEvent_ChannelPayload:
switch payload := payload.ChannelPayload.Content.(type) {
case *ChannelPayload_Pin_:
pins = append(pins, payload.Pin)
case *ChannelPayload_Unpin_:
for i, pin := range pins {
if bytes.Equal(pin.EventId, payload.Unpin.EventId) {
pins = append(pins[:i], pins[i+1:]...)
break
}
}
default:
break
}
default:
break
}
return true, nil
}

err := r.forEachEvent(r.snapshotIndex+1, updateFn)
if err != nil {
return nil, err
}
return pins, nil
}
Loading

0 comments on commit 5ba3a27

Please sign in to comment.