Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(QTDI-653): Add a sample connector to test @AfterGroup only once. #941

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public MaxBatchSizeParamBuilder(final ParameterMeta root, final String component
final LocalConfiguration configuration) {
this.root = root;
this.layoutType = getLayoutType(root);
this.defaultValue = !isActive(componentSimpleClassName, configuration) ? -1
this.defaultValue = !isActive(componentSimpleClassName, configuration) ? -4
: Integer
.parseInt(ofNullable(configuration.get(componentSimpleClassName + "._maxBatchSize.value"))
.orElseGet(() -> ofNullable(configuration.get("_maxBatchSize.value")).orElse("1000"))
Expand All @@ -55,7 +55,7 @@ private boolean isActive(final String componentSimpleClassName, final LocalConfi
}

public ParameterMeta newBulkParameter() {
return defaultValue <= 0 ? null : new ParameterMeta(new ParameterMeta.Source() {
return defaultValue <= -3 ? null : new ParameterMeta(new ParameterMeta.Source() {

@Override
public String name() {
Expand All @@ -73,7 +73,8 @@ public Class<?> declaringClass() {

{
put("tcomp::ui::defaultvalue::value", String.valueOf(defaultValue));
put("tcomp::validation::min", "1");
put("tcomp::validation::min", "-3");
put("tcomp::ui::hidden", "true");
}
}, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public class AutoChunkProcessor extends org.talend.sdk.component.runtime.manager
public AutoChunkProcessor(final int chunkSize, final Processor processor) {
super(chunkSize, processor);
}

}
16 changes: 13 additions & 3 deletions sample-parent/sample-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven model definition -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><!-- Maven model definition -->
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -61,6 +60,18 @@
<version>1.65.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.talend.sdk.component</groupId>
<artifactId>component-runtime-manager</artifactId>
<version>1.65.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.talend.sdk.component</groupId>
<artifactId>component-runtime-junit</artifactId>
<version>1.65.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -113,7 +124,6 @@
<validateActions>true</validateActions>
<validateDocumentation>true</validateDocumentation>
<validateWording>true</validateWording>
<validateLayout>true</validateLayout>
<validateOptionNames>true</validateOptionNames>
<validateOutputConnection>true</validateOutputConnection>
<validateLocalConfiguration>true</validateLocalConfiguration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Copyright (C) 2006-2024 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.talend.sdk.component.test.connectors.output;

import java.io.Serializable;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.configuration.ui.layout.GridLayout;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Input;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
* This output connector should consume all input record but have
* only one call to @AfterGroup method after all input records are consumed.
*/

@Slf4j
@Version(1)
@Icon(value = Icon.IconType.CUSTOM, custom = "output")
@Processor(name = "WithAfterGroupOnlyOnce")
@Documentation("Consume all input records and should have only 1 call to @AfterGroup.")
public class WithAfterGroupOnlyOnce implements Serializable {

private final RecordBuilderFactory recordBuilderFactory;

private final WithAfterGroupOnlyOnceConfig config;

private int nbConsumedRecords;

private boolean afterGroupCalled;

public WithAfterGroupOnlyOnce(final @Option("configuration") WithAfterGroupOnlyOnceConfig config,
final RecordBuilderFactory recordBuilderFactory) {
this.recordBuilderFactory = recordBuilderFactory;
this.config = config;
}

@PostConstruct
public void init() {
this.nbConsumedRecords = 0;
}

@PreDestroy
public void release() {
if (!this.afterGroupCalled) {
throw new RuntimeException("The @AfterGroup method has not been called.");
}
}

@ElementListener
public void onNext(@Input final Record record) {
this.nbConsumedRecords++;
}

@AfterGroup
public void afterGroup(@Output("REJECT") final OutputEmitter<Record> rejected) {
if (this.afterGroupCalled) {
Record error = this.recordBuilderFactory.newRecordBuilder()
.withString("error",
"The @AfterGroup method has been called more than once.")
.build();
rejected.emit(error);
}

if (this.nbConsumedRecords != this.config.getExpectedNumberOfRecords()) {
Record error = this.recordBuilderFactory.newRecordBuilder()
.withString("error",
String.format("The number of consumed records '%s' is not the expected one %s.",
this.nbConsumedRecords, this.config.getExpectedNumberOfRecords()))
.build();
rejected.emit(error);
}

this.afterGroupCalled = true;
}

@Data
@GridLayout({ @GridLayout.Row({ "expectedNumberOfRecords" }) })
public static class WithAfterGroupOnlyOnceConfig implements Serializable {

@Option
@Documentation("The number of expected record processed when @AfterGroup is called.")
private int expectedNumberOfRecords;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/
@Components(
family = "the_family",
family = "the_family",
categories = "the_category_1")
@Icon(
value = Icon.IconType.CUSTOM,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
WithAfterGroupOnlyOnce._maxBatchSize.value=-2
WithAfterGroupOnlyOnce._maxBatchSize.active=true
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@
# $ python i18n_messages_properties_generator.py

the_family.TheOutput1._displayName = Name: The Output 1
the_family.TheOutput1._documentation = Doc: This is a sample output.
the_family.TheOutput1._documentation = Doc: This is a sample output.
WithAfterGroupOnlyOnceConfig.expectedNumberOfRecords._displayName = <expectedNumberOfRecords>
WithAfterGroupOnlyOnceConfig.expectedNumberOfRecords._placeholder =

the_family.WithAfterGroupOnlyOnce._displayName = WithAfterGroupOnlyOnce
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (C) 2006-2024 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.sdk.component.test.connectors.output;

import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.junit.BaseComponentsHandler;
import org.talend.sdk.component.junit.SimpleFactory;
import org.talend.sdk.component.junit5.Injected;
import org.talend.sdk.component.junit5.WithComponents;
import org.talend.sdk.component.runtime.manager.chain.Job;

@WithComponents("org.talend.sdk.component.test.connectors")
public class OutputTest {

@Injected
protected BaseComponentsHandler componentsHandler;

private final String testStringValue = "test";

private final boolean testBooleanValue = true;

//@Test
void testOutput() {
final int recordSize = 15;
WithAfterGroupOnlyOnce.WithAfterGroupOnlyOnceConfig config = new WithAfterGroupOnlyOnce.WithAfterGroupOnlyOnceConfig();
config.setExpectedNumberOfRecords(recordSize);
String sourceConfig =
SimpleFactory.configurationByExample().forInstance(config).configured().toQueryString();

Record testRecord = componentsHandler
.findService(RecordBuilderFactory.class)
.newRecordBuilder()
.withString("stringValue", testStringValue)
.withBoolean("booleanValue", testBooleanValue)
.build();

List<Record> testRecords = new ArrayList<>();
for (int i = 0; i < recordSize; i++) {
testRecords.add(testRecord);
}
componentsHandler.setInputData(testRecords);

Job
.components()
.component("inputFlow", "test://emitter")
.component("outputComponent", "SampleConnector://WithAfterGroupOnlyOnce?$maxBatchSize=-1&" + sourceConfig)
.connections()
.from("inputFlow")
.to("outputComponent")
.build()
.run();

Assert.assertTrue(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"value": "200"
},
{
"comparison": "LengthEqual",
"comparison": "LengthGreaterThanOrEqual",
"subject": "ResponseJsonBody",
"path": "$.components",
"value": "9"
Expand Down