Skip to content

Commit

Permalink
Add autojoin and show join leave events (#587)
Browse files Browse the repository at this point in the history
space payload snapshot channel metadata includes new `ChannelSettings`
message with fields for
- autojoin
- hide user join and leave events

These can be optionally included in the Channel Inception message, if
not all fields are false by default.
Two new messages on Space Payload to manually toggle these two settings,
as well as client endpoints.

migration sets autojoin as true for default channels (22 trailing 0s),
but from this PR on all channels will be not autojoin unless explicitly
toggled on. snapshot migrations apply these defaults.

e2e tests validate:
- settings properly set on channel inception
- client events emitted on changes to both settings
- client space streamview state updates appropriately in response to
inception and update messages
- only permitted users can update these properties
  • Loading branch information
clemire authored Aug 5, 2024
1 parent e3afdb2 commit 1730085
Show file tree
Hide file tree
Showing 21 changed files with 2,509 additions and 1,144 deletions.
2 changes: 2 additions & 0 deletions core/node/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func Make_SpacePayload_ChannelUpdate(
op ChannelOp,
channelId StreamId,
originEvent *EventRef,
settings *SpacePayload_ChannelSettings,
) *StreamEvent_SpacePayload {
return &StreamEvent_SpacePayload{
SpacePayload: &SpacePayload{
Expand All @@ -310,6 +311,7 @@ func Make_SpacePayload_ChannelUpdate(
Op: op,
ChannelId: channelId[:],
OriginEvent: originEvent,
Settings: settings,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions core/node/events/migrations/migrate_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type migrationFunc func(*Snapshot) *Snapshot
var MIGRATIONS = []migrationFunc{
snapshot_migration_0000,
snapshot_migration_0001,
snapshot_migration_0002,
}

func CurrentSnapshotVersion() int32 {
Expand Down
27 changes: 27 additions & 0 deletions core/node/events/migrations/snapshot_migration_0002.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package migrations

import (
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/shared"
)

func snapshot_migration_0002(iSnapshot *Snapshot) *Snapshot {
switch snapshot := iSnapshot.Content.(type) {
case *Snapshot_SpaceContent:
for _, channel := range snapshot.SpaceContent.Channels {
if channel.Settings == nil {
channel.Settings = &SpacePayload_ChannelSettings{}
}
channelId, err := shared.StreamIdFromBytes(channel.ChannelId)
if err != nil {
// Note: it would be better to log this error, but we have no logging
// context here at this time
continue
}
if shared.IsDefaultChannelId(channelId) {
channel.Settings.Autojoin = true
}
}
}
return iSnapshot
}
54 changes: 54 additions & 0 deletions core/node/events/migrations/snapshot_migration_0002_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package migrations

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"

. "github.com/river-build/river/core/node/protocol"

"github.com/river-build/river/core/node/shared"
)

func TestSnapshotMigration0002(t *testing.T) {
spaceId, err := shared.MakeSpaceId()
require.NoError(t, err)

defaultChannelId, err := shared.MakeDefaultChannelId(spaceId)
require.NoError(t, err)

channelId1, err := shared.MakeChannelId(spaceId)
require.NoError(t, err)

spaceSnap := &Snapshot{
Content: &Snapshot_SpaceContent{
SpaceContent: &SpacePayload_Snapshot{
Inception: &SpacePayload_Inception{
StreamId: spaceId[:],
},
Channels: []*SpacePayload_ChannelMetadata{
{
ChannelId: defaultChannelId[:],
},
{
ChannelId: channelId1[:],
},
},
},
},
}

migratedSnapshot := snapshot_migration_0002(spaceSnap)

require.Equal(t, 2, len(migratedSnapshot.GetSpaceContent().Channels))

require.True(t, bytes.Equal(migratedSnapshot.GetSpaceContent().Channels[0].ChannelId, defaultChannelId[:]))
require.True(t, bytes.Equal(migratedSnapshot.GetSpaceContent().Channels[1].ChannelId, channelId1[:]))

require.True(t, migratedSnapshot.GetSpaceContent().Channels[0].Settings.Autojoin)
require.False(t, migratedSnapshot.GetSpaceContent().Channels[1].Settings.Autojoin)

require.False(t, migratedSnapshot.GetSpaceContent().Channels[0].Settings.HideUserJoinLeaveEvents)
require.False(t, migratedSnapshot.GetSpaceContent().Channels[1].Settings.HideUserJoinLeaveEvents)
}
2 changes: 1 addition & 1 deletion core/node/events/miniblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Make_GenesisMiniblockHeader(parsedEvents []*ParsedEvent) (*MiniblockHeader,
}
}

snapshot, err := Make_GenisisSnapshot(parsedEvents)
snapshot, err := Make_GenesisSnapshot(parsedEvents)
if err != nil {
return nil, err
}
Expand Down
37 changes: 36 additions & 1 deletion core/node/events/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/river-build/river/core/node/shared"
)

func Make_GenisisSnapshot(events []*ParsedEvent) (*Snapshot, error) {
func Make_GenesisSnapshot(events []*ParsedEvent) (*Snapshot, error) {
if len(events) == 0 {
return nil, RiverError(Err_INVALID_ARGUMENT, "no events to make snapshot from")
}
Expand Down Expand Up @@ -201,9 +201,44 @@ func update_Snapshot_Space(
Op: content.Channel.Op,
OriginEvent: content.Channel.OriginEvent,
UpdatedAtEventNum: eventNum,
Settings: content.Channel.Settings,
}
if channel.Settings == nil {
if channel.Op == ChannelOp_CO_CREATED {
// Apply default channel settings for new channels when settings are not provided.
// Invariant: channel.Settings is defined for all channels in the snapshot.
channelId, err := shared.StreamIdFromBytes(content.Channel.ChannelId)
if err != nil {
return err
}
channel.Settings = &SpacePayload_ChannelSettings{
Autojoin: shared.IsDefaultChannelId(channelId),
}
} else if channel.Op == ChannelOp_CO_UPDATED {
// Find the existing channel and copy over the settings if new ones are not provided.
existingChannel, err := findChannel(snapshot.SpaceContent.Channels, content.Channel.ChannelId)
if err != nil {
return err
}
channel.Settings = existingChannel.Settings
}
}
snapshot.SpaceContent.Channels = insertChannel(snapshot.SpaceContent.Channels, channel)
return nil
case *SpacePayload_UpdateChannelAutojoin_:
channel, err := findChannel(snapshot.SpaceContent.Channels, content.UpdateChannelAutojoin.ChannelId)
if err != nil {
return err
}
channel.Settings.Autojoin = content.UpdateChannelAutojoin.Autojoin
return nil
case *SpacePayload_UpdateChannelHideUserJoinLeaveEvents_:
channel, err := findChannel(snapshot.SpaceContent.Channels, content.UpdateChannelHideUserJoinLeaveEvents.ChannelId)
if err != nil {
return err
}
channel.Settings.HideUserJoinLeaveEvents = content.UpdateChannelHideUserJoinLeaveEvents.HideUserJoinLeaveEvents
return nil
case *SpacePayload_SpaceImage:
snapshot.SpaceContent.SpaceImage = &SpacePayload_SnappedSpaceImage{
Data: content.SpaceImage,
Expand Down
10 changes: 5 additions & 5 deletions core/node/events/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestMakeSnapshot(t *testing.T) {
wallet, _ := crypto.NewWallet(ctx)
streamId := UserStreamIdFromAddr(wallet.Address)
inception := make_User_Inception(wallet, streamId, t)
snapshot, err := Make_GenisisSnapshot([]*ParsedEvent{inception})
snapshot, err := Make_GenesisSnapshot([]*ParsedEvent{inception})
assert.NoError(t, err)
assert.Equal(
t,
Expand All @@ -161,7 +161,7 @@ func TestUpdateSnapshot(t *testing.T) {
wallet, _ := crypto.NewWallet(ctx)
streamId := UserStreamIdFromAddr(wallet.Address)
inception := make_User_Inception(wallet, streamId, t)
snapshot, err := Make_GenisisSnapshot([]*ParsedEvent{inception})
snapshot, err := Make_GenesisSnapshot([]*ParsedEvent{inception})
assert.NoError(t, err)

membership := make_User_Membership(wallet, MembershipOp_SO_JOIN, streamId, nil, t)
Expand All @@ -185,7 +185,7 @@ func TestCloneAndUpdateUserSnapshot(t *testing.T) {
wallet, _ := crypto.NewWallet(ctx)
streamId := UserStreamIdFromAddr(wallet.Address)
inception := make_User_Inception(wallet, streamId, t)
snapshot1, err := Make_GenisisSnapshot([]*ParsedEvent{inception})
snapshot1, err := Make_GenesisSnapshot([]*ParsedEvent{inception})
assert.NoError(t, err)

snapshot := proto.Clone(snapshot1).(*Snapshot)
Expand All @@ -211,7 +211,7 @@ func TestCloneAndUpdateSpaceSnapshot(t *testing.T) {
wallet, _ := crypto.NewWallet(ctx)
streamId := UserStreamIdFromAddr(wallet.Address)
inception := make_Space_Inception(wallet, streamId, t)
snapshot1, err := Make_GenisisSnapshot([]*ParsedEvent{inception})
snapshot1, err := Make_GenesisSnapshot([]*ParsedEvent{inception})
assert.NoError(t, err)
userId, err := AddressHex(inception.Event.CreatorAddress)
assert.NoError(t, err)
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestUpdateSnapshotFailsIfInception(t *testing.T) {
wallet, _ := crypto.NewWallet(ctx)
streamId := UserStreamIdFromAddr(wallet.Address)
inception := make_User_Inception(wallet, streamId, t)
snapshot, err := Make_GenisisSnapshot([]*ParsedEvent{inception})
snapshot, err := Make_GenesisSnapshot([]*ParsedEvent{inception})
assert.NoError(t, err)

err = Update_Snapshot(snapshot, inception, 0, 1)
Expand Down
Loading

0 comments on commit 1730085

Please sign in to comment.