Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST] #582

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
// We ensure we only will receive one event when it's ddl event or sync point event
// by setting them with different event types in DispatcherEventsHandler.GetType
// When we handle events, we don't have any previous events still in sink.
func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) (block bool) {
func (d *Dispatcher) HandleEvents(dispatcherEvents []*DispatcherEvent, wakeCallback func()) (block bool) {
// Only return false when all events are resolvedTs Event.
block = false
// Dispatcher is ready, handle the events
Expand Down
7 changes: 7 additions & 0 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ func NewDispatcherEvent(from node.ID, event commonEvent.Event) DispatcherEvent {
}
}

func NewDispatcherEventTest(from node.ID, event commonEvent.Event) *DispatcherEvent {
return &DispatcherEvent{
From: from,
Event: event,
}
}

type DispatcherStatusWithID struct {
id common.DispatcherID
status *heartbeatpb.DispatcherStatus
Expand Down
16 changes: 8 additions & 8 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type EventCollector struct {
// ds is the dynamicStream for dispatcher events.
// All the events from event service will be sent to ds to handle.
// ds will dispatch the events to different dispatchers according to the dispatcherID.
ds dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler]
ds dynstream.DynamicStream[common.GID, common.DispatcherID, *dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler]

coordinatorInfo struct {
sync.RWMutex
Expand Down Expand Up @@ -331,11 +331,11 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, targetMessage *mes
case commonEvent.TypeBatchResolvedEvent:
for _, e := range event.(*commonEvent.BatchResolvedEvent).Events {
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
c.ds.In(e.DispatcherID) <- dispatcher.NewDispatcherEvent(targetMessage.From, e)
c.ds.In(e.DispatcherID) <- dispatcher.NewDispatcherEventTest(targetMessage.From, e)
}
default:
c.metricDispatcherReceivedKVEventCount.Inc()
c.ds.In(event.GetDispatcherID()) <- dispatcher.NewDispatcherEvent(targetMessage.From, event)
c.ds.In(event.GetDispatcherID()) <- dispatcher.NewDispatcherEventTest(targetMessage.From, event)
}
default:
log.Panic("invalid message type", zap.Any("msg", msg))
Expand Down Expand Up @@ -417,7 +417,7 @@ func (d *DispatcherStat) reset() {
d.waitHandshake.Store(true)
}

func (d *DispatcherStat) checkEventSeq(event dispatcher.DispatcherEvent, eventCollector *EventCollector) bool {
func (d *DispatcherStat) checkEventSeq(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) bool {
switch event.GetType() {
case commonEvent.TypeDMLEvent,
commonEvent.TypeDDLEvent,
Expand Down Expand Up @@ -448,7 +448,7 @@ func (d *DispatcherStat) checkEventSeq(event dispatcher.DispatcherEvent, eventCo
}
}

func (d *DispatcherStat) shouldIgnoreDataEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) bool {
func (d *DispatcherStat) shouldIgnoreDataEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) bool {
if d.eventServiceInfo.serverID != event.From {
// TODO: unregister from this invalid event service if it send events for a long time
return true
Expand Down Expand Up @@ -478,7 +478,7 @@ func (d *DispatcherStat) shouldIgnoreDataEvent(event dispatcher.DispatcherEvent,
return false
}

func (d *DispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) {
func (d *DispatcherStat) handleHandshakeEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) {
d.eventServiceInfo.Lock()
defer d.eventServiceInfo.Unlock()
if event.GetType() != commonEvent.TypeHandshakeEvent {
Expand All @@ -504,7 +504,7 @@ func (d *DispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent,
d.target.SetInitialTableInfo(event.Event.(*commonEvent.HandshakeEvent).TableInfo)
}

func (d *DispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) {
func (d *DispatcherStat) handleReadyEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) {
d.eventServiceInfo.Lock()
defer d.eventServiceInfo.Unlock()
if event.GetType() != commonEvent.TypeReadyEvent {
Expand Down Expand Up @@ -562,7 +562,7 @@ func (d *DispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, even
}
}

func (d *DispatcherStat) handleNotReusableEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) {
func (d *DispatcherStat) handleNotReusableEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) {
d.eventServiceInfo.Lock()
defer d.eventServiceInfo.Unlock()
if event.GetType() != commonEvent.TypeNotReusableEvent {
Expand Down
16 changes: 8 additions & 8 deletions downstreamadapter/eventcollector/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (h pathHasher) HashPath(path common.DispatcherID) int {
return int((common.GID)(path).FastHash() % (uint64)(h.streamCount))
}

func NewEventDynamicStream(collector *EventCollector) dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] {
func NewEventDynamicStream(collector *EventCollector) dynstream.DynamicStream[common.GID, common.DispatcherID, *dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] {
option := dynstream.NewOption()
option.BatchCount = 128
// option.InputBufferSize = 1000000 / streamCount
Expand Down Expand Up @@ -70,12 +70,12 @@ type EventsHandler struct {
eventCollector *EventCollector
}

func (h *EventsHandler) Path(event dispatcher.DispatcherEvent) common.DispatcherID {
func (h *EventsHandler) Path(event *dispatcher.DispatcherEvent) common.DispatcherID {
return event.GetDispatcherID()
}

// Invariant: at any times, we can receive events from at most two event service, and one of them must be local event service.
func (h *EventsHandler) Handle(stat *DispatcherStat, events ...dispatcher.DispatcherEvent) bool {
func (h *EventsHandler) Handle(stat *DispatcherStat, events ...*dispatcher.DispatcherEvent) bool {
// do some check for safety
if len(events) == 0 {
return false
Expand Down Expand Up @@ -156,7 +156,7 @@ const (
DataGroupNotReusable = 6
)

func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.EventType {
func (h *EventsHandler) GetType(event *dispatcher.DispatcherEvent) dynstream.EventType {
switch event.GetType() {
case commonEvent.TypeResolvedEvent:
return dynstream.EventType{DataGroup: DataGroupResolvedTsOrDML, Property: dynstream.PeriodicSignal}
Expand All @@ -178,19 +178,19 @@ func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.Even
return dynstream.DefaultEventType
}

func (h *EventsHandler) GetSize(event dispatcher.DispatcherEvent) int { return int(event.GetSize()) }
func (h *EventsHandler) GetSize(event *dispatcher.DispatcherEvent) int { return int(event.GetSize()) }

func (h *EventsHandler) IsPaused(event dispatcher.DispatcherEvent) bool { return event.IsPaused() }
func (h *EventsHandler) IsPaused(event *dispatcher.DispatcherEvent) bool { return event.IsPaused() }

func (h *EventsHandler) GetArea(path common.DispatcherID, dest *DispatcherStat) common.GID {
return dest.target.GetChangefeedID().ID()
}

func (h *EventsHandler) GetTimestamp(event dispatcher.DispatcherEvent) dynstream.Timestamp {
func (h *EventsHandler) GetTimestamp(event *dispatcher.DispatcherEvent) dynstream.Timestamp {
return dynstream.Timestamp(event.GetCommitTs())
}

func (h *EventsHandler) OnDrop(event dispatcher.DispatcherEvent) {
func (h *EventsHandler) OnDrop(event *dispatcher.DispatcherEvent) {
if event.GetType() != commonEvent.TypeResolvedEvent {
// It is normal to drop resolved event
log.Info("event dropped",
Expand Down
Loading