Skip to content

Commit

Permalink
Merge pull request #559 from metafacture/495-objectSleep
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasNx authored Nov 28, 2024
2 parents 05c8dae + 74bc15d commit 4ab2474
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2024 hbz
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.metafacture.flowcontrol;

import org.metafacture.framework.FluxCommand;
import org.metafacture.framework.MetafactureException;
import org.metafacture.framework.ObjectReceiver;
import org.metafacture.framework.annotations.Description;
import org.metafacture.framework.annotations.In;
import org.metafacture.framework.annotations.Out;
import org.metafacture.framework.helpers.DefaultObjectPipe;

import java.util.concurrent.TimeUnit;

/**
* Lets the process sleep for a specific amount of time between objects.
*
* @param <T> object type
* @author Tobias Bülte
*/
@Description("Lets the process sleep for a specific amount of time between objects.")
@In(Object.class)
@Out(Object.class)
@FluxCommand("sleep")
public final class ObjectSleeper<T> extends DefaultObjectPipe<T, ObjectReceiver<T>> {

public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final long DEFAULT_SLEEP_TIME = 1000;

private static final String TIME_UNIT_SUFFIX = "S";

private TimeUnit timeUnit = DEFAULT_TIME_UNIT;
private long sleepTime = DEFAULT_SLEEP_TIME;

/**
* Creates an instance of {@link ObjectSleeper}.
*/
public ObjectSleeper() {
}

/**
* Sets the amount of time for the sleep phase (measured in {@link
* #setTimeUnit time unit}).
*
* @param sleepTime the time to sleep
*/
public void setSleepTime(final int sleepTime) {
// NOTE: ConfigurableClass.convertValue() doesn't support long.
setSleepTime((long) sleepTime);
}

/**
* Sets the amount of time for the sleep phase (measured in {@link
* #setTimeUnit time unit}).
*
* @param sleepTime the time to sleep
*/
public void setSleepTime(final long sleepTime) {
this.sleepTime = sleepTime;
}

/**
* Gets the amount of time for the sleep phase (measured in {@link
* #setTimeUnit time unit}).
*
* @return the time to sleep
*/
public long getSleepTime() {
return sleepTime;
}

/**
* Sets the time unit for the sleep phase. See {@link TimeUnit available
* time units}, case-insensitive, trailing "s" optional.
*
* @param timeUnit the time unit
*/
public void setTimeUnit(final String timeUnit) {
// NOTE: Adds NANOSECONDS and DAYS over Catmandu's supported time units.

final String timeUnitName = timeUnit.toUpperCase();
final String timeUnitSuffix = timeUnitName.endsWith(TIME_UNIT_SUFFIX) ? "" : TIME_UNIT_SUFFIX;

setTimeUnit(TimeUnit.valueOf(timeUnitName + timeUnitSuffix));
}

/**
* Sets the time unit for the sleep phase.
*
* @param timeUnit the time unit
*/
public void setTimeUnit(final TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}

/**
* Gets the time unit for the sleep phase.
*
* @return the time unit
*/
public TimeUnit getTimeUnit() {
return timeUnit;
}

/**
* Sleeps for the specified amount of time.
*/
public void sleep() {
try {
timeUnit.sleep(sleepTime);
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetafactureException(e.getMessage(), e);
}
}

@Override
public void process(final T obj) {
sleep();
getReceiver().process(obj);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ reset-object-batch org.metafacture.flowcontrol.ObjectBatchResetter
defer-stream org.metafacture.flowcontrol.StreamDeferrer
catch-stream-exception org.metafacture.flowcontrol.StreamExceptionCatcher
thread-object-tee org.metafacture.flowcontrol.ObjectThreader
sleep org.metafacture.flowcontrol.ObjectSleeper
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2024 Tobias Bülte, hbz
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.metafacture.flowcontrol;

import org.metafacture.framework.ObjectReceiver;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.function.Consumer;

/**
* Tests for class {@link ObjectSleeper}.
*
* @author Tobias Bülte
*/
public final class ObjectSleeperTest {

private static final int PROCESS_OVERHEAD_MILLISECONDS = 100;

private static final int MILLISECONDS_PER_SECOND = 1_000;
private static final int NANOSECONDS_PER_MILLISECOND = 1_000_000;

@Mock
private ObjectReceiver<String> receiver;

@Before
public void setup() {
MockitoAnnotations.initMocks(this);
}

@Test
public void shouldTestIfClockedTimeExceedsDuration() {
final int sleepTime = 1234;
assertSleep(sleepTime, s -> s.setSleepTime(sleepTime));
}

@Test
public void shouldTestIfClockedTimeExceedsDurationInMilliseconds() {
final int sleepTime = 567;
assertSleep(sleepTime, s -> {
s.setSleepTime(sleepTime);
s.setTimeUnit("MILLISECONDS");
});
}

@Test
public void shouldTestIfClockedTimeExceedsDurationInSeconds() {
final int sleepTime = 1;
assertSleep(sleepTime * MILLISECONDS_PER_SECOND, s -> {
s.setSleepTime(sleepTime);
s.setTimeUnit("SECOND");
});
}

private void assertSleep(final long expectedMillis, final Consumer<ObjectSleeper> consumer) {
final ObjectSleeper<String> objectSleeper = new ObjectSleeper<>();
objectSleeper.setReceiver(receiver);
consumer.accept(objectSleeper);

final long startTime = System.nanoTime();
objectSleeper.process(null);
final long actualMillis = (System.nanoTime() - startTime) / NANOSECONDS_PER_MILLISECOND;

Assert.assertTrue("sleep time too short: " + actualMillis, actualMillis >= expectedMillis);
Assert.assertTrue("sleep time too long: " + actualMillis, actualMillis < expectedMillis + PROCESS_OVERHEAD_MILLISECONDS);
}

}

0 comments on commit 4ab2474

Please sign in to comment.