diff --git a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/Export.java b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/Export.java index 9df256a..c3a6701 100644 --- a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/Export.java +++ b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/Export.java @@ -78,8 +78,7 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept boolean useNodeCache = false; int nbOfThreads = 1; int exportChunkSize = 10; - int nbOfTasks = 2; - + if (req.getParameter("ignoreExported") != null) { if(req.getParameter("ignoreExported").equals("true")) @@ -134,18 +133,10 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept } } - if (req.getParameter("nbOfTasks") != null) - { - if(StringUtils.isNumeric(req.getParameter("nbOfTasks"))) - { - nbOfTasks = (int)Integer.parseInt(req.getParameter("nbOfTasks")); - } - } - //init variables dao = new AlfrescoExportDaoImpl(this.serviceRegistry); fileFolder = new FileFolder(res, base, scapeExported); - engine = new Engine(dao, fileFolder, exportVersions, revisionHead, useNodeCache, nbOfThreads, exportChunkSize, nbOfTasks); + engine = new Engine(dao, fileFolder, exportVersions, revisionHead, useNodeCache, nbOfThreads, exportChunkSize); NodeRef nf = null; @@ -185,7 +176,6 @@ public void execute(WebScriptRequest req, WebScriptResponse res) throws IOExcept res.getWriter().write(" export versions : " + exportVersions + "\n"); res.getWriter().write(" bulk import revision scheme: " + !revisionHead + "\n"); res.getWriter().write(" Nb. of threads: " + nbOfThreads + "\n"); - res.getWriter().write(" Nb. of Tasks: " + nbOfTasks + "\n"); res.getWriter().write(" Chunk size: " + exportChunkSize + "\n"); long duration = timer.elapsedTime(); diff --git a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/Engine.java b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/Engine.java index 1d39bf2..f9abdba 100644 --- a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/Engine.java +++ b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/Engine.java @@ -1,18 +1,18 @@ /** - * This file is part of Alfresco Bulk Export Tool. - * - * Alfresco Bulk Export Tool is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * Alfresco Bulk Export Tool is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General - * Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with Alfresco Bulk Export Tool. If not, see . + * This file is part of Alfresco Bulk Export Tool. + *

+ * Alfresco Bulk Export Tool is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + *

+ * Alfresco Bulk Export Tool is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + * Public License for more details. + *

+ * You should have received a copy of the GNU General Public License along + * with Alfresco Bulk Export Tool. If not, see . */ package org.alfresco.extensions.bulkexport.controler; @@ -25,7 +25,9 @@ import java.io.*; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** @@ -34,8 +36,7 @@ * @author Denys G. Santos (gsdenys@gmail.com) * @version 1.0.1 */ -public class Engine -{ +public class Engine { Log log = LogFactory.getLog(Engine.class); /** Data Access Object */ @@ -61,25 +62,20 @@ public class Engine /** How many Nodes are exported per process*/ private int exportChunkSize; - /** Nb of Tasks for the thread pool */ - private int nbOfTasks; - /** * Engine Default Builder * * @param dao Data Access Object * @param fileFolder File and Folder magager */ - public Engine(AlfrescoExportDao dao, FileFolder fileFolder, boolean exportVersions, boolean revisionHead, boolean useNodeCache, int nbOfThreads, int exportChunkSize, int nbOfTasks) - { - this.dao = dao; + public Engine(AlfrescoExportDao dao, FileFolder fileFolder, boolean exportVersions, boolean revisionHead, boolean useNodeCache, int nbOfThreads, int exportChunkSize) { + this.dao = dao; this.fileFolder = fileFolder; this.exportVersions = exportVersions; this.revisionHead = revisionHead; this.useNodeCache = useNodeCache; this.nbOfThreads = nbOfThreads; this.exportChunkSize = exportChunkSize; - this.nbOfTasks = nbOfTasks; } /** @@ -87,14 +83,12 @@ public Engine(AlfrescoExportDao dao, FileFolder fileFolder, boolean exportVersio * * @param nodeRef */ - public void execute(NodeRef nodeRef) throws Exception - { + public void execute(NodeRef nodeRef) throws Exception { // case node is folder create a folder and execute recursively // other else create file log.debug("execute (noderef)"); - if(!this.dao.isNodeIgnored(nodeRef.toString())) - { + if (!this.dao.isNodeIgnored(nodeRef.toString())) { log.info("Find all nodes to export (no history)"); List allNodes = getNodesToExport(nodeRef); log.info("Nodes to export = " + allNodes.size()); @@ -103,61 +97,49 @@ public void execute(NodeRef nodeRef) throws Exception log.debug("execute (noderef) finished"); } - private List getNodesToExport(NodeRef rootNode) throws Exception - { + private List getNodesToExport(NodeRef rootNode) throws Exception { List nodes = null; - if (useNodeCache) - { + if (useNodeCache) { nodes = retrieveNodeListFromCache(rootNode); } - if (nodes == null) - { + if (nodes == null) { nodes = findAllNodes(rootNode); storeNodeListToCache(rootNode, nodes); - if (useNodeCache) - { + if (useNodeCache) { log.info("Generated Cached Node list"); throw new CacheGeneratedException("Generated Cached Node List Only"); } - } - else - { + } else { log.info("Using Cached Node list"); } return nodes; } - private String nodeFileName(NodeRef rootNode) - { + private String nodeFileName(NodeRef rootNode) { File fname = new File(fileFolder.basePath(), rootNode.getId() + ".cache"); return fname.getPath(); } - private void storeNodeListToCache(NodeRef rootNode, List list) throws Exception - { + private void storeNodeListToCache(NodeRef rootNode, List list) throws Exception { // get a better name - FileOutputStream fos= new FileOutputStream(nodeFileName(rootNode)); - ObjectOutputStream oos= new ObjectOutputStream(fos); + FileOutputStream fos = new FileOutputStream(nodeFileName(rootNode)); + ObjectOutputStream oos = new ObjectOutputStream(fos); oos.writeObject(list); oos.close(); fos.close(); } - private List retrieveNodeListFromCache(NodeRef rootNode) throws Exception - { + private List retrieveNodeListFromCache(NodeRef rootNode) throws Exception { List list = null; - try - { + try { FileInputStream fis = new FileInputStream(nodeFileName(rootNode)); ObjectInputStream ois = new ObjectInputStream(fis); list = (List) ois.readObject(); ois.close(); - } - catch (FileNotFoundException e) - { + } catch (FileNotFoundException e) { // this exception means we have no noelist cache - we just ignore and continue log.debug("could not open nodelist cache file"); } @@ -169,29 +151,23 @@ private List retrieveNodeListFromCache(NodeRef rootNode) throws Excepti * * @param nodeRef */ - private List findAllNodes(NodeRef nodeRef) throws Exception - { + private List findAllNodes(NodeRef nodeRef) throws Exception { List nodes = new ArrayList(); log.debug("findAllNodes (noderef)"); - try{ - if(!this.dao.isNodeIgnored(nodeRef.toString())) - { - if(this.dao.isFolder(nodeRef)) - { + try { + if (!this.dao.isNodeIgnored(nodeRef.toString())) { + if (this.dao.isFolder(nodeRef)) { nodes.add(nodeRef); // add folder as well - List children= this.dao.getChildren(nodeRef); - for (NodeRef child : children) - { + List children = this.dao.getChildren(nodeRef); + for (NodeRef child : children) { nodes.addAll(this.findAllNodes(child)); } - } - else - { + } else { nodes.add(nodeRef); } } - }catch (Throwable e){ + } catch (Throwable e) { e.printStackTrace(); log.info("Error Multithreading", e); throw e; @@ -201,47 +177,43 @@ private List findAllNodes(NodeRef nodeRef) throws Exception } - /** * Creates Thread Pool and Tasks with dispatch nodes * * @param nodesToExport */ - private void exportNodes(final List nodesToExport) throws InterruptedException, ExecutionException - { + private void exportNodes(final List nodesToExport) throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newFixedThreadPool(nbOfThreads); - CompletionService pool = new ExecutorCompletionService(threadPool); - int previousLowerLimitNodeNumber = 0 ; - for(int taskNumber = 1; taskNumber <= this.nbOfTasks; taskNumber++) { + int previousLowerLimitNodeNumber = 0; + int noOfTasks = new Double(Math.ceil(nodesToExport.size() / this.exportChunkSize)).intValue(); + + log.info("Number of tasks: " + noOfTasks); + + for (int taskNumber = 1; taskNumber <= noOfTasks; taskNumber++) { int upperLimitNodeNumber = calculateNextUpperLimitNodeNumber(previousLowerLimitNodeNumber, nodesToExport.size()); int lowerLimitNodeNumber = calculateNextLowerLimitNodeNumber(previousLowerLimitNodeNumber, upperLimitNodeNumber); - log.info("Task number"+ taskNumber +" LowerLimitNodeNumber " + lowerLimitNodeNumber); - log.info("Task number"+ taskNumber +" UpperLimitNodeNumber " + upperLimitNodeNumber); + log.info("Task number" + taskNumber + " LowerLimitNodeNumber " + lowerLimitNodeNumber); + log.info("Task number" + taskNumber + " UpperLimitNodeNumber " + upperLimitNodeNumber); previousLowerLimitNodeNumber = upperLimitNodeNumber; List nodesForCurrentThread = nodesToExport.subList(lowerLimitNodeNumber, upperLimitNodeNumber); - pool.submit(new NodeExportTask(nodesForCurrentThread, exportVersions, revisionHead, dao, fileFolder, taskNumber)); + threadPool.submit(new NodeExportTask(nodesForCurrentThread, exportVersions, revisionHead, dao, fileFolder, taskNumber)); } + } - for(int i = 0; i < this.nbOfTasks; i++){ - String result = pool.take().get(); - log.info(result); + private int calculateNextLowerLimitNodeNumber(int previousLowerLimitNodeNumber, int upperLimitNodeNumber) { + int nextLowerLimitNodeNumber = previousLowerLimitNodeNumber; + if (nextLowerLimitNodeNumber > upperLimitNodeNumber) { + nextLowerLimitNodeNumber = upperLimitNodeNumber; } - } - - private int calculateNextLowerLimitNodeNumber(int previousLowerLimitNodeNumber, int upperLimitNodeNumber){ - int nextLowerLimitNodeNumber = previousLowerLimitNodeNumber; - if(nextLowerLimitNodeNumber > upperLimitNodeNumber){ - nextLowerLimitNodeNumber = upperLimitNodeNumber; - } - return nextLowerLimitNodeNumber; - } + return nextLowerLimitNodeNumber; + } - private int calculateNextUpperLimitNodeNumber(int previousLowerLimitNodeNumber, int nodesToExportSize){ + private int calculateNextUpperLimitNodeNumber(int previousLowerLimitNodeNumber, int nodesToExportSize) { int nextUpperLimitNodeNumber = previousLowerLimitNodeNumber + this.exportChunkSize; - if (nextUpperLimitNodeNumber > nodesToExportSize){ + if (nextUpperLimitNodeNumber > nodesToExportSize) { nextUpperLimitNodeNumber = nodesToExportSize; } return nextUpperLimitNodeNumber; diff --git a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/NodeExportTask.java b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/NodeExportTask.java index 716775a..7baa7c4 100644 --- a/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/NodeExportTask.java +++ b/alfresco-bulk-export/src/main/java/org/alfresco/extensions/bulkexport/controler/NodeExportTask.java @@ -16,37 +16,49 @@ /** * This thread class manages the output of nodes on the filesystem. - * @author Simon Girardin * + * @author Simon Girardin */ -public class NodeExportTask implements Callable{ +public class NodeExportTask implements Callable { Log log = LogFactory.getLog(NodeExportTask.class); - /** Flag indicating if versions are exported */ + /** + * Flag indicating if versions are exported + */ private boolean exportVersions; - /** If true the the head revision will be named, eg. if head revision is 1.4 then filename will contain the revision. - * This behaviour is not how the bulk importer expects revisions */ + /** + * If true the the head revision will be named, eg. if head revision is 1.4 then filename will contain the revision. + * This behaviour is not how the bulk importer expects revisions + */ private boolean revisionHead; - /** Data Access Object */ + /** + * Data Access Object + */ private AlfrescoExportDao dao; - /** File and folder manager */ + /** + * File and folder manager + */ private FileFolder fileFolder; - /**Nodes this thread has to export*/ + /** + * Nodes this thread has to export + */ private List nodesToExport; - /**Task Number for keeping logs accurate*/ + /** + * Task Number for keeping logs accurate + */ private int taskNumber; - - NodeExportTask(List nodesToExport, boolean exportVersions, boolean revisionHead, AlfrescoExportDao dao, FileFolder fileFolder, int taskNumber){ + + NodeExportTask(List nodesToExport, boolean exportVersions, boolean revisionHead, AlfrescoExportDao dao, FileFolder fileFolder, int taskNumber) { this.dao = dao; this.fileFolder = fileFolder; this.nodesToExport = nodesToExport; - this.exportVersions = exportVersions; + this.exportVersions = exportVersions; this.revisionHead = revisionHead; this.taskNumber = taskNumber; } @@ -57,35 +69,30 @@ public class NodeExportTask implements Callable{ * @param file * @throws Exception */ - private void createFile(NodeRef headNode, NodeRef file, String revision, boolean isHeadRevision) throws Exception - { + private void createFile(NodeRef headNode, NodeRef file, String revision, boolean isHeadRevision) throws Exception { String path = null; - if (revision == null) - { - log.error("createFile (headNode: "+headNode.toString() + " , filenode: )"+file.toString()+" , revision: " + revision + ")"); + if (revision == null) { + log.error("createFile (headNode: " + headNode.toString() + " , filenode: )" + file.toString() + " , revision: " + revision + ")"); throw new Exception("revision for node was not found"); } path = this.dao.getPath(headNode) + "." + revision; // if we are exporting using the revisions compatible with alfresco bulk import then we do not number the head(most recent) revisoon - if (!revisionHead && isHeadRevision) - { + if (!revisionHead && isHeadRevision) { path = this.dao.getPath(headNode); } doCreateFile(file, path); } - private void createFile(NodeRef file) throws Exception - { + private void createFile(NodeRef file) throws Exception { String path = null; path = this.dao.getPath(file); doCreateFile(file, path); } - private void doCreateFile(NodeRef file, String path) throws Exception - { + private void doCreateFile(NodeRef file, String path) throws Exception { //get Informations log.debug("doCreateFile (noderef)"); @@ -94,12 +101,10 @@ private void doCreateFile(NodeRef file, String path) throws Exception List aspects = null; Map properties = null; - try - { + try { String fname = this.fileFolder.createFullPath(path); log.debug("doCreateFile file =" + fname); - if (this.dao.getContentAndStoreInFile(file, fname) == false) - { + if (this.dao.getContentAndStoreInFile(file, fname) == false) { log.debug("doCreateFile ignore this file"); return; } @@ -112,9 +117,7 @@ private void doCreateFile(NodeRef file, String path) throws Exception type = null; properties = null; aspects = null; - } - catch (Exception e) - { + } catch (Exception e) { // for debugging purposes log.error("doCreateFile failed for noderef = " + file.toString()); throw e; @@ -125,17 +128,16 @@ private void doCreateFile(NodeRef file, String path) throws Exception /** * Create Folder and XML Metadata * - * @param file + * @param folder * @throws Exception */ - private void createFolder(NodeRef folder) throws Exception - { + private void createFolder(NodeRef folder) throws Exception { //Get Data log.debug("createFolder"); String path = this.dao.getPath(folder); - log.debug("createFolder path="+path); + log.debug("createFolder path=" + path); String type = this.dao.getType(folder); - log.debug("createFolder type="+type); + log.debug("createFolder type=" + type); List aspects = this.dao.getAspectsAsString(folder); Map properties = this.dao.getPropertiesAsString(folder); @@ -144,34 +146,27 @@ private void createFolder(NodeRef folder) throws Exception this.fileFolder.insertFileProperties(type, aspects, properties, path); } - private void exportHeadRevision(NodeRef nodeRef) throws Exception - { + private void exportHeadRevision(NodeRef nodeRef) throws Exception { this.createFile(nodeRef); } - private void exportFullRevisionHistory(NodeRef nodeRef) throws Exception - { - Map nodes = this.dao.getNodeRefHistory(nodeRef.toString()); - if (nodes != null) - { - List sortedKeys=new ArrayList(nodes.keySet()); + private void exportFullRevisionHistory(NodeRef nodeRef) throws Exception { + Map nodes = this.dao.getNodeRefHistory(nodeRef.toString()); + if (nodes != null) { + List sortedKeys = new ArrayList(nodes.keySet()); Collections.sort(sortedKeys, new VersionNumberComparator()); - if (sortedKeys.size() < 1) - { + if (sortedKeys.size() < 1) { throw new Exception("no revisions available"); } - String headRevision = (String)sortedKeys.get(sortedKeys.size()-1); + String headRevision = (String) sortedKeys.get(sortedKeys.size() - 1); - for (String revision : nodes.keySet()) - { + for (String revision : nodes.keySet()) { NodeRefRevision nodeRevision = nodes.get(revision); this.createFile(nodeRef, nodeRevision.node, revision, headRevision == revision); } - } - else - { + } else { // no revision history so lets just create the most recent revision log.debug("execute (noderef) no revision history found, dump node as head revision"); this.createFile(nodeRef, nodeRef, "1.0", true); @@ -184,10 +179,14 @@ public String call() throws Exception { AuthenticationUtil.setAdminUserAsFullyAuthenticatedUser(); int logCount = nodesToExport.size(); - log.info("Running task "+ taskNumber + " will export " + logCount +" nodes"); + log.info("Running task " + taskNumber + " will export " + logCount + " nodes"); final int NODES_TO_PROCESS = 100; try { for (NodeRef nodeRef : nodesToExport) { + if (Thread.currentThread().isInterrupted()) { + log.error(Thread.currentThread().getName() + " interrupted"); + throw new InterruptedException(); + } logCount--; if (this.dao.isFolder(nodeRef)) { this.createFolder(nodeRef); @@ -202,7 +201,7 @@ public String call() throws Exception { log.info("Task " + taskNumber + " has remaining nodes to process " + logCount); } } - }catch (Exception e) { + } catch (Exception e) { log.error(e); } AuthenticationUtil.clearCurrentSecurityContext();