Skip to content

Commit

Permalink
feat!: modify URN format to follow ODPF standard (#387)
Browse files Browse the repository at this point in the history
* feat: add base plugin

* fix: breaking plugin test

* fix: lint errors

* feat: create a NewURN function

* test: add missing tests

* feat: update all extractors to use odpf urn

* feat(plugins): add BaseExtractor

* fix: breaking test

* fix: improve test coverage

* fix: config not passed properly
  • Loading branch information
StewartJingga authored Aug 2, 2022
1 parent b536051 commit d5ba47c
Show file tree
Hide file tree
Showing 107 changed files with 1,862 additions and 1,772 deletions.
15 changes: 9 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func (r *Agent) Validate(rcp recipe.Recipe) (errs []error) {
if ext, err := r.extractorFactory.Get(rcp.Source.Name); err != nil {
errs = append(errs, err)
} else {
if err = ext.Validate(rcp.Source.Config); err != nil {
if err = ext.Validate(plugins.Config{
URNScope: rcp.Source.Scope,
RawConfig: rcp.Source.Config,
}); err != nil {
errs = append(errs, r.enrichInvalidConfigError(err, rcp.Source.Name, plugins.PluginTypeExtractor))
}
}
Expand All @@ -72,7 +75,7 @@ func (r *Agent) Validate(rcp recipe.Recipe) (errs []error) {
errs = append(errs, err)
continue
}
if err = sink.Validate(s.Config); err != nil {
if err = sink.Validate(plugins.Config{RawConfig: s.Config}); err != nil {
errs = append(errs, r.enrichInvalidConfigError(err, s.Name, plugins.PluginTypeSink))
}
}
Expand All @@ -83,7 +86,7 @@ func (r *Agent) Validate(rcp recipe.Recipe) (errs []error) {
errs = append(errs, err)
continue
}
if err = procc.Validate(p.Config); err != nil {
if err = procc.Validate(plugins.Config{RawConfig: p.Config}); err != nil {
errs = append(errs, r.enrichInvalidConfigError(err, p.Name, plugins.PluginTypeProcessor))
}
}
Expand Down Expand Up @@ -197,7 +200,7 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.PluginRecipe, str
err = errors.Wrapf(err, "could not find extractor \"%s\"", sr.Name)
return
}
if err = extractor.Init(ctx, sr.Config); err != nil {
if err = extractor.Init(ctx, recipeToPluginConfig(sr)); err != nil {
err = errors.Wrapf(err, "could not initiate extractor \"%s\"", sr.Name)
return
}
Expand All @@ -217,7 +220,7 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
if proc, err = r.processorFactory.Get(pr.Name); err != nil {
return errors.Wrapf(err, "could not find processor \"%s\"", pr.Name)
}
if err = proc.Init(ctx, pr.Config); err != nil {
if err = proc.Init(ctx, recipeToPluginConfig(pr)); err != nil {
return errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name)
}

Expand All @@ -240,7 +243,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
if err = sink.Init(ctx, sr.Config); err != nil {
if err = sink.Init(ctx, recipeToPluginConfig(sr)); err != nil {
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}
retryNotification := func(e error, d time.Duration) {
Expand Down
Loading

0 comments on commit d5ba47c

Please sign in to comment.