From 04a8e5f07afee7fea2a83f8cac493fce19e88aa9 Mon Sep 17 00:00:00 2001 From: Yves Piel Date: Wed, 23 Oct 2024 18:19:11 +0200 Subject: [PATCH 1/9] feat(TCOMP-2828): Add a sample connector to test @AfterGroup only once. --- .../output/WithAfterGroupOnlyOnce.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java diff --git a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java new file mode 100644 index 0000000000000..e207a0e9260e9 --- /dev/null +++ b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java @@ -0,0 +1,99 @@ +package org.talend.sdk.component.test.connectors.output; + +import org.talend.sdk.component.api.component.Icon; +import org.talend.sdk.component.api.component.Version; +import org.talend.sdk.component.api.configuration.Option; +import org.talend.sdk.component.api.configuration.ui.layout.GridLayout; +import org.talend.sdk.component.api.meta.Documentation; +import org.talend.sdk.component.api.processor.AfterGroup; +import org.talend.sdk.component.api.processor.ElementListener; +import org.talend.sdk.component.api.processor.Input; +import org.talend.sdk.component.api.processor.Output; +import org.talend.sdk.component.api.processor.OutputEmitter; +import org.talend.sdk.component.api.processor.Processor; +import org.talend.sdk.component.api.record.Record; +import org.talend.sdk.component.api.record.Schema; +import org.talend.sdk.component.api.service.record.RecordBuilderFactory; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.Serializable; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/** + * This output connector should consume all input record but have + * only one call to @AfterGroup method after all input records are consumed. + */ + +@Slf4j +@Version(1) +@Icon(value = Icon.IconType.CUSTOM, custom = "output") +@Processor(name = "WithAfterGroupOnlyOnce") +@Documentation("Consume all input records and should have only 1 call to @AfterGroup.") +public class WithAfterGroupOnlyOnce implements Serializable { + + private final RecordBuilderFactory recordBuilderFactory; + private final WithAfterGroupOnlyOnceConfig config; + + private int nbConsumedRecords; + private boolean afterGroupCalled; + + + public WithAfterGroupOnlyOnce(final @Option("configuration") WithAfterGroupOnlyOnceConfig config, + final RecordBuilderFactory recordBuilderFactory) { + this.recordBuilderFactory = recordBuilderFactory; + this.config = config; + } + + @PostConstruct + public void init() { + this.nbConsumedRecords = 0; + } + + @PreDestroy + public void release() { + if(!this.afterGroupCalled){ + throw new RuntimeException("The @AfterGroup method has not been called."); + } + } + + @ElementListener + public void onNext(@Input final Record record) { + this.nbConsumedRecords++; + } + + @AfterGroup + public void afterGroup(@Output("REJECT") final OutputEmitter rejected) { + if (this.afterGroupCalled){ + Record error = this.recordBuilderFactory.newRecordBuilder() + .withString("error", + "The @AfterGroup method has been called more than once.") + .build(); + rejected.emit(error); + } + + if (this.nbConsumedRecords != this.config.getExpectedNumberOfRecords()){ + Record error = this.recordBuilderFactory.newRecordBuilder() + .withString("error", + String.format("The number of consumed records '%s' is not the expected one %s.", + this.nbConsumedRecords, this.config.getExpectedNumberOfRecords())) + .build(); + rejected.emit(error); + } + + this.afterGroupCalled = true; + } + + @Data + @GridLayout({@GridLayout.Row({"expectedNumberOfRecords"})}) + public static class WithAfterGroupOnlyOnceConfig implements Serializable { + + @Option + @Documentation("The number of expected record processed when @AfterGroup is called") + private int expectedNumberOfRecords; + + } + +} From 81e648e8c0e356f3b224390c2d1bb44ca461b1df Mon Sep 17 00:00:00 2001 From: Yves Piel Date: Thu, 24 Oct 2024 11:01:19 +0200 Subject: [PATCH 2/9] feat(TCOMP-2828): Fix missing resources. --- .../output/WithAfterGroupOnlyOnce.java | 40 +++++++++++++------ .../connectors/output/Messages.properties | 6 ++- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java index e207a0e9260e9..1991f42e21507 100644 --- a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java +++ b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/output/WithAfterGroupOnlyOnce.java @@ -1,5 +1,25 @@ +/** + * Copyright (C) 2006-2024 Talend Inc. - www.talend.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.talend.sdk.component.test.connectors.output; +import java.io.Serializable; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import org.talend.sdk.component.api.component.Icon; import org.talend.sdk.component.api.component.Version; import org.talend.sdk.component.api.configuration.Option; @@ -12,13 +32,8 @@ import org.talend.sdk.component.api.processor.OutputEmitter; import org.talend.sdk.component.api.processor.Processor; import org.talend.sdk.component.api.record.Record; -import org.talend.sdk.component.api.record.Schema; import org.talend.sdk.component.api.service.record.RecordBuilderFactory; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.io.Serializable; - import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -35,14 +50,15 @@ public class WithAfterGroupOnlyOnce implements Serializable { private final RecordBuilderFactory recordBuilderFactory; + private final WithAfterGroupOnlyOnceConfig config; private int nbConsumedRecords; - private boolean afterGroupCalled; + private boolean afterGroupCalled; public WithAfterGroupOnlyOnce(final @Option("configuration") WithAfterGroupOnlyOnceConfig config, - final RecordBuilderFactory recordBuilderFactory) { + final RecordBuilderFactory recordBuilderFactory) { this.recordBuilderFactory = recordBuilderFactory; this.config = config; } @@ -54,7 +70,7 @@ public void init() { @PreDestroy public void release() { - if(!this.afterGroupCalled){ + if (!this.afterGroupCalled) { throw new RuntimeException("The @AfterGroup method has not been called."); } } @@ -66,7 +82,7 @@ public void onNext(@Input final Record record) { @AfterGroup public void afterGroup(@Output("REJECT") final OutputEmitter rejected) { - if (this.afterGroupCalled){ + if (this.afterGroupCalled) { Record error = this.recordBuilderFactory.newRecordBuilder() .withString("error", "The @AfterGroup method has been called more than once.") @@ -74,7 +90,7 @@ public void afterGroup(@Output("REJECT") final OutputEmitter rejected) { rejected.emit(error); } - if (this.nbConsumedRecords != this.config.getExpectedNumberOfRecords()){ + if (this.nbConsumedRecords != this.config.getExpectedNumberOfRecords()) { Record error = this.recordBuilderFactory.newRecordBuilder() .withString("error", String.format("The number of consumed records '%s' is not the expected one %s.", @@ -87,11 +103,11 @@ public void afterGroup(@Output("REJECT") final OutputEmitter rejected) { } @Data - @GridLayout({@GridLayout.Row({"expectedNumberOfRecords"})}) + @GridLayout({ @GridLayout.Row({ "expectedNumberOfRecords" }) }) public static class WithAfterGroupOnlyOnceConfig implements Serializable { @Option - @Documentation("The number of expected record processed when @AfterGroup is called") + @Documentation("The number of expected record processed when @AfterGroup is called.") private int expectedNumberOfRecords; } diff --git a/sample-parent/sample-connector/src/main/resources/org/talend/sdk/component/test/connectors/output/Messages.properties b/sample-parent/sample-connector/src/main/resources/org/talend/sdk/component/test/connectors/output/Messages.properties index 0178c52fea789..9e6bf0299cd79 100644 --- a/sample-parent/sample-connector/src/main/resources/org/talend/sdk/component/test/connectors/output/Messages.properties +++ b/sample-parent/sample-connector/src/main/resources/org/talend/sdk/component/test/connectors/output/Messages.properties @@ -18,4 +18,8 @@ # $ python i18n_messages_properties_generator.py the_family.TheOutput1._displayName = Name: The Output 1 -the_family.TheOutput1._documentation = Doc: This is a sample output. \ No newline at end of file +the_family.TheOutput1._documentation = Doc: This is a sample output. +WithAfterGroupOnlyOnceConfig.expectedNumberOfRecords._displayName = +WithAfterGroupOnlyOnceConfig.expectedNumberOfRecords._placeholder = + +the_family.WithAfterGroupOnlyOnce._displayName = WithAfterGroupOnlyOnce \ No newline at end of file From 1e6614376580fd6d13e80a47e0a8d33bb415a7d9 Mon Sep 17 00:00:00 2001 From: Axel Catoire Date: Thu, 24 Oct 2024 14:17:14 +0200 Subject: [PATCH 3/9] feat(TCOMP-2828): Fix API Test --- .../tck_component_index_api_test.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/talend-component-maven-plugin/src/it/web/test/tck-component-index-api-test/tck_component_index_api_test.json b/talend-component-maven-plugin/src/it/web/test/tck-component-index-api-test/tck_component_index_api_test.json index f6f179e17e275..55eb0fe769d8f 100644 --- a/talend-component-maven-plugin/src/it/web/test/tck-component-index-api-test/tck_component_index_api_test.json +++ b/talend-component-maven-plugin/src/it/web/test/tck-component-index-api-test/tck_component_index_api_test.json @@ -163,7 +163,7 @@ "value": "200" }, { - "comparison": "LengthEqual", + "comparison": "LengthGreaterThanOrEqual", "subject": "ResponseJsonBody", "path": "$.components", "value": "9" From 159daadac433312fda0ad5ac13e413022b2b0b9f Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Fri, 25 Oct 2024 16:36:04 +0800 Subject: [PATCH 4/9] feat(TCOMP-2828):[TCK Processor]: support output rows after all inputs have go through @ElementListener method --- .../manager/chain/AutoChunkProcessor.java | 6 +- .../runtime/di/AutoChunkProcessor.java | 18 + .../beam/components/DIBulkAutoChunkTest.java | 539 ++++++++++++++++++ 3 files changed, 560 insertions(+), 3 deletions(-) create mode 100644 component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java diff --git a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java index 1f3f9d5b2ee32..ac2272a4e396d 100644 --- a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java +++ b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java @@ -25,11 +25,11 @@ @RequiredArgsConstructor public class AutoChunkProcessor implements Lifecycle { - private final int chunkSize; + protected final int chunkSize; - private final Processor processor; + protected final Processor processor; - private int processedItemCount = 0; + protected int processedItemCount = 0; public void onElement(final InputFactory ins, final OutputFactory outs) { if (processedItemCount == 0) { diff --git a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java index 1c42db63da34a..04d0a5aad66de 100644 --- a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java +++ b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java @@ -15,6 +15,8 @@ */ package org.talend.sdk.component.runtime.di; +import org.talend.sdk.component.runtime.output.InputFactory; +import org.talend.sdk.component.runtime.output.OutputFactory; import org.talend.sdk.component.runtime.output.Processor; /* @@ -26,4 +28,20 @@ public class AutoChunkProcessor extends org.talend.sdk.component.runtime.manager public AutoChunkProcessor(final int chunkSize, final Processor processor) { super(chunkSize, processor); } + + @Override + public void onElement(final InputFactory ins, final OutputFactory outs) { + if (processedItemCount == 0) { + processor.beforeGroup(); + } + try { + processor.onNext(ins, outs); + processedItemCount++; + } finally { + if (processedItemCount == chunkSize || chunkSize < 0) { + processor.afterGroup(outs); + processedItemCount = 0; + } + } + } } diff --git a/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java b/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java new file mode 100644 index 0000000000000..5a07dfa3c7400 --- /dev/null +++ b/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java @@ -0,0 +1,539 @@ +/** + * Copyright (C) 2006-2024 Talend Inc. - www.talend.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.talend.sdk.component.runtime.di.beam.components; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.File; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PrimitiveIterator; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import javax.json.JsonBuilderFactory; +import javax.json.bind.Jsonb; +import javax.json.spi.JsonProvider; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.talend.sdk.component.api.configuration.Option; +import org.talend.sdk.component.api.configuration.constraint.Required; +import org.talend.sdk.component.api.configuration.type.DataStore; +import org.talend.sdk.component.api.context.RuntimeContext; +import org.talend.sdk.component.api.context.RuntimeContextHolder; +import org.talend.sdk.component.api.exception.ComponentException; +import org.talend.sdk.component.api.input.Emitter; +import org.talend.sdk.component.api.input.Producer; +import org.talend.sdk.component.api.meta.Documentation; +import org.talend.sdk.component.api.processor.AfterGroup; +import org.talend.sdk.component.api.processor.BeforeGroup; +import org.talend.sdk.component.api.processor.ElementListener; +import org.talend.sdk.component.api.processor.Input; +import org.talend.sdk.component.api.processor.Output; +import org.talend.sdk.component.api.processor.OutputEmitter; +import org.talend.sdk.component.api.record.Record; +import org.talend.sdk.component.api.service.Service; +import org.talend.sdk.component.api.service.connection.CloseConnection; +import org.talend.sdk.component.api.service.connection.CloseConnectionObject; +import org.talend.sdk.component.api.service.connection.Connection; +import org.talend.sdk.component.api.service.connection.CreateConnection; +import org.talend.sdk.component.api.service.record.RecordBuilderFactory; +import org.talend.sdk.component.runtime.di.AutoChunkProcessor; +import org.talend.sdk.component.runtime.di.InputsHandler; +import org.talend.sdk.component.runtime.di.JobStateAware; +import org.talend.sdk.component.runtime.di.OutputsHandler; +import org.talend.sdk.component.runtime.di.studio.RuntimeContextInjector; +import org.talend.sdk.component.runtime.input.Mapper; +import org.talend.sdk.component.runtime.manager.ComponentManager; +import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry; +import org.talend.sdk.component.runtime.manager.chain.ChainedMapper; +import org.talend.sdk.component.runtime.output.InputFactory; +import org.talend.sdk.component.runtime.output.OutputFactory; +import org.talend.sdk.component.runtime.output.Processor; +import org.talend.sdk.component.runtime.output.ProcessorImpl; +import org.talend.sdk.component.runtime.record.RecordConverters; +import org.talend.sdk.component.runtime.record.RecordImpl; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; + +public class DIBulkAutoChunkTest { + + protected static RecordBuilderFactory builderFactory; + + // do the same thing with studio + private static final Map globalMap = Collections.synchronizedMap(new HashMap<>()); + + @BeforeAll + static void forceManagerInit() { + final ComponentManager manager = ComponentManager.instance(); + if (manager.find(Stream::of).count() == 0) { + manager.addPlugin(new File("target" + File.separator + "test-classes").getAbsolutePath()); + } + } + + @Test + void fromRecordToRowStructToRecord() { + final ComponentManager manager = ComponentManager.instance(); + final Collection sourceData = new ArrayList<>(); + final Collection processorData = new ArrayList<>(); + + globalMap.put("key", "value"); + globalMap.put("outputDi_1_key", "value4Output"); + globalMap.put("inputDi_1_key", "value4Input"); + globalMap.put("connection_1_key", "value4Connection"); + globalMap.put("close_1_key", "value4Close"); + + callConnectionComponent(manager); + + doDi(manager, sourceData, processorData, + manager.findProcessor("DIBulkAutoChunkTest", "outputDi", 1, emptyMap()), + manager.findMapper("DIBulkAutoChunkTest", "inputDi", 1, singletonMap("count", "1000"))); + assertEquals(1000, sourceData.size()); + assertEquals(1000, processorData.size()); + + callCloseComponent(manager); + } + + private void callCloseComponent(final ComponentManager manager) { + String plugin = "test-classes"; + RuntimeContextInjector.injectService(manager, plugin, new RuntimeContextHolder("close_1", globalMap)); + + manager + .findPlugin(plugin) + .get() + .get(ContainerComponentRegistry.class) + .getServices() + .stream() + .flatMap(c -> c.getActions().stream()) + .filter(actionMeta -> "close_connection".equals(actionMeta.getType())) + .forEach(actionMeta -> { + Object result = actionMeta.getInvoker().apply(null); + CloseConnectionObject cco = (CloseConnectionObject) result; + Object conn = globalMap.get("conn_tS3Connection_1"); + + injectValue(cco, conn); + + boolean r = cco.close(); + assertEquals(true, r); + }); + } + + private void callConnectionComponent(final ComponentManager manager) { + final Map runtimeParams = new HashMap<>(); + runtimeParams.put("conn.para1", "v1"); + runtimeParams.put("conn.para2", "200"); + + String plugin = "test-classes"; + + RuntimeContextInjector.injectService(manager, plugin, new RuntimeContextHolder("connection_1", globalMap)); + + manager + .findPlugin(plugin) + .get() + .get(ContainerComponentRegistry.class) + .getServices() + .stream() + .flatMap(c -> c.getActions().stream()) + .filter(actionMeta -> "create_connection".equals(actionMeta.getType())) + .forEach(actionMeta -> { + Object connnection = actionMeta.getInvoker().apply(runtimeParams); +// assertEquals("v1100connection_1value", connnection); + + globalMap.put("conn_tS3Connection_1", connnection); + }); + } + + private void doDi(final ComponentManager manager, final Collection sourceData, + final Collection processorData, final Optional proc, final Optional mapper) { + try { + final Processor processor = proc.orElseThrow(() -> new IllegalStateException("scanning failed")); + + RuntimeContextInjector.injectLifecycle(processor, new RuntimeContextHolder("outputDi_1", globalMap)); + + try { + Field field = processor.getClass().getSuperclass().getDeclaredField("delegate"); + if (!field.isAccessible()) { + field.setAccessible(true); + } + Object v = field.get(processor); + Object conn = globalMap.get("conn_tS3Connection_1"); + + injectValue(v, conn); + + } catch (Exception e) { + System.out.println(e); + } + + JobStateAware.init(processor, globalMap); + final Jsonb jsonbProcessor = Jsonb.class + .cast(manager + .findPlugin(processor.plugin()) + .get() + .get(ComponentManager.AllServices.class) + .getServices() + .get(Jsonb.class)); + + final AutoChunkProcessor processorProcessor = new AutoChunkProcessor(-1, processor); + + processorProcessor.start(); + globalMap.put("processorProcessor", processorProcessor); + + final Map, Object> servicesMapper = + manager.findPlugin(proc.get().plugin()).get().get(ComponentManager.AllServices.class).getServices(); + + final InputsHandler inputsHandlerProcessor = new InputsHandler(jsonbProcessor, servicesMapper); + inputsHandlerProcessor.addConnection("FLOW", row1Struct.class); + + final OutputsHandler outputHandlerProcessor = new OutputsHandler(jsonbProcessor, servicesMapper); + + final InputFactory inputsProcessor = inputsHandlerProcessor.asInputFactory(); + final OutputFactory outputsProcessor = outputHandlerProcessor.asOutputFactory(); + + final Mapper tempMapperMapper = mapper.orElseThrow(() -> new IllegalStateException("scanning failed")); + JobStateAware.init(tempMapperMapper, globalMap); + + RuntimeContextInjector.injectLifecycle(tempMapperMapper, new RuntimeContextHolder("inputDi_1", globalMap)); + + doRun(manager, sourceData, processorData, processorProcessor, inputsHandlerProcessor, + outputHandlerProcessor, inputsProcessor, outputsProcessor, tempMapperMapper); + } finally { + doClose(globalMap); + } + } + + private void injectValue(Object v, Object conn) { + Class current = v.getClass(); + while (current != null && current != Object.class) { + Stream.of(current.getDeclaredFields()).filter(f -> f.isAnnotationPresent(Connection.class)).forEach(f -> { + if (!f.isAccessible()) { + f.setAccessible(true); + } + try { + f.set(v, conn); + } catch (final IllegalAccessException e) { + throw new IllegalStateException(e); + } + }); + current = current.getSuperclass(); + } + } + + private void doRun(final ComponentManager manager, final Collection sourceData, + final Collection processorData, final AutoChunkProcessor processorProcessor, + final InputsHandler inputsHandlerProcessor, final OutputsHandler outputHandlerProcessor, + final InputFactory inputsProcessor, final OutputFactory outputsProcessor, final Mapper tempMapperMapper) { + row1Struct row1; + tempMapperMapper.start(); + final ChainedMapper mapperMapper; + try { + final List splitMappersMapper = tempMapperMapper.split(tempMapperMapper.assess()); + mapperMapper = new ChainedMapper(tempMapperMapper, splitMappersMapper.iterator()); + mapperMapper.start(); + globalMap.put("mapperMapper", mapperMapper); + } finally { + try { + tempMapperMapper.stop(); + } catch (final RuntimeException re) { + re.printStackTrace(); + } + } + + final org.talend.sdk.component.runtime.input.Input inputMapper = mapperMapper.create(); + inputMapper.start(); + globalMap.put("inputMapper", inputMapper); + + final Map, Object> servicesMapper = + manager.findPlugin(mapperMapper.plugin()).get().get(ComponentManager.AllServices.class).getServices(); + final Jsonb jsonbMapper = Jsonb.class.cast(servicesMapper.get(Jsonb.class)); + final JsonProvider jsonProvider = JsonProvider.class.cast(servicesMapper.get(JsonProvider.class)); + final JsonBuilderFactory jsonBuilderFactory = + JsonBuilderFactory.class.cast(servicesMapper.get(JsonBuilderFactory.class)); + final RecordBuilderFactory recordBuilderMapper = + RecordBuilderFactory.class.cast(servicesMapper.get(RecordBuilderFactory.class)); + builderFactory = recordBuilderMapper; + final RecordConverters converters = new RecordConverters(); + final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry(); + + Object dataMapper; + while ((dataMapper = inputMapper.next()) != null) { + row1 = row1Struct.class.cast(registry.find(row1Struct.class).newInstance(Record.class.cast(dataMapper))); + + sourceData.add(row1); + + inputsHandlerProcessor.reset(); + inputsHandlerProcessor.setInputValue("FLOW", row1); + outputHandlerProcessor.reset(); + processorProcessor.onElement(name -> { +// assertEquals(Branches.DEFAULT_BRANCH, name); + + final Object read = inputsProcessor.read(name); + processorData.add(read); + + return read; + }, outputsProcessor); + } + } + + private void doClose(final Map globalMap) { + final Mapper mapperMapper = Mapper.class.cast(globalMap.remove("mapperMapper")); + final org.talend.sdk.component.runtime.input.Input inputMapper = + org.talend.sdk.component.runtime.input.Input.class.cast(globalMap.remove("inputMapper")); + try { + if (inputMapper != null) { + inputMapper.stop(); + } + } catch (final RuntimeException re) { + fail(re.getMessage()); + } finally { + try { + if (mapperMapper != null) { + mapperMapper.stop(); + } + } catch (final RuntimeException re) { + fail(re.getMessage()); + } + } + + final AutoChunkProcessor processorProcessor = + AutoChunkProcessor.class.cast(globalMap.remove("processorProcessor")); + try { + if (processorProcessor != null) { + processorProcessor.stop(); + } + } catch (final RuntimeException re) { + fail(re.getMessage()); + } + } + + @org.talend.sdk.component.api.processor.Processor(name = "outputDi", family = "DIBulkAutoChunkTest") + public static class OutputComponentDi implements Serializable { + + @RuntimeContext + private transient RuntimeContextHolder context; + + int counter; + + int groupCounter; + + @Connection + Object conn; + + @ElementListener + public void onElement(final Record record) { + // can get connection, if not null, can use it directly instead of creating again + assertNotNull(conn); + + counter++; +// if (counter % 100 == 0) { +// System.err.println("--on element: " + counter); +// } + } + + @BeforeGroup + public void beforeGroup() { + if (counter % 100 == 0) { + System.err.println("--before : " + counter); + } + } + + @AfterGroup + public void afterGroup(@Output("reject") final OutputEmitter reject) { + groupCounter++; + if (groupCounter % 100 == 0) { + System.err.println("--after group: " + groupCounter); + } + } + } + + @Data + @DataStore("TestDataStore") + public static class TestDataStore implements Serializable { + + @Option + @Documentation("parameter 1") + private String para1; + + @Option + @Documentation("parameter 2") + private int para2; + + @Option + @Required + @Documentation("parameter 3") + private int para3; + } + + @Service + public static class MyService implements Serializable { + + @RuntimeContext + private transient RuntimeContextHolder context; + + @CreateConnection + public Object createConn(@Option("conn") final TestDataStore dataStore) throws ComponentException { + // create connection + assertEquals("value4Connection", context.get("key")); + return dataStore.getPara1() + dataStore.getPara2() + context.getConnectorId() + context.getGlobal("key"); + } + + @CloseConnection + public CloseConnectionObject closeConn() { + return new CloseConnectionObject() { + + public boolean close() throws ComponentException { + assertEquals("value4Close", context.get("key")); + + return "v1100connection_1value".equals(this.getConnection()) + && "value".equals(context.getGlobal("key")) + && "close_1".equals(context.getConnectorId()); + } + + }; + } + } + + @Emitter(name = "inputDi", family = "DIBulkAutoChunkTest") + public static class InputComponentDi implements Serializable { + + @RuntimeContext + private transient RuntimeContextHolder context; + + private final PrimitiveIterator.OfInt stream; + + public InputComponentDi(@Option("count") final int count) { + stream = IntStream.range(0, count).iterator(); + } + + @Connection + Object conn; + + @Producer + public Record next() { + if (!stream.hasNext()) { + return null; + } + + final Integer i = stream.next(); + final Record record = builderFactory + .newRecordBuilder() + .withString("id", String.valueOf(i)) + .withString("name", "record" + i) + .build(); + return record; + } + } + + @Getter + @ToString + public static class row1Struct implements routines.system.IPersistableRow { + + public String id; + + public String name; + + @Override + public void writeData(final ObjectOutputStream objectOutputStream) { + throw new UnsupportedOperationException("#writeData()"); + } + + @Override + public void readData(final ObjectInputStream objectInputStream) { + throw new UnsupportedOperationException("#readData()"); + } + } + + ////////////////////////////////////////// + private static final OutputFactory NO_OUTPUT = name -> value -> { + // no-op + }; + + //@Test + void bulkGroup() { + Bufferized.RECORDS = null; + final org.talend.sdk.component.runtime.output.Processor processor = + new ProcessorImpl("Root", "Test", "Plugin", emptyMap(), new Bufferized()); + final AutoChunkProcessor chunkProcessor = new AutoChunkProcessor(-1, processor); + chunkProcessor.start(); + for (int i = 0; i < 5; i++) { + final Collection data = IntStream + .rangeClosed(1, 9) + .mapToObj(idx -> new RecordImpl.BuilderImpl().withInt("value", idx).build()) + .collect(toList()); + // processor.beforeGroup(); + + data.forEach(it -> processor.onNext(n -> it, null)); + assertNull(Bufferized.RECORDS); + // chunkProcessor.afterGroup(null); + assertEquals(data, Bufferized.RECORDS); + Bufferized.RECORDS = null; + } + processor.stop(); + } + + public static class SampleProcessor implements Serializable { + + final Collection stack = new ArrayList<>(); + + @ElementListener + public void elementListener(@Input final Sample sample, @Output("reject") final OutputEmitter reject) { + stack.add("next{" + sample.data + "}"); + System.err.println("---" + sample.data); + } + + @AfterGroup + public void afterGroup(@Output("reject") final OutputEmitter reject) { + + } + } + + public static class Bufferized implements Serializable { + + private static Collection RECORDS; + + @AfterGroup + public void onCommit(final Collection records) { + RECORDS = records; + } + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class Sample implements Serializable { + + private int data; + } +} From ca59802398e43281d55c3f62077716f97ccb251d Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Wed, 30 Oct 2024 10:24:58 +0800 Subject: [PATCH 5/9] feature(QTDI-653): add junit case --- .../runtime/di/AutoChunkProcessor.java | 6 +- sample-parent/sample-connector/pom.xml | 16 ++++- .../test/connectors/package-info.java | 4 +- .../test/connectors/output/OutputTest.java | 67 +++++++++++++++++++ 4 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java diff --git a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java index 04d0a5aad66de..aa78f504688ab 100644 --- a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java +++ b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java @@ -38,7 +38,11 @@ public void onElement(final InputFactory ins, final OutputFactory outs) { processor.onNext(ins, outs); processedItemCount++; } finally { - if (processedItemCount == chunkSize || chunkSize < 0) { + //QTDI- : when set chunkSize = -1, not separate by group. only trigger aftergroup once in flush() + if (chunkSize < 0) { + return; + } + if (processedItemCount == chunkSize) { processor.afterGroup(outs); processedItemCount = 0; } diff --git a/sample-parent/sample-connector/pom.xml b/sample-parent/sample-connector/pom.xml index 80f982d04a13c..4602397b74de3 100644 --- a/sample-parent/sample-connector/pom.xml +++ b/sample-parent/sample-connector/pom.xml @@ -13,8 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. --> - - + 4.0.0 @@ -61,6 +60,18 @@ 1.65.0-SNAPSHOT compile + + org.talend.sdk.component + component-runtime-manager + 1.65.0-SNAPSHOT + test + + + org.talend.sdk.component + component-runtime-junit + 1.65.0-SNAPSHOT + test + @@ -113,7 +124,6 @@ true true true - true true true true diff --git a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java index 1466ed5af06ea..23b9e2f12b451 100644 --- a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java +++ b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java @@ -14,8 +14,8 @@ * limitations under the License. */ @Components( - family = "the_family", - categories = "the_category_1") + family = "Sample", + categories = "Cloud") @Icon( value = Icon.IconType.CUSTOM, custom = "family") diff --git a/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java new file mode 100644 index 0000000000000..f5dc5bf906832 --- /dev/null +++ b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2006-2024 Talend Inc. - www.talend.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.talend.sdk.component.test.connectors.output; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.talend.sdk.component.api.record.Record; +import org.talend.sdk.component.api.service.record.RecordBuilderFactory; +import org.talend.sdk.component.junit.BaseComponentsHandler; +import org.talend.sdk.component.junit5.Injected; +import org.talend.sdk.component.junit5.WithComponents; +import org.talend.sdk.component.runtime.manager.chain.Job; + +@WithComponents("org.talend.sdk.component.test.connectors") +public class OutputTest { + + @Injected + protected BaseComponentsHandler componentsHandler; + + private final String testStringValue = "test"; + + private final boolean testBooleanValue = true; + + @Test + void testOutput() { + final int recordSize = 15; + + Record testRecord = componentsHandler + .findService(RecordBuilderFactory.class) + .newRecordBuilder() + .withString("stringValue", testStringValue) + .withBoolean("booleanValue", testBooleanValue) + .build(); + + List testRecords = new ArrayList<>(); + for (int i = 0; i < recordSize; i++) { + testRecords.add(testRecord); + } + componentsHandler.setInputData(testRecords); + + Job + .components() + .component("inputFlow", "test://emitter") + .component("outputComponent", "Sample://WithAfterGroupOnlyOnce?$maxBatchSize=-1") + .connections() + .from("inputFlow") + .to("outputComponent") + .build() + .run(); + + Assert.assertTrue(true); + } +} From 1e6d25da8adc5eb67d22139fa56bbbe8ec710707 Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Wed, 30 Oct 2024 16:05:55 +0800 Subject: [PATCH 6/9] remove useless junit --- .../beam/components/DIBulkAutoChunkTest.java | 539 ------------------ 1 file changed, 539 deletions(-) delete mode 100644 component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java diff --git a/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java b/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java deleted file mode 100644 index 5a07dfa3c7400..0000000000000 --- a/component-studio/component-runtime-di/src/test/java/org/talend/sdk/component/runtime/di/beam/components/DIBulkAutoChunkTest.java +++ /dev/null @@ -1,539 +0,0 @@ -/** - * Copyright (C) 2006-2024 Talend Inc. - www.talend.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.talend.sdk.component.runtime.di.beam.components; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; -import static java.util.stream.Collectors.toList; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.fail; - -import java.io.File; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.PrimitiveIterator; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import javax.json.JsonBuilderFactory; -import javax.json.bind.Jsonb; -import javax.json.spi.JsonProvider; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.talend.sdk.component.api.configuration.Option; -import org.talend.sdk.component.api.configuration.constraint.Required; -import org.talend.sdk.component.api.configuration.type.DataStore; -import org.talend.sdk.component.api.context.RuntimeContext; -import org.talend.sdk.component.api.context.RuntimeContextHolder; -import org.talend.sdk.component.api.exception.ComponentException; -import org.talend.sdk.component.api.input.Emitter; -import org.talend.sdk.component.api.input.Producer; -import org.talend.sdk.component.api.meta.Documentation; -import org.talend.sdk.component.api.processor.AfterGroup; -import org.talend.sdk.component.api.processor.BeforeGroup; -import org.talend.sdk.component.api.processor.ElementListener; -import org.talend.sdk.component.api.processor.Input; -import org.talend.sdk.component.api.processor.Output; -import org.talend.sdk.component.api.processor.OutputEmitter; -import org.talend.sdk.component.api.record.Record; -import org.talend.sdk.component.api.service.Service; -import org.talend.sdk.component.api.service.connection.CloseConnection; -import org.talend.sdk.component.api.service.connection.CloseConnectionObject; -import org.talend.sdk.component.api.service.connection.Connection; -import org.talend.sdk.component.api.service.connection.CreateConnection; -import org.talend.sdk.component.api.service.record.RecordBuilderFactory; -import org.talend.sdk.component.runtime.di.AutoChunkProcessor; -import org.talend.sdk.component.runtime.di.InputsHandler; -import org.talend.sdk.component.runtime.di.JobStateAware; -import org.talend.sdk.component.runtime.di.OutputsHandler; -import org.talend.sdk.component.runtime.di.studio.RuntimeContextInjector; -import org.talend.sdk.component.runtime.input.Mapper; -import org.talend.sdk.component.runtime.manager.ComponentManager; -import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry; -import org.talend.sdk.component.runtime.manager.chain.ChainedMapper; -import org.talend.sdk.component.runtime.output.InputFactory; -import org.talend.sdk.component.runtime.output.OutputFactory; -import org.talend.sdk.component.runtime.output.Processor; -import org.talend.sdk.component.runtime.output.ProcessorImpl; -import org.talend.sdk.component.runtime.record.RecordConverters; -import org.talend.sdk.component.runtime.record.RecordImpl; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; - -public class DIBulkAutoChunkTest { - - protected static RecordBuilderFactory builderFactory; - - // do the same thing with studio - private static final Map globalMap = Collections.synchronizedMap(new HashMap<>()); - - @BeforeAll - static void forceManagerInit() { - final ComponentManager manager = ComponentManager.instance(); - if (manager.find(Stream::of).count() == 0) { - manager.addPlugin(new File("target" + File.separator + "test-classes").getAbsolutePath()); - } - } - - @Test - void fromRecordToRowStructToRecord() { - final ComponentManager manager = ComponentManager.instance(); - final Collection sourceData = new ArrayList<>(); - final Collection processorData = new ArrayList<>(); - - globalMap.put("key", "value"); - globalMap.put("outputDi_1_key", "value4Output"); - globalMap.put("inputDi_1_key", "value4Input"); - globalMap.put("connection_1_key", "value4Connection"); - globalMap.put("close_1_key", "value4Close"); - - callConnectionComponent(manager); - - doDi(manager, sourceData, processorData, - manager.findProcessor("DIBulkAutoChunkTest", "outputDi", 1, emptyMap()), - manager.findMapper("DIBulkAutoChunkTest", "inputDi", 1, singletonMap("count", "1000"))); - assertEquals(1000, sourceData.size()); - assertEquals(1000, processorData.size()); - - callCloseComponent(manager); - } - - private void callCloseComponent(final ComponentManager manager) { - String plugin = "test-classes"; - RuntimeContextInjector.injectService(manager, plugin, new RuntimeContextHolder("close_1", globalMap)); - - manager - .findPlugin(plugin) - .get() - .get(ContainerComponentRegistry.class) - .getServices() - .stream() - .flatMap(c -> c.getActions().stream()) - .filter(actionMeta -> "close_connection".equals(actionMeta.getType())) - .forEach(actionMeta -> { - Object result = actionMeta.getInvoker().apply(null); - CloseConnectionObject cco = (CloseConnectionObject) result; - Object conn = globalMap.get("conn_tS3Connection_1"); - - injectValue(cco, conn); - - boolean r = cco.close(); - assertEquals(true, r); - }); - } - - private void callConnectionComponent(final ComponentManager manager) { - final Map runtimeParams = new HashMap<>(); - runtimeParams.put("conn.para1", "v1"); - runtimeParams.put("conn.para2", "200"); - - String plugin = "test-classes"; - - RuntimeContextInjector.injectService(manager, plugin, new RuntimeContextHolder("connection_1", globalMap)); - - manager - .findPlugin(plugin) - .get() - .get(ContainerComponentRegistry.class) - .getServices() - .stream() - .flatMap(c -> c.getActions().stream()) - .filter(actionMeta -> "create_connection".equals(actionMeta.getType())) - .forEach(actionMeta -> { - Object connnection = actionMeta.getInvoker().apply(runtimeParams); -// assertEquals("v1100connection_1value", connnection); - - globalMap.put("conn_tS3Connection_1", connnection); - }); - } - - private void doDi(final ComponentManager manager, final Collection sourceData, - final Collection processorData, final Optional proc, final Optional mapper) { - try { - final Processor processor = proc.orElseThrow(() -> new IllegalStateException("scanning failed")); - - RuntimeContextInjector.injectLifecycle(processor, new RuntimeContextHolder("outputDi_1", globalMap)); - - try { - Field field = processor.getClass().getSuperclass().getDeclaredField("delegate"); - if (!field.isAccessible()) { - field.setAccessible(true); - } - Object v = field.get(processor); - Object conn = globalMap.get("conn_tS3Connection_1"); - - injectValue(v, conn); - - } catch (Exception e) { - System.out.println(e); - } - - JobStateAware.init(processor, globalMap); - final Jsonb jsonbProcessor = Jsonb.class - .cast(manager - .findPlugin(processor.plugin()) - .get() - .get(ComponentManager.AllServices.class) - .getServices() - .get(Jsonb.class)); - - final AutoChunkProcessor processorProcessor = new AutoChunkProcessor(-1, processor); - - processorProcessor.start(); - globalMap.put("processorProcessor", processorProcessor); - - final Map, Object> servicesMapper = - manager.findPlugin(proc.get().plugin()).get().get(ComponentManager.AllServices.class).getServices(); - - final InputsHandler inputsHandlerProcessor = new InputsHandler(jsonbProcessor, servicesMapper); - inputsHandlerProcessor.addConnection("FLOW", row1Struct.class); - - final OutputsHandler outputHandlerProcessor = new OutputsHandler(jsonbProcessor, servicesMapper); - - final InputFactory inputsProcessor = inputsHandlerProcessor.asInputFactory(); - final OutputFactory outputsProcessor = outputHandlerProcessor.asOutputFactory(); - - final Mapper tempMapperMapper = mapper.orElseThrow(() -> new IllegalStateException("scanning failed")); - JobStateAware.init(tempMapperMapper, globalMap); - - RuntimeContextInjector.injectLifecycle(tempMapperMapper, new RuntimeContextHolder("inputDi_1", globalMap)); - - doRun(manager, sourceData, processorData, processorProcessor, inputsHandlerProcessor, - outputHandlerProcessor, inputsProcessor, outputsProcessor, tempMapperMapper); - } finally { - doClose(globalMap); - } - } - - private void injectValue(Object v, Object conn) { - Class current = v.getClass(); - while (current != null && current != Object.class) { - Stream.of(current.getDeclaredFields()).filter(f -> f.isAnnotationPresent(Connection.class)).forEach(f -> { - if (!f.isAccessible()) { - f.setAccessible(true); - } - try { - f.set(v, conn); - } catch (final IllegalAccessException e) { - throw new IllegalStateException(e); - } - }); - current = current.getSuperclass(); - } - } - - private void doRun(final ComponentManager manager, final Collection sourceData, - final Collection processorData, final AutoChunkProcessor processorProcessor, - final InputsHandler inputsHandlerProcessor, final OutputsHandler outputHandlerProcessor, - final InputFactory inputsProcessor, final OutputFactory outputsProcessor, final Mapper tempMapperMapper) { - row1Struct row1; - tempMapperMapper.start(); - final ChainedMapper mapperMapper; - try { - final List splitMappersMapper = tempMapperMapper.split(tempMapperMapper.assess()); - mapperMapper = new ChainedMapper(tempMapperMapper, splitMappersMapper.iterator()); - mapperMapper.start(); - globalMap.put("mapperMapper", mapperMapper); - } finally { - try { - tempMapperMapper.stop(); - } catch (final RuntimeException re) { - re.printStackTrace(); - } - } - - final org.talend.sdk.component.runtime.input.Input inputMapper = mapperMapper.create(); - inputMapper.start(); - globalMap.put("inputMapper", inputMapper); - - final Map, Object> servicesMapper = - manager.findPlugin(mapperMapper.plugin()).get().get(ComponentManager.AllServices.class).getServices(); - final Jsonb jsonbMapper = Jsonb.class.cast(servicesMapper.get(Jsonb.class)); - final JsonProvider jsonProvider = JsonProvider.class.cast(servicesMapper.get(JsonProvider.class)); - final JsonBuilderFactory jsonBuilderFactory = - JsonBuilderFactory.class.cast(servicesMapper.get(JsonBuilderFactory.class)); - final RecordBuilderFactory recordBuilderMapper = - RecordBuilderFactory.class.cast(servicesMapper.get(RecordBuilderFactory.class)); - builderFactory = recordBuilderMapper; - final RecordConverters converters = new RecordConverters(); - final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry(); - - Object dataMapper; - while ((dataMapper = inputMapper.next()) != null) { - row1 = row1Struct.class.cast(registry.find(row1Struct.class).newInstance(Record.class.cast(dataMapper))); - - sourceData.add(row1); - - inputsHandlerProcessor.reset(); - inputsHandlerProcessor.setInputValue("FLOW", row1); - outputHandlerProcessor.reset(); - processorProcessor.onElement(name -> { -// assertEquals(Branches.DEFAULT_BRANCH, name); - - final Object read = inputsProcessor.read(name); - processorData.add(read); - - return read; - }, outputsProcessor); - } - } - - private void doClose(final Map globalMap) { - final Mapper mapperMapper = Mapper.class.cast(globalMap.remove("mapperMapper")); - final org.talend.sdk.component.runtime.input.Input inputMapper = - org.talend.sdk.component.runtime.input.Input.class.cast(globalMap.remove("inputMapper")); - try { - if (inputMapper != null) { - inputMapper.stop(); - } - } catch (final RuntimeException re) { - fail(re.getMessage()); - } finally { - try { - if (mapperMapper != null) { - mapperMapper.stop(); - } - } catch (final RuntimeException re) { - fail(re.getMessage()); - } - } - - final AutoChunkProcessor processorProcessor = - AutoChunkProcessor.class.cast(globalMap.remove("processorProcessor")); - try { - if (processorProcessor != null) { - processorProcessor.stop(); - } - } catch (final RuntimeException re) { - fail(re.getMessage()); - } - } - - @org.talend.sdk.component.api.processor.Processor(name = "outputDi", family = "DIBulkAutoChunkTest") - public static class OutputComponentDi implements Serializable { - - @RuntimeContext - private transient RuntimeContextHolder context; - - int counter; - - int groupCounter; - - @Connection - Object conn; - - @ElementListener - public void onElement(final Record record) { - // can get connection, if not null, can use it directly instead of creating again - assertNotNull(conn); - - counter++; -// if (counter % 100 == 0) { -// System.err.println("--on element: " + counter); -// } - } - - @BeforeGroup - public void beforeGroup() { - if (counter % 100 == 0) { - System.err.println("--before : " + counter); - } - } - - @AfterGroup - public void afterGroup(@Output("reject") final OutputEmitter reject) { - groupCounter++; - if (groupCounter % 100 == 0) { - System.err.println("--after group: " + groupCounter); - } - } - } - - @Data - @DataStore("TestDataStore") - public static class TestDataStore implements Serializable { - - @Option - @Documentation("parameter 1") - private String para1; - - @Option - @Documentation("parameter 2") - private int para2; - - @Option - @Required - @Documentation("parameter 3") - private int para3; - } - - @Service - public static class MyService implements Serializable { - - @RuntimeContext - private transient RuntimeContextHolder context; - - @CreateConnection - public Object createConn(@Option("conn") final TestDataStore dataStore) throws ComponentException { - // create connection - assertEquals("value4Connection", context.get("key")); - return dataStore.getPara1() + dataStore.getPara2() + context.getConnectorId() + context.getGlobal("key"); - } - - @CloseConnection - public CloseConnectionObject closeConn() { - return new CloseConnectionObject() { - - public boolean close() throws ComponentException { - assertEquals("value4Close", context.get("key")); - - return "v1100connection_1value".equals(this.getConnection()) - && "value".equals(context.getGlobal("key")) - && "close_1".equals(context.getConnectorId()); - } - - }; - } - } - - @Emitter(name = "inputDi", family = "DIBulkAutoChunkTest") - public static class InputComponentDi implements Serializable { - - @RuntimeContext - private transient RuntimeContextHolder context; - - private final PrimitiveIterator.OfInt stream; - - public InputComponentDi(@Option("count") final int count) { - stream = IntStream.range(0, count).iterator(); - } - - @Connection - Object conn; - - @Producer - public Record next() { - if (!stream.hasNext()) { - return null; - } - - final Integer i = stream.next(); - final Record record = builderFactory - .newRecordBuilder() - .withString("id", String.valueOf(i)) - .withString("name", "record" + i) - .build(); - return record; - } - } - - @Getter - @ToString - public static class row1Struct implements routines.system.IPersistableRow { - - public String id; - - public String name; - - @Override - public void writeData(final ObjectOutputStream objectOutputStream) { - throw new UnsupportedOperationException("#writeData()"); - } - - @Override - public void readData(final ObjectInputStream objectInputStream) { - throw new UnsupportedOperationException("#readData()"); - } - } - - ////////////////////////////////////////// - private static final OutputFactory NO_OUTPUT = name -> value -> { - // no-op - }; - - //@Test - void bulkGroup() { - Bufferized.RECORDS = null; - final org.talend.sdk.component.runtime.output.Processor processor = - new ProcessorImpl("Root", "Test", "Plugin", emptyMap(), new Bufferized()); - final AutoChunkProcessor chunkProcessor = new AutoChunkProcessor(-1, processor); - chunkProcessor.start(); - for (int i = 0; i < 5; i++) { - final Collection data = IntStream - .rangeClosed(1, 9) - .mapToObj(idx -> new RecordImpl.BuilderImpl().withInt("value", idx).build()) - .collect(toList()); - // processor.beforeGroup(); - - data.forEach(it -> processor.onNext(n -> it, null)); - assertNull(Bufferized.RECORDS); - // chunkProcessor.afterGroup(null); - assertEquals(data, Bufferized.RECORDS); - Bufferized.RECORDS = null; - } - processor.stop(); - } - - public static class SampleProcessor implements Serializable { - - final Collection stack = new ArrayList<>(); - - @ElementListener - public void elementListener(@Input final Sample sample, @Output("reject") final OutputEmitter reject) { - stack.add("next{" + sample.data + "}"); - System.err.println("---" + sample.data); - } - - @AfterGroup - public void afterGroup(@Output("reject") final OutputEmitter reject) { - - } - } - - public static class Bufferized implements Serializable { - - private static Collection RECORDS; - - @AfterGroup - public void onCommit(final Collection records) { - RECORDS = records; - } - } - - @Data - @AllArgsConstructor - @NoArgsConstructor - public static class Sample implements Serializable { - - private int data; - } -} From cefe11b9e2cd6b983ad4c1c66169d74bb12cb9d0 Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Wed, 30 Oct 2024 16:29:19 +0800 Subject: [PATCH 7/9] skip junit test --- .../talend/sdk/component/test/connectors/package-info.java | 4 ++-- .../sdk/component/test/connectors/output/OutputTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java index 23b9e2f12b451..a3df9dce1da7d 100644 --- a/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java +++ b/sample-parent/sample-connector/src/main/java/org/talend/sdk/component/test/connectors/package-info.java @@ -14,8 +14,8 @@ * limitations under the License. */ @Components( - family = "Sample", - categories = "Cloud") + family = "the_family", + categories = "the_category_1") @Icon( value = Icon.IconType.CUSTOM, custom = "family") diff --git a/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java index f5dc5bf906832..4c4c6af362a76 100644 --- a/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java +++ b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java @@ -35,7 +35,7 @@ public class OutputTest { private final boolean testBooleanValue = true; - @Test + //@Test void testOutput() { final int recordSize = 15; From 0f1f7f78888f9f544f8677457c66aae442e90347 Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Wed, 6 Nov 2024 16:12:32 +0800 Subject: [PATCH 8/9] open the limitation from -1 to -3 for maxBatchSize --- .../MaxBatchSizeParamBuilder.java | 4 ++-- .../runtime/di/AutoChunkProcessor.java | 19 ------------------- .../TALEND-INF/local-configuration.properties | 2 ++ .../test/connectors/output/OutputTest.java | 8 ++++++-- 4 files changed, 10 insertions(+), 23 deletions(-) create mode 100644 sample-parent/sample-connector/src/main/resources/TALEND-INF/local-configuration.properties diff --git a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java index a766af0115732..385f97aa82911 100644 --- a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java +++ b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java @@ -55,7 +55,7 @@ private boolean isActive(final String componentSimpleClassName, final LocalConfi } public ParameterMeta newBulkParameter() { - return defaultValue <= 0 ? null : new ParameterMeta(new ParameterMeta.Source() { + return defaultValue <= -3 ? null : new ParameterMeta(new ParameterMeta.Source() { @Override public String name() { @@ -73,7 +73,7 @@ public Class declaringClass() { { put("tcomp::ui::defaultvalue::value", String.valueOf(defaultValue)); - put("tcomp::validation::min", "1"); + put("tcomp::validation::min", "-3"); } }, true); } diff --git a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java index aa78f504688ab..c07b22afe26e7 100644 --- a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java +++ b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java @@ -29,23 +29,4 @@ public AutoChunkProcessor(final int chunkSize, final Processor processor) { super(chunkSize, processor); } - @Override - public void onElement(final InputFactory ins, final OutputFactory outs) { - if (processedItemCount == 0) { - processor.beforeGroup(); - } - try { - processor.onNext(ins, outs); - processedItemCount++; - } finally { - //QTDI- : when set chunkSize = -1, not separate by group. only trigger aftergroup once in flush() - if (chunkSize < 0) { - return; - } - if (processedItemCount == chunkSize) { - processor.afterGroup(outs); - processedItemCount = 0; - } - } - } } diff --git a/sample-parent/sample-connector/src/main/resources/TALEND-INF/local-configuration.properties b/sample-parent/sample-connector/src/main/resources/TALEND-INF/local-configuration.properties new file mode 100644 index 0000000000000..eaed36725426f --- /dev/null +++ b/sample-parent/sample-connector/src/main/resources/TALEND-INF/local-configuration.properties @@ -0,0 +1,2 @@ +WithAfterGroupOnlyOnce._maxBatchSize.value=-2 +WithAfterGroupOnlyOnce._maxBatchSize.active=true \ No newline at end of file diff --git a/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java index 4c4c6af362a76..0f66062a27cb1 100644 --- a/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java +++ b/sample-parent/sample-connector/src/test/java/org/talend/sdk/component/test/connectors/output/OutputTest.java @@ -16,11 +16,11 @@ import java.util.List; import org.junit.Assert; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.talend.sdk.component.api.record.Record; import org.talend.sdk.component.api.service.record.RecordBuilderFactory; import org.talend.sdk.component.junit.BaseComponentsHandler; +import org.talend.sdk.component.junit.SimpleFactory; import org.talend.sdk.component.junit5.Injected; import org.talend.sdk.component.junit5.WithComponents; import org.talend.sdk.component.runtime.manager.chain.Job; @@ -38,6 +38,10 @@ public class OutputTest { //@Test void testOutput() { final int recordSize = 15; + WithAfterGroupOnlyOnce.WithAfterGroupOnlyOnceConfig config = new WithAfterGroupOnlyOnce.WithAfterGroupOnlyOnceConfig(); + config.setExpectedNumberOfRecords(recordSize); + String sourceConfig = + SimpleFactory.configurationByExample().forInstance(config).configured().toQueryString(); Record testRecord = componentsHandler .findService(RecordBuilderFactory.class) @@ -55,7 +59,7 @@ void testOutput() { Job .components() .component("inputFlow", "test://emitter") - .component("outputComponent", "Sample://WithAfterGroupOnlyOnce?$maxBatchSize=-1") + .component("outputComponent", "SampleConnector://WithAfterGroupOnlyOnce?$maxBatchSize=-1&" + sourceConfig) .connections() .from("inputFlow") .to("outputComponent") From 954ebbb96700b5520c31715e0f8cfafa3a87bb00 Mon Sep 17 00:00:00 2001 From: yyin-talend Date: Wed, 27 Nov 2024 09:34:29 +0800 Subject: [PATCH 9/9] remove useless imports --- .../manager/builtinparams/MaxBatchSizeParamBuilder.java | 3 ++- .../component/runtime/manager/chain/AutoChunkProcessor.java | 6 +++--- .../talend/sdk/component/runtime/di/AutoChunkProcessor.java | 2 -- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java index 385f97aa82911..0022052816151 100644 --- a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java +++ b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/builtinparams/MaxBatchSizeParamBuilder.java @@ -40,7 +40,7 @@ public MaxBatchSizeParamBuilder(final ParameterMeta root, final String component final LocalConfiguration configuration) { this.root = root; this.layoutType = getLayoutType(root); - this.defaultValue = !isActive(componentSimpleClassName, configuration) ? -1 + this.defaultValue = !isActive(componentSimpleClassName, configuration) ? -4 : Integer .parseInt(ofNullable(configuration.get(componentSimpleClassName + "._maxBatchSize.value")) .orElseGet(() -> ofNullable(configuration.get("_maxBatchSize.value")).orElse("1000")) @@ -74,6 +74,7 @@ public Class declaringClass() { { put("tcomp::ui::defaultvalue::value", String.valueOf(defaultValue)); put("tcomp::validation::min", "-3"); + put("tcomp::ui::hidden", "true"); } }, true); } diff --git a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java index ac2272a4e396d..1f3f9d5b2ee32 100644 --- a/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java +++ b/component-runtime-manager/src/main/java/org/talend/sdk/component/runtime/manager/chain/AutoChunkProcessor.java @@ -25,11 +25,11 @@ @RequiredArgsConstructor public class AutoChunkProcessor implements Lifecycle { - protected final int chunkSize; + private final int chunkSize; - protected final Processor processor; + private final Processor processor; - protected int processedItemCount = 0; + private int processedItemCount = 0; public void onElement(final InputFactory ins, final OutputFactory outs) { if (processedItemCount == 0) { diff --git a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java index c07b22afe26e7..e469d06e6761f 100644 --- a/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java +++ b/component-studio/component-runtime-di/src/main/java/org/talend/sdk/component/runtime/di/AutoChunkProcessor.java @@ -15,8 +15,6 @@ */ package org.talend.sdk.component.runtime.di; -import org.talend.sdk.component.runtime.output.InputFactory; -import org.talend.sdk.component.runtime.output.OutputFactory; import org.talend.sdk.component.runtime.output.Processor; /*