diff --git a/pkg/agent/poller/client.go b/pkg/agent/poller/client.go index be5da7f38..948ee23ca 100644 --- a/pkg/agent/poller/client.go +++ b/pkg/agent/poller/client.go @@ -1,7 +1,6 @@ package poller import ( - "fmt" "sync" "time" @@ -27,6 +26,7 @@ type PollClient struct { newPollManager newPollExecutorFunc harvesterConfig harvesterConfig mutex sync.RWMutex + initialized bool } type harvesterConfig struct { @@ -74,23 +74,23 @@ func (p *PollClient) Start() error { p.handlers..., ) - poller := p.newPollManager( + p.poller = p.newPollManager( p.interval, withOnStop(p.onClientStop), withHarvester(p.harvesterConfig), ) - p.poller = poller - p.mutex.Unlock() - listenCh := p.listener.Listen() - p.poller.RegisterWatch(eventCh, eventErrorCh) if p.onStreamConnection != nil { p.onStreamConnection() } + p.mutex.Lock() + p.initialized = true + p.mutex.Unlock() + select { case err := <-listenCh: return err @@ -103,11 +103,10 @@ func (p *PollClient) Start() error { func (p *PollClient) Status() error { p.mutex.RLock() defer p.mutex.RUnlock() - if p.poller == nil || p.listener == nil { - return fmt.Errorf("harvester polling client is not ready") - } - if ok := p.poller.Status(); !ok { - return errors.ErrHarvesterConnection + if p.initialized { + if ok := p.poller.Status(); !ok { + return errors.ErrHarvesterConnection + } } return nil diff --git a/pkg/agent/poller/poller.go b/pkg/agent/poller/poller.go index 56743be26..ae6836c5c 100644 --- a/pkg/agent/poller/poller.go +++ b/pkg/agent/poller/poller.go @@ -70,6 +70,20 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e return } + if err := m.harvester.EventCatchUp(m.topicSelfLink, eventChan); err != nil { + m.logger.WithError(err).Error("harvester returned an error when syncing events") + m.onHarvesterErr() + go func() { + m.Stop() + errChan <- err + }() + return + } + + m.lock.Lock() + m.isReady = true + m.lock.Unlock() + go func() { err := m.sync(m.topicSelfLink, eventChan) m.Stop() @@ -79,15 +93,7 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e func (m *pollExecutor) sync(topicSelfLink string, eventChan chan *proto.Event) error { m.logger.Trace("sync events") - if err := m.harvester.EventCatchUp(topicSelfLink, eventChan); err != nil { - m.logger.WithError(err).Error("harvester returned an error when syncing events") - m.onHarvesterErr() - return err - } - m.lock.Lock() - m.isReady = true - m.lock.Unlock() for { select { case <-m.ctx.Done(): @@ -150,5 +156,9 @@ func (m *pollExecutor) Stop() { func (m *pollExecutor) Status() bool { m.lock.RLock() defer m.lock.RUnlock() - return m.ctx.Err() == nil && m.isReady + if m.ctx.Err() != nil { + return false + } + + return m.isReady } diff --git a/pkg/apic/client.go b/pkg/apic/client.go index 1c84268db..9571c185e 100644 --- a/pkg/apic/client.go +++ b/pkg/apic/client.go @@ -754,24 +754,37 @@ func (c *ServiceClient) CreateResource(url string, bts []byte) (*apiv1.ResourceI return ri, err } -// updateORCreateResourceInstance -func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) { - // default to post - url := c.createAPIServerURL(data.GetKindLink()) - method := coreapi.POST +func (c *ServiceClient) getCachedResource(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) { + switch data.Kind { + case management.AccessRequestDefinitionGVK().Kind: + return c.caches.GetAccessRequestDefinitionByName(data.Name) + case management.CredentialRequestDefinitionGVK().Kind: + return c.caches.GetCredentialRequestDefinitionByName(data.Name) + case management.APIServiceInstanceGVK().Kind: + return c.caches.GetAPIServiceInstanceByName(data.Name) + } + return nil, nil +} - // check if the KIND and ID combo have an item in the cache - var existingRI *apiv1.ResourceInstance - var err error +func (c *ServiceClient) addResourceToCache(data *apiv1.ResourceInstance) { switch data.Kind { case management.AccessRequestDefinitionGVK().Kind: - existingRI, err = c.caches.GetAccessRequestDefinitionByName(data.Name) + c.caches.AddAccessRequestDefinition(data) case management.CredentialRequestDefinitionGVK().Kind: - existingRI, err = c.caches.GetCredentialRequestDefinitionByName(data.Name) + c.caches.AddCredentialRequestDefinition(data) case management.APIServiceInstanceGVK().Kind: - existingRI, err = c.caches.GetAPIServiceInstanceByName(data.Name) + c.caches.AddAPIServiceInstance(data) } +} +// updateORCreateResourceInstance +func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) { + // default to post + url := c.createAPIServerURL(data.GetKindLink()) + method := coreapi.POST + + // check if the KIND and ID combo have an item in the cache + existingRI, err := c.getCachedResource(data) updateRI := true updateAgentDetails := true @@ -839,6 +852,10 @@ func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceI } } + + if existingRI == nil { + c.addResourceToCache(newRI) + } return newRI, err }