Skip to content

Commit

Permalink
Add DeploymentMessage and enable per-node task iteration in WFs
Browse files Browse the repository at this point in the history
- Workflows have been improved using DeploymentMessage as the main
DeploymentService interface method's parameter in order to allow for
greater customization of the Deployment (i.e. chosen CloudProvider,
OneData settings, etc).

- Workflows now support Deploy/Poll/Undeploy task iteration to allow the
underlying command to work on a subset of TOSCA nodes (i.e. creating one
Job at a time to avoid transaction timeout).

Related to #48, #51, #53, #55
Fixes #44
  • Loading branch information
lorenzo-biava committed Jun 8, 2016
1 parent b1ee9c2 commit 0f6cd44
Show file tree
Hide file tree
Showing 28 changed files with 1,255 additions and 725 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public int getExecutorServiceThreadPoolSize() {

@Override
public int getExecutorServiceInterval() {
return 3;
return 1;
}

}
6 changes: 6 additions & 0 deletions src/main/java/it/reply/orchestrator/dal/entity/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public Resource() {
super();
}

public Resource(String toscaNodeName) {
super();
this.toscaNodeName = toscaNodeName;
state = NodeStates.INITIAL;
}

public NodeStates getState() {
return state;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package it.reply.orchestrator.dto.deployment;

import it.reply.orchestrator.dal.entity.Deployment;
import it.reply.orchestrator.dal.entity.Resource;
import it.reply.orchestrator.enums.DeploymentProvider;
import it.reply.orchestrator.service.deployment.providers.ChronosServiceImpl.IndigoJob;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* A message containing all the information needed during Deployment WF.
*
* @author l.biava
*
*/
public class DeploymentMessage implements Serializable {

private static final long serialVersionUID = 8003907220093782923L;

/**
* The internal deployment representation (stored in the DB).
*/
private Deployment deployment;

private String deploymentId;

private DeploymentProvider deploymentProvider;

private TemplateTopologicalOrderIterator templateTopologicalOrderIterator;

private boolean createComplete;
private boolean deleteComplete;
private boolean pollComplete;
private boolean skipPollInterval;

/**
* TEMPORARY Chronos Job Graph (to avoid regenerating the template representation each time).
*/
private Map<String, IndigoJob> chronosJobGraph;

public Deployment getDeployment() {
return deployment;
}

public void setDeployment(Deployment deployment) {
this.deployment = deployment;
}

public String getDeploymentId() {
return deploymentId;
}

public void setDeploymentId(String deploymentId) {
this.deploymentId = deploymentId;
}

public DeploymentProvider getDeploymentProvider() {
return deploymentProvider;
}

public void setDeploymentProvider(DeploymentProvider deploymentProvider) {
this.deploymentProvider = deploymentProvider;
}

public TemplateTopologicalOrderIterator getTemplateTopologicalOrderIterator() {
return templateTopologicalOrderIterator;
}

public void setTemplateTopologicalOrderIterator(
TemplateTopologicalOrderIterator templateTopologicalOrderIterator) {
this.templateTopologicalOrderIterator = templateTopologicalOrderIterator;
}

public Map<String, IndigoJob> getChronosJobGraph() {
return chronosJobGraph;
}

public void setChronosJobGraph(Map<String, IndigoJob> chronosJobGraph) {
this.chronosJobGraph = chronosJobGraph;
}

public boolean isCreateComplete() {
return createComplete;
}

public void setCreateComplete(boolean createComplete) {
this.createComplete = createComplete;
}

public boolean isDeleteComplete() {
return deleteComplete;
}

public void setDeleteComplete(boolean deleteComplete) {
this.deleteComplete = deleteComplete;
}

public boolean isPollComplete() {
return pollComplete;
}

public void setPollComplete(boolean pollComplete) {
this.pollComplete = pollComplete;
}

public boolean isSkipPollInterval() {
return skipPollInterval;
}

public void setSkipPollInterval(boolean skipPollInterval) {
this.skipPollInterval = skipPollInterval;
}

@Override
public String toString() {
return "DeploymentMessage [deploymentId=" + deploymentId + ", deploymentProvider="
+ deploymentProvider + ", templateTopologicalOrderIterator="
+ templateTopologicalOrderIterator + ", createComplete=" + createComplete
+ ", pollComplete=" + pollComplete + "]";
}

/**
* Class to contain template's nodes in topological order and to allow to iterate on the list.
*
* @author l.biava
*
*/
public static class TemplateTopologicalOrderIterator implements Serializable {

private static final long serialVersionUID = 1557615023166610397L;

/**
* Template's nodes, topologically ordered.
*/
List<Resource> topologicalOrder;

int position = 0;

public TemplateTopologicalOrderIterator(List<Resource> topologicalOrder) {
this.topologicalOrder = topologicalOrder;
}

public int getPosition() {
return position;
}

public List<Resource> getTopologicalOrder() {
return topologicalOrder;
}

public int getNodeSize() {
return topologicalOrder.size();
}

public synchronized boolean hasNext() {
return topologicalOrder.size() - 1 > position;
}

/**
* Get the node in the current position of the iterator. <br/>
* <b>Note that the first time this method is called it returns the first element of the list,
* or <tt>null</tt> if the list is empty</b>
*
* @return the current node, or <tt>null</tt> if the list is empty.
*/
public synchronized Resource getCurrent() {
if (position >= topologicalOrder.size()) {
return null;
}
return topologicalOrder.get(position);
}

/**
* Get the next element of the collection (after incrementing the position pointer).
*
* @return the next node, or <tt>null</tt> if there aren't any others.
*/
public synchronized Resource getNext() {
if (!hasNext()) {
position++;
return null;
}
position++;
return topologicalOrder.get(position);
}

public synchronized void reset() {
position = 0;
}

@Override
public String toString() {
return "TemplateTopologicalOrderStatus [topologicalOrder=" + topologicalOrder + ", position="
+ position + "]";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import it.reply.orchestrator.dal.entity.WorkflowReference;
import it.reply.orchestrator.dal.repository.DeploymentRepository;
import it.reply.orchestrator.dal.repository.ResourceRepository;
import it.reply.orchestrator.dto.deployment.DeploymentMessage;
import it.reply.orchestrator.dto.request.DeploymentRequest;
import it.reply.orchestrator.enums.DeploymentProvider;
import it.reply.orchestrator.enums.NodeStates;
Expand All @@ -35,7 +36,6 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

@Service
Expand Down Expand Up @@ -116,6 +116,13 @@ public Deployment createDeployment(DeploymentRequest request) {
params.put(WorkflowConstants.WF_PARAM_DEPLOYMENT_TYPE,
(isChronosDeployment ? DEPLOYMENT_TYPE_CHRONOS : DEPLOYMENT_TYPE_TOSCA));

// Build deployment message
DeploymentMessage deploymentMessage = new DeploymentMessage();
deploymentMessage.setDeploymentId(deployment.getId());
deploymentMessage.setDeploymentProvider(
(isChronosDeployment ? DeploymentProvider.CHRONOS : DeploymentProvider.IM));
params.put(WorkflowConstants.WF_PARAM_DEPLOYMENT_MESSAGE, deploymentMessage);

ProcessInstance pi = null;
try {
pi = wfService.startProcess(WorkflowConfigProducerBean.DEPLOY.getProcessId(), params,
Expand Down Expand Up @@ -166,12 +173,12 @@ public void deleteDeployment(String uuid) {
deployment = deploymentRepository.save(deployment);

// Abort all WF currently active on this deployment
Iterator<WorkflowReference> wrIt = deployment.getWorkflowReferences().iterator();
while (wrIt.hasNext()) {
WorkflowReference wr = wrIt.next();
wfService.abortProcess(wr.getProcessId(), wr.getRuntimeStrategy());
wrIt.remove();
}
// Iterator<WorkflowReference> wrIt = deployment.getWorkflowReferences().iterator();
// while (wrIt.hasNext()) {
// WorkflowReference wr = wrIt.next();
// wfService.abortProcess(wr.getProcessId(), wr.getRuntimeStrategy());
// wrIt.remove();
// }

Map<String, Object> params = new HashMap<>();
params.put("DEPLOYMENT_ID", deployment.getId());
Expand All @@ -180,15 +187,21 @@ public void deleteDeployment(String uuid) {
params.put(WorkflowConstants.WF_PARAM_DEPLOYMENT_TYPE,
deployment.getDeploymentProvider().name());

ProcessInstance pi = null;
try {
pi = wfService.startProcess(WorkflowConfigProducerBean.UNDEPLOY.getProcessId(), params,
RUNTIME_STRATEGY.PER_PROCESS_INSTANCE);
} catch (WorkflowException ex) {
throw new OrchestratorException(ex);
}
// Build deployment message
DeploymentMessage deploymentMessage = new DeploymentMessage();
deploymentMessage.setDeploymentId(deployment.getId());
deploymentMessage.setDeploymentProvider(deployment.getDeploymentProvider());
params.put(WorkflowConstants.WF_PARAM_DEPLOYMENT_MESSAGE, deploymentMessage);

// ProcessInstance pi = null;
// try {
// pi = wfService.startProcess(WorkflowConfigProducerBean.UNDEPLOY.getProcessId(), params,
// RUNTIME_STRATEGY.PER_PROCESS_INSTANCE);
// } catch (WorkflowException ex) {
// throw new OrchestratorException(ex);
// }
deployment.addWorkflowReferences(
new WorkflowReference(pi.getId(), RUNTIME_STRATEGY.PER_PROCESS_INSTANCE));
new WorkflowReference(998, RUNTIME_STRATEGY.PER_PROCESS_INSTANCE));
deployment = deploymentRepository.save(deployment);
}
} else {
Expand Down Expand Up @@ -223,9 +236,20 @@ public void updateDeployment(String id, DeploymentRequest request) {

deployment = deploymentRepository.save(deployment);

// !! WARNING !! That's an hack to avoid an obscure NonUniqueObjetException on the new
// WorkflowReference created after the WF start
deployment.getWorkflowReferences().size();

Map<String, Object> params = new HashMap<>();
params.put("DEPLOYMENT_ID", deployment.getId());
params.put("TOSCA_TEMPLATE", request.getTemplate());

// Build deployment message
DeploymentMessage deploymentMessage = new DeploymentMessage();
deploymentMessage.setDeploymentId(deployment.getId());
deploymentMessage.setDeploymentProvider(deployment.getDeploymentProvider());
params.put(WorkflowConstants.WF_PARAM_DEPLOYMENT_MESSAGE, deploymentMessage);

ProcessInstance pi = null;
try {
pi = wfService.startProcess(WorkflowConfigProducerBean.UPDATE.getProcessId(), params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public Map<String, NodeTemplate> getCountNodes(ArchiveRoot archiveRoot) {
ScalarPropertyValue scalarPropertyValue =
(ScalarPropertyValue) scalable.getProperties().get("count");
// Check if this value is read from the template and is not a default value
if (scalarPropertyValue.isPrintable()) {
if (scalarPropertyValue != null && scalarPropertyValue.isPrintable()) {
nodes.put(entry.getKey(), entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
public class WorkflowConstants {
public static final String WF_PARAM_DEPLOYMENT_ID = "DEPLOYMENT_ID";
public static final String WF_PARAM_DEPLOYMENT_TYPE = "DEPLOYMENT_TYPE";
public static final String WF_PARAM_DEPLOYMENT_MESSAGE = "DeploymentMessage";
public static final String WF_PARAM_RANK_CLOUD_PROVIDERS_MESSAGE = "RankCloudProvidersMessage";

}
Loading

0 comments on commit 0f6cd44

Please sign in to comment.