Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spring-batch-dynamic-composite #42

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spring-batch-dynamic-composite/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/target/
.classpath
.project
.settings
19 changes: 19 additions & 0 deletions spring-batch-dynamic-composite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# spring-batch-dynamic-composite

This project is all about spring-batch' composite processor and writer. Currently, spring-batch' built-in composite processor and writer are not dynamic. Meaning, one set them in design time (and coding) and it cannot be changed during runtime. But there are cases that this is needed.

Without loss of generality, we say here "processor" but mean "reader" and "writer" as well.

The idea is to have the ability to replace a processor(s) at runtime. For example, in case of multiple processors (AKA composite-processor), to have the option to add/remove/replace/change-order of processors.

In the implementation, there is `DynamicCompositeItemProcessor` (which is a real `ItemProcessor`) that uses a manager to read the list of processors bean-names from the DB. Thus, the processors list can be modified and reloaded.

As mentioned, same for reader as well as for writer.

## why do we need this?

There are cases that processors are used as "filters", and it may occur that the business (the client) may change the requirements (yes, it is very annoying) and ask to switch among filters (change the priority).

Other use case is having multiple readers, reading the data from different data warehouses, and again - the client changes the warehouse from time to time (integration phase), and I do not want my app to be restarted each and every time.

There are many other use cases, of course.
77 changes: 77 additions & 0 deletions spring-batch-dynamic-composite/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>org.springframewor.batch</groupId>
<artifactId>spring-batch-dynamic-composite</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>


<properties>
<spring.version>4.2.3.RELEASE</spring.version>
<spring-batch.version>3.0.5.RELEASE</spring-batch.version>
</properties>

<dependencies>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring-batch.version}</version>
</dependency>


<!-- we use the DB for the "persistent remember me -->
<!-- +++++++++++++++ DB: +++++++++++++++ -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.16</version>
</dependency>

</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<version>2.5.1</version>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<!-- attach the source to deploymant (.m2), so it can be seen within user-app upon debugging -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.ohadr.spring_batch_dynamic_composite.core;

public enum BatchBeanTypeEnum
{
READER, PROCESSOR, WRITER
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.ohadr.spring_batch_dynamic_composite.core;

import java.io.Serializable;
import org.springframework.batch.core.Entity;

public class CompositeBatchBeanEntity
extends Entity
implements Comparable<CompositeBatchBeanEntity>, Serializable
{
private static final long serialVersionUID = 6582817624632934459L;

private String name;

private Integer priority;

private String taskName;

private BatchBeanTypeEnum batchBeanType;


public CompositeBatchBeanEntity(Long id,
String name, Integer priority, String taskName, BatchBeanTypeEnum batchBeanType)
{
super(id);
this.name = name;
this.priority = priority;
this.taskName = taskName;
this.batchBeanType = batchBeanType;
}

public String getName()
{
return name;
}

public void setName(String name)
{
this.name = name;
}

public Integer getPriority()
{
return priority;
}

public void setPriority(Integer priority)
{
this.priority = priority;
}

public String getTaskName()
{
return taskName;
}

public void setTaskName(String taskName)
{
this.taskName = taskName;
}

public BatchBeanTypeEnum getBatchBeanType()
{
return batchBeanType;
}

public void setBatchBeanType(BatchBeanTypeEnum batchBeanType)
{
this.batchBeanType = batchBeanType;
}

@Override
public int compareTo(CompositeBatchBeanEntity compositeProcessorEntity)
{
// descending order
return compositeProcessorEntity.getPriority() - this.getPriority();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ohadr.spring_batch_dynamic_composite.core;

import java.util.List;
import java.util.Set;

public interface CompositeBatchBeanManager
{

List<CompositeBatchBeanEntity> getBatchBeanList(String task, BatchBeanTypeEnum batchBeanType);

Set<String> getAllTaskNames();

Set<String> getAllValuesOfBeanTypes();

void deleteBatchBean(Long compositeBatchBeanId);

CompositeBatchBeanEntity getBatchBean(Long compositeBatchBeanId);

void addBatchBean(CompositeBatchBeanEntity processor);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright (c) 2016, William Hill Online. All rights reserved
*/
package com.ohadr.spring_batch_dynamic_composite.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
//import org.springframework.cache.annotation.Cacheable;
import org.springframework.util.StringUtils;

import com.ohadr.spring_batch_dynamic_composite.repository.CompositeBatchBeanDao;


@Component
public class CompositeBatchBeanManagerImpl implements CompositeBatchBeanManager
{
private static Logger log = Logger.getLogger(CompositeBatchBeanManagerImpl.class);
/**
* UID for CompositeBatchBeanManager
*/
public final static String UID = "compositeBatchBeanManager";

@Autowired
private CompositeBatchBeanDao compositeBatchBeanDao;



@Override
// @Cacheable(value = "compositeProcessorCache", key = "#key1+#batchBeanType+#key2")
public List<CompositeBatchBeanEntity> getBatchBeanList(String taskName, BatchBeanTypeEnum batchBeanType)
{
if (StringUtils.isEmpty(taskName))
{
throw new IllegalArgumentException("taskName is null.");
}

if (batchBeanType == null)
{
throw new IllegalArgumentException("batchBeanType is null.");
}

List<CompositeBatchBeanEntity> batchBeanEntities = compositeBatchBeanDao.getCompositeBatchBeans(taskName, batchBeanType);

if (batchBeanEntities == null)
{
return new ArrayList<>();
}

Collections.sort(batchBeanEntities);

log.info("beans list:" + batchBeanEntities);

return batchBeanEntities;
}


@Override
public void addBatchBean(CompositeBatchBeanEntity processor)
{
if (processor == null)
{
throw new IllegalArgumentException("processor is null.");
}

compositeBatchBeanDao.update(processor);
}

@Override
public void deleteBatchBean(Long compositeBatchBeanId)
{
compositeBatchBeanDao.delete(compositeBatchBeanId);
}

@Override
public CompositeBatchBeanEntity getBatchBean(Long compositeBatchBeanId)
{
return compositeBatchBeanDao.get(compositeBatchBeanId);
}


@Override
public Set<String> getAllTaskNames()
{
return compositeBatchBeanDao.getAllTaskNames();
}

@Override
public Set<String> getAllValuesOfBeanTypes()
{
return compositeBatchBeanDao.getAllValuesOfBeanTypes();
}

public void setTablePrefix(String tablePrefix)
{
compositeBatchBeanDao.setTablePrefix(tablePrefix);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.ohadr.spring_batch_dynamic_composite.item;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanManager;

public abstract class AbstractDynamicCompositeItem
implements
StepExecutionListener,
ApplicationContextAware,
InitializingBean
{
protected String taskName;

@Autowired
protected CompositeBatchBeanManager compositeBatchBeanManager;

// if false, filter result is false if any filter is defined
protected Boolean acceptEmptyFiltersList = false;

protected ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
this.applicationContext = applicationContext;
}

//hide super's implementation
@Override
public void afterPropertiesSet() throws Exception
{
}

@Override
public ExitStatus afterStep(StepExecution stepExecution)
{
return stepExecution != null ? stepExecution.getExitStatus() : null;
}

/**
* get items list, which means processors, readers or writers. each implemetation should
* set the list that is read from DB to the "delegates" member (e.g. CompositeItemProcessor.delegates)
*/
protected abstract void getItemsList();

@Override
public void beforeStep(StepExecution stepExecution)
{
taskName = stepExecution.getJobExecution().getJobInstance().getJobName();

if (StringUtils.isEmpty(taskName))
{
String message = getClass().getSimpleName() + " beforeStep: taskName is null or empty.";
// Log.error(message);
throw new RuntimeException(message);
}

//get processors list from DB
getItemsList();
}
}
Loading