From 88591787f8cf9c769ac60900a6a075ed59960be3 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 22 Nov 2024 10:07:10 +0800 Subject: [PATCH] update --- downstreamadapter/dispatcher/dispatcher.go | 2 +- downstreamadapter/dispatcher/helper.go | 7 +++++++ .../eventcollector/event_collector.go | 16 ++++++++-------- downstreamadapter/eventcollector/helper.go | 16 ++++++++-------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 0b11d4ae..f4b412e1 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -237,7 +237,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat // We ensure we only will receive one event when it's ddl event or sync point event // by setting them with different event types in DispatcherEventsHandler.GetType // When we handle events, we don't have any previous events still in sink. -func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) (block bool) { +func (d *Dispatcher) HandleEvents(dispatcherEvents []*DispatcherEvent, wakeCallback func()) (block bool) { // Only return false when all events are resolvedTs Event. block = false // Dispatcher is ready, handle the events diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 94c6ad21..df5c04ea 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -275,6 +275,13 @@ func NewDispatcherEvent(from node.ID, event commonEvent.Event) DispatcherEvent { } } +func NewDispatcherEventTest(from node.ID, event commonEvent.Event) *DispatcherEvent { + return &DispatcherEvent{ + From: from, + Event: event, + } +} + type DispatcherStatusWithID struct { id common.DispatcherID status *heartbeatpb.DispatcherStatus diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6390cb99..c7547c40 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -84,7 +84,7 @@ type EventCollector struct { // ds is the dynamicStream for dispatcher events. // All the events from event service will be sent to ds to handle. // ds will dispatch the events to different dispatchers according to the dispatcherID. - ds dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] + ds dynstream.DynamicStream[common.GID, common.DispatcherID, *dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] coordinatorInfo struct { sync.RWMutex @@ -331,11 +331,11 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, targetMessage *mes case commonEvent.TypeBatchResolvedEvent: for _, e := range event.(*commonEvent.BatchResolvedEvent).Events { c.metricDispatcherReceivedResolvedTsEventCount.Inc() - c.ds.In(e.DispatcherID) <- dispatcher.NewDispatcherEvent(targetMessage.From, e) + c.ds.In(e.DispatcherID) <- dispatcher.NewDispatcherEventTest(targetMessage.From, e) } default: c.metricDispatcherReceivedKVEventCount.Inc() - c.ds.In(event.GetDispatcherID()) <- dispatcher.NewDispatcherEvent(targetMessage.From, event) + c.ds.In(event.GetDispatcherID()) <- dispatcher.NewDispatcherEventTest(targetMessage.From, event) } default: log.Panic("invalid message type", zap.Any("msg", msg)) @@ -417,7 +417,7 @@ func (d *DispatcherStat) reset() { d.waitHandshake.Store(true) } -func (d *DispatcherStat) checkEventSeq(event dispatcher.DispatcherEvent, eventCollector *EventCollector) bool { +func (d *DispatcherStat) checkEventSeq(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) bool { switch event.GetType() { case commonEvent.TypeDMLEvent, commonEvent.TypeDDLEvent, @@ -448,7 +448,7 @@ func (d *DispatcherStat) checkEventSeq(event dispatcher.DispatcherEvent, eventCo } } -func (d *DispatcherStat) shouldIgnoreDataEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) bool { +func (d *DispatcherStat) shouldIgnoreDataEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) bool { if d.eventServiceInfo.serverID != event.From { // TODO: unregister from this invalid event service if it send events for a long time return true @@ -478,7 +478,7 @@ func (d *DispatcherStat) shouldIgnoreDataEvent(event dispatcher.DispatcherEvent, return false } -func (d *DispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) { +func (d *DispatcherStat) handleHandshakeEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) { d.eventServiceInfo.Lock() defer d.eventServiceInfo.Unlock() if event.GetType() != commonEvent.TypeHandshakeEvent { @@ -504,7 +504,7 @@ func (d *DispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent, d.target.SetInitialTableInfo(event.Event.(*commonEvent.HandshakeEvent).TableInfo) } -func (d *DispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) { +func (d *DispatcherStat) handleReadyEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) { d.eventServiceInfo.Lock() defer d.eventServiceInfo.Unlock() if event.GetType() != commonEvent.TypeReadyEvent { @@ -562,7 +562,7 @@ func (d *DispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, even } } -func (d *DispatcherStat) handleNotReusableEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) { +func (d *DispatcherStat) handleNotReusableEvent(event *dispatcher.DispatcherEvent, eventCollector *EventCollector) { d.eventServiceInfo.Lock() defer d.eventServiceInfo.Unlock() if event.GetType() != commonEvent.TypeNotReusableEvent { diff --git a/downstreamadapter/eventcollector/helper.go b/downstreamadapter/eventcollector/helper.go index df45b0ec..1023aa0e 100644 --- a/downstreamadapter/eventcollector/helper.go +++ b/downstreamadapter/eventcollector/helper.go @@ -34,7 +34,7 @@ func (h pathHasher) HashPath(path common.DispatcherID) int { return int((common.GID)(path).FastHash() % (uint64)(h.streamCount)) } -func NewEventDynamicStream(collector *EventCollector) dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] { +func NewEventDynamicStream(collector *EventCollector) dynstream.DynamicStream[common.GID, common.DispatcherID, *dispatcher.DispatcherEvent, *DispatcherStat, *EventsHandler] { option := dynstream.NewOption() option.BatchCount = 128 // option.InputBufferSize = 1000000 / streamCount @@ -70,12 +70,12 @@ type EventsHandler struct { eventCollector *EventCollector } -func (h *EventsHandler) Path(event dispatcher.DispatcherEvent) common.DispatcherID { +func (h *EventsHandler) Path(event *dispatcher.DispatcherEvent) common.DispatcherID { return event.GetDispatcherID() } // Invariant: at any times, we can receive events from at most two event service, and one of them must be local event service. -func (h *EventsHandler) Handle(stat *DispatcherStat, events ...dispatcher.DispatcherEvent) bool { +func (h *EventsHandler) Handle(stat *DispatcherStat, events ...*dispatcher.DispatcherEvent) bool { // do some check for safety if len(events) == 0 { return false @@ -156,7 +156,7 @@ const ( DataGroupNotReusable = 6 ) -func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.EventType { +func (h *EventsHandler) GetType(event *dispatcher.DispatcherEvent) dynstream.EventType { switch event.GetType() { case commonEvent.TypeResolvedEvent: return dynstream.EventType{DataGroup: DataGroupResolvedTsOrDML, Property: dynstream.PeriodicSignal} @@ -178,19 +178,19 @@ func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.Even return dynstream.DefaultEventType } -func (h *EventsHandler) GetSize(event dispatcher.DispatcherEvent) int { return int(event.GetSize()) } +func (h *EventsHandler) GetSize(event *dispatcher.DispatcherEvent) int { return int(event.GetSize()) } -func (h *EventsHandler) IsPaused(event dispatcher.DispatcherEvent) bool { return event.IsPaused() } +func (h *EventsHandler) IsPaused(event *dispatcher.DispatcherEvent) bool { return event.IsPaused() } func (h *EventsHandler) GetArea(path common.DispatcherID, dest *DispatcherStat) common.GID { return dest.target.GetChangefeedID().ID() } -func (h *EventsHandler) GetTimestamp(event dispatcher.DispatcherEvent) dynstream.Timestamp { +func (h *EventsHandler) GetTimestamp(event *dispatcher.DispatcherEvent) dynstream.Timestamp { return dynstream.Timestamp(event.GetCommitTs()) } -func (h *EventsHandler) OnDrop(event dispatcher.DispatcherEvent) { +func (h *EventsHandler) OnDrop(event *dispatcher.DispatcherEvent) { if event.GetType() != commonEvent.TypeResolvedEvent { // It is normal to drop resolved event log.Info("event dropped",