Skip to content
This repository has been archived by the owner on Nov 19, 2021. It is now read-only.

Commit

Permalink
Merge pull request #4 from sgirardin/improvedMultithreading
Browse files Browse the repository at this point in the history
multithreading improvements
  • Loading branch information
sgirardin authored Jan 20, 2018
2 parents ec0fc64 + 0ab2303 commit c38823b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept
boolean useNodeCache = false;
int nbOfThreads = 1;
int exportChunkSize = 10;
int nbOfTasks = 2;


if (req.getParameter("ignoreExported") != null)
{
if(req.getParameter("ignoreExported").equals("true"))
Expand Down Expand Up @@ -134,18 +133,10 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept
}
}

if (req.getParameter("nbOfTasks") != null)
{
if(StringUtils.isNumeric(req.getParameter("nbOfTasks")))
{
nbOfTasks = (int)Integer.parseInt(req.getParameter("nbOfTasks"));
}
}

//init variables
dao = new AlfrescoExportDaoImpl(this.serviceRegistry);
fileFolder = new FileFolder(res, base, scapeExported);
engine = new Engine(dao, fileFolder, exportVersions, revisionHead, useNodeCache, nbOfThreads, exportChunkSize, nbOfTasks);
engine = new Engine(dao, fileFolder, exportVersions, revisionHead, useNodeCache, nbOfThreads, exportChunkSize);

NodeRef nf = null;

Expand Down Expand Up @@ -185,7 +176,6 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept
res.getWriter().write(" export versions : " + exportVersions + "\n");
res.getWriter().write(" bulk import revision scheme: " + !revisionHead + "\n");
res.getWriter().write(" Nb. of threads: " + nbOfThreads + "\n");
res.getWriter().write(" Nb. of Tasks: " + nbOfTasks + "\n");
res.getWriter().write(" Chunk size: " + exportChunkSize + "\n");

long duration = timer.elapsedTime();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
/**
* This file is part of Alfresco Bulk Export Tool.
*
* Alfresco Bulk Export Tool is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* Alfresco Bulk Export Tool is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
* Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with Alfresco Bulk Export Tool. If not, see <http://www.gnu.org/licenses/>.
* This file is part of Alfresco Bulk Export Tool.
* <p>
* Alfresco Bulk Export Tool is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
* <p>
* Alfresco Bulk Export Tool is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
* Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License along
* with Alfresco Bulk Export Tool. If not, see <http://www.gnu.org/licenses/>.
*/
package org.alfresco.extensions.bulkexport.controler;

Expand All @@ -25,7 +25,9 @@
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


/**
Expand All @@ -34,8 +36,7 @@
* @author Denys G. Santos ([email protected])
* @version 1.0.1
*/
public class Engine
{
public class Engine {
Log log = LogFactory.getLog(Engine.class);

/** Data Access Object */
Expand All @@ -61,40 +62,33 @@ public class Engine
/** How many Nodes are exported per process*/
private int exportChunkSize;

/** Nb of Tasks for the thread pool */
private int nbOfTasks;

/**
 * Engine default constructor.
 *
 * @param dao             Data Access Object used to read nodes from Alfresco
 * @param fileFolder      file and folder manager writing to the export target
 * @param exportVersions  if true, previous versions of each node are exported too
 * @param revisionHead    controls the revision naming scheme (the webscript reports
 *                        "bulk import revision scheme" as its negation)
 * @param useNodeCache    if true, use/generate the cached node list on disk
 * @param nbOfThreads     size of the export thread pool
 * @param exportChunkSize number of nodes handled by each export task
 */
public Engine(AlfrescoExportDao dao, FileFolder fileFolder, boolean exportVersions, boolean revisionHead, boolean useNodeCache, int nbOfThreads, int exportChunkSize) {
    this.dao = dao;
    this.fileFolder = fileFolder;
    this.exportVersions = exportVersions;
    this.revisionHead = revisionHead;
    this.useNodeCache = useNodeCache;
    this.nbOfThreads = nbOfThreads;
    this.exportChunkSize = exportChunkSize;
}

/**
* Recursive method to export alfresco nodes to file system
*
* @param nodeRef
*/
public void execute(NodeRef nodeRef) throws Exception
{
public void execute(NodeRef nodeRef) throws Exception {
// case node is folder create a folder and execute recursively
// other else create file
log.debug("execute (noderef)");

if(!this.dao.isNodeIgnored(nodeRef.toString()))
{
if (!this.dao.isNodeIgnored(nodeRef.toString())) {
log.info("Find all nodes to export (no history)");
List<NodeRef> allNodes = getNodesToExport(nodeRef);
log.info("Nodes to export = " + allNodes.size());
Expand All @@ -103,61 +97,49 @@ public void execute(NodeRef nodeRef) throws Exception
log.debug("execute (noderef) finished");
}

private List<NodeRef> getNodesToExport(NodeRef rootNode) throws Exception
{
private List<NodeRef> getNodesToExport(NodeRef rootNode) throws Exception {
List<NodeRef> nodes = null;
if (useNodeCache)
{
if (useNodeCache) {
nodes = retrieveNodeListFromCache(rootNode);
}

if (nodes == null)
{
if (nodes == null) {
nodes = findAllNodes(rootNode);
storeNodeListToCache(rootNode, nodes);
if (useNodeCache)
{
if (useNodeCache) {
log.info("Generated Cached Node list");
throw new CacheGeneratedException("Generated Cached Node List Only");
}
}
else
{
} else {
log.info("Using Cached Node list");
}

return nodes;
}

private String nodeFileName(NodeRef rootNode)
{
private String nodeFileName(NodeRef rootNode) {
File fname = new File(fileFolder.basePath(), rootNode.getId() + ".cache");
return fname.getPath();
}

private void storeNodeListToCache(NodeRef rootNode, List<NodeRef> list) throws Exception
{
private void storeNodeListToCache(NodeRef rootNode, List<NodeRef> list) throws Exception {
// get a better name
FileOutputStream fos= new FileOutputStream(nodeFileName(rootNode));
ObjectOutputStream oos= new ObjectOutputStream(fos);
FileOutputStream fos = new FileOutputStream(nodeFileName(rootNode));
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(list);
oos.close();
fos.close();
}

private List<NodeRef> retrieveNodeListFromCache(NodeRef rootNode) throws Exception
{
private List<NodeRef> retrieveNodeListFromCache(NodeRef rootNode) throws Exception {
List<NodeRef> list = null;

try
{
try {
FileInputStream fis = new FileInputStream(nodeFileName(rootNode));
ObjectInputStream ois = new ObjectInputStream(fis);
list = (List<NodeRef>) ois.readObject();
ois.close();
}
catch (FileNotFoundException e)
{
} catch (FileNotFoundException e) {
// this exception means we have no nodelist cache - we just ignore and continue
log.debug("could not open nodelist cache file");
}
Expand All @@ -169,29 +151,23 @@ private List<NodeRef> retrieveNodeListFromCache(NodeRef rootNode) throws Excepti
*
* @param nodeRef
*/
private List<NodeRef> findAllNodes(NodeRef nodeRef) throws Exception
{
private List<NodeRef> findAllNodes(NodeRef nodeRef) throws Exception {
List<NodeRef> nodes = new ArrayList<NodeRef>();

log.debug("findAllNodes (noderef)");
try{
if(!this.dao.isNodeIgnored(nodeRef.toString()))
{
if(this.dao.isFolder(nodeRef))
{
try {
if (!this.dao.isNodeIgnored(nodeRef.toString())) {
if (this.dao.isFolder(nodeRef)) {
nodes.add(nodeRef); // add folder as well
List<NodeRef> children= this.dao.getChildren(nodeRef);
for (NodeRef child : children)
{
List<NodeRef> children = this.dao.getChildren(nodeRef);
for (NodeRef child : children) {
nodes.addAll(this.findAllNodes(child));
}
}
else
{
} else {
nodes.add(nodeRef);
}
}
}catch (Throwable e){
} catch (Throwable e) {
e.printStackTrace();
log.info("Error Multithreading", e);
throw e;
Expand All @@ -201,47 +177,43 @@ private List<NodeRef> findAllNodes(NodeRef nodeRef) throws Exception
}



/**
* Creates Thread Pool and Tasks with dispatch nodes
*
* @param nodesToExport
*/
private void exportNodes(final List<NodeRef> nodesToExport) throws InterruptedException, ExecutionException
{
private void exportNodes(final List<NodeRef> nodesToExport) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(nbOfThreads);
CompletionService<String> pool = new ExecutorCompletionService<String>(threadPool);

int previousLowerLimitNodeNumber = 0 ;
for(int taskNumber = 1; taskNumber <= this.nbOfTasks; taskNumber++) {
int previousLowerLimitNodeNumber = 0;
int noOfTasks = new Double(Math.ceil(nodesToExport.size() / this.exportChunkSize)).intValue();

log.info("Number of tasks: " + noOfTasks);

for (int taskNumber = 1; taskNumber <= noOfTasks; taskNumber++) {
int upperLimitNodeNumber = calculateNextUpperLimitNodeNumber(previousLowerLimitNodeNumber, nodesToExport.size());
int lowerLimitNodeNumber = calculateNextLowerLimitNodeNumber(previousLowerLimitNodeNumber, upperLimitNodeNumber);
log.info("Task number"+ taskNumber +" LowerLimitNodeNumber " + lowerLimitNodeNumber);
log.info("Task number"+ taskNumber +" UpperLimitNodeNumber " + upperLimitNodeNumber);
log.info("Task number" + taskNumber + " LowerLimitNodeNumber " + lowerLimitNodeNumber);
log.info("Task number" + taskNumber + " UpperLimitNodeNumber " + upperLimitNodeNumber);

previousLowerLimitNodeNumber = upperLimitNodeNumber;

List<NodeRef> nodesForCurrentThread = nodesToExport.subList(lowerLimitNodeNumber, upperLimitNodeNumber);
pool.submit(new NodeExportTask(nodesForCurrentThread, exportVersions, revisionHead, dao, fileFolder, taskNumber));
threadPool.submit(new NodeExportTask(nodesForCurrentThread, exportVersions, revisionHead, dao, fileFolder, taskNumber));
}
}

for(int i = 0; i < this.nbOfTasks; i++){
String result = pool.take().get();
log.info(result);
private int calculateNextLowerLimitNodeNumber(int previousLowerLimitNodeNumber, int upperLimitNodeNumber) {
int nextLowerLimitNodeNumber = previousLowerLimitNodeNumber;
if (nextLowerLimitNodeNumber > upperLimitNodeNumber) {
nextLowerLimitNodeNumber = upperLimitNodeNumber;
}
}

private int calculateNextLowerLimitNodeNumber(int previousLowerLimitNodeNumber, int upperLimitNodeNumber){
int nextLowerLimitNodeNumber = previousLowerLimitNodeNumber;
if(nextLowerLimitNodeNumber > upperLimitNodeNumber){
nextLowerLimitNodeNumber = upperLimitNodeNumber;
}
return nextLowerLimitNodeNumber;
}
return nextLowerLimitNodeNumber;
}

private int calculateNextUpperLimitNodeNumber(int previousLowerLimitNodeNumber, int nodesToExportSize){
private int calculateNextUpperLimitNodeNumber(int previousLowerLimitNodeNumber, int nodesToExportSize) {
int nextUpperLimitNodeNumber = previousLowerLimitNodeNumber + this.exportChunkSize;
if (nextUpperLimitNodeNumber > nodesToExportSize){
if (nextUpperLimitNodeNumber > nodesToExportSize) {
nextUpperLimitNodeNumber = nodesToExportSize;
}
return nextUpperLimitNodeNumber;
Expand Down
Loading

0 comments on commit c38823b

Please sign in to comment.