pmonks/alfresco-bulk-import

View on GitHub
amp/src/main/java/org/alfresco/extension/bulkimport/impl/BatchImporterImpl.java

Summary

Maintainability
F
6 days
Test Coverage
/*
 * Copyright (C) 2007 Peter Monks
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * This file is part of an unsupported extension to Alfresco.
 * 
 */

package org.alfresco.extension.bulkimport.impl;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.alfresco.service.cmr.version.Version;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.alfresco.model.ContentModel;
import org.alfresco.repo.policy.BehaviourFilter;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.repo.version.VersionModel;
import org.alfresco.service.ServiceRegistry;
import org.alfresco.service.cmr.model.FileInfo;
import org.alfresco.service.cmr.model.FileNotFoundException;
import org.alfresco.service.cmr.repository.ContentService;
import org.alfresco.service.cmr.repository.ContentWriter;
import org.alfresco.service.cmr.repository.InvalidNodeRefException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.cmr.version.VersionService;
import org.alfresco.service.cmr.version.VersionType;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;

import org.alfresco.extension.bulkimport.BulkImportStatus;
import org.alfresco.extension.bulkimport.source.BulkImportItem;
import org.alfresco.extension.bulkimport.source.BulkImportItemVersion;

import static org.alfresco.extension.bulkimport.util.Utils.*;
import static org.alfresco.extension.bulkimport.util.LogUtils.*;


/**
 * This class implements the logic for importing a batch into Alfresco.
 *
 * @author Peter Monks (pmonks@gmail.com)
 *
 */
public final class BatchImporterImpl
    implements BatchImporter
{
    // Logger shared by all instances of this class.
    private final static Log log = LogFactory.getLog(BatchImporterImpl.class);

    // Regular expression matching one or more consecutive path separators (backslash or forward slash).
    private final static String REGEX_SPLIT_PATH_ELEMENTS = "[\\\\/]+";

    // Collaborators supplied to the constructor.
    private final ServiceRegistry serviceRegistry;
    private final BehaviourFilter behaviourFilter;
    // Services looked up from the service registry once, at construction time.
    private final NodeService     nodeService;
    private final VersionService  versionService;
    private final ContentService  contentService;
    
    
    // Status object that progress (current batch, counters, completed batches) is reported to.
    private final WritableBulkImportStatus importStatus;
    
    
    /**
     * Creates a batch importer that works against the given Alfresco services.
     *
     * @param serviceRegistry The registry from which the node, version and content services are obtained <i>(must not be null)</i>.
     * @param behaviourFilter The filter used to disable behaviours while a batch is imported <i>(must not be null)</i>.
     * @param importStatus    The status object that import progress is reported to <i>(must not be null)</i>.
     */
    public BatchImporterImpl(final ServiceRegistry          serviceRegistry,
                             final BehaviourFilter          behaviourFilter,
                             final WritableBulkImportStatus importStatus)
    {
        // PRECONDITIONS (only checked when the JVM is running with assertions enabled)
        assert serviceRegistry != null : "serviceRegistry must not be null.";
        assert behaviourFilter != null : "behaviourFilter must not be null.";
        assert importStatus    != null : "importStatus must not be null.";

        // Collaborators handed in by the caller
        this.importStatus    = importStatus;
        this.behaviourFilter = behaviourFilter;
        this.serviceRegistry = serviceRegistry;

        // Frequently used services, resolved once up front
        this.contentService = serviceRegistry.getContentService();
        this.versionService = serviceRegistry.getVersionService();
        this.nodeService    = serviceRegistry.getNodeService();
    }
    

    /**
     * Imports the given batch while running as the given user, recording the batch
     * as the one currently being imported and (at debug level) how long it took.
     *
     * @param userId          The user to run the import as.
     * @param target          The node to import into.
     * @param batch           The batch to import.
     * @param replaceExisting Whether items that already exist in the repository are replaced.
     * @param dryRun          Whether this is a dry run.
     * @see org.alfresco.extension.bulkimport.impl.BatchImporter#importBatch(String, NodeRef, Batch, boolean, boolean)
     */
    @Override
    public final void importBatch(final String  userId,
                                  final NodeRef target,
                                  final Batch   batch,
                                  final boolean replaceExisting,
                                  final boolean dryRun)
        throws InterruptedException,
               OutOfOrderBatchException
    {
        final long startedAt = System.nanoTime();

        final String description = "Batch #" + batch.getNumber() + ", " + batch.size() + " items, " + batch.sizeInBytes() + " bytes.";

        if (debug(log))
        {
            debug(log, "Importing " + description);
        }

        importStatus.setCurrentlyImporting(description);

        // The actual import, wrapped so that it can be executed as the requested user
        final RunAsWork<Object> importWork = new RunAsWork<Object>()
        {
            @Override
            public Object doWork()
                throws Exception
            {
                importBatchInTxn(target, batch, replaceExisting, dryRun);
                return(null);
            }
        };

        AuthenticationUtil.runAs(importWork, userId);

        if (debug(log))
        {
            final long elapsedNanos = System.nanoTime() - startedAt;
            debug(log, "Batch #" + batch.getNumber() + " (containing " + batch.size() + " nodes) processed in " + getDurationInSeconds(elapsedNanos) + ".");
        }
    }

    
    /**
     * Imports the given batch inside a read/write retrying transaction, then tells
     * the import status that the batch has completed.
     *
     * @param target          The node to import into.
     * @param batch           The batch to import.
     * @param replaceExisting Whether items that already exist in the repository are replaced.
     * @param dryRun          Whether this is a dry run.
     */
    private final void importBatchInTxn(final NodeRef target,
                                        final Batch   batch,
                                        final boolean replaceExisting,
                                        final boolean dryRun)
        throws InterruptedException,
               OutOfOrderBatchException
    {
        final boolean readOnly       = false;  // false=R/W txn
        final boolean requiresNewTxn = false;  // false=does not require a new txn if one is already in progress (which should never be the case here)

        final RetryingTransactionCallback<Object> txnWork = new RetryingTransactionCallback<Object>()
        {
            @Override
            public Object execute()
                throws Exception
            {
                // The auditable aspect's behaviours are switched off for this transaction,
                // so that creation & modification dates can be set by the import
                behaviourFilter.disableBehaviour(ContentModel.ASPECT_AUDITABLE);

                importBatchImpl(target, batch, replaceExisting, dryRun);
                return(null);
            }
        };

        serviceRegistry.getRetryingTransactionHelper().doInTransaction(txnWork, readOnly, requiresNewTxn);

        importStatus.batchCompleted(batch);
    }
    
    
    /**
     * Imports every item in the given batch, in iteration order.  A null batch is a no-op.
     *
     * @param target          The node to import into.
     * @param batch           The batch to import <i>(may be null)</i>.
     * @param replaceExisting Whether items that already exist in the repository are replaced.
     * @param dryRun          Whether this is a dry run.
     * @throws InterruptedException If the import is stopping or the current thread has been interrupted.
     */
    private final void importBatchImpl(final NodeRef target,
                                       final Batch   batch,
                                       final boolean replaceExisting,
                                       final boolean dryRun)
        throws InterruptedException
    {
        // Nothing to do without a batch
        if (batch == null)
        {
            return;
        }

        for (final BulkImportItem<BulkImportItemVersion> currentItem : batch)
        {
            // Check for a stop request / interruption before starting each item
            if (importStatus.isStopping() || Thread.currentThread().isInterrupted())
            {
                throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
            }

            importItem(target, currentItem, replaceExisting, dryRun);
        }
    }
    
    
    /**
     * Imports a single item (directory or file) beneath the given target.
     *
     * @param target          The node to import into.
     * @param item            The item to import.
     * @param replaceExisting Whether an item that already exists in the repository is replaced.
     * @param dryRun          Whether this is a dry run.
     * @throws InterruptedException If the import was interrupted (the thread's interrupt flag is set again before rethrowing).
     * @throws OutOfOrderBatchException Rethrown unchanged, so that callers can tell it apart from other failures.
     * @throws ItemImportException Wraps any other exception, together with the item that failed.
     */
    private final void importItem(final NodeRef                               target,
                                  final BulkImportItem<BulkImportItemVersion> item,
                                  final boolean                               replaceExisting,
                                  final boolean                               dryRun)
        throws InterruptedException
    {
        try
        {
            if (trace(log))
            {
                final String kind = item.isDirectory() ? "directory " : "file ";
                trace(log, "Importing " + kind + String.valueOf(item) + ".");
            }

            final NodeRef importInto = findOrCreateNode(target, item, replaceExisting, dryRun);
            final boolean directory  = item.isDirectory();

            // A non-null NodeRef means we're creating or replacing the item, so import it
            if (importInto != null)
            {
                if (!directory)
                {
                    importFile(importInto, item, dryRun);
                }
                else
                {
                    importDirectory(importInto, item, dryRun);
                }
            }

            if (trace(log))
            {
                trace(log, "Finished importing " + String.valueOf(item));
            }
        }
        catch (final InterruptedException interrupted)
        {
            // Restore the interrupt flag before propagating
            Thread.currentThread().interrupt();
            throw interrupted;
        }
        catch (final OutOfOrderBatchException outOfOrder)
        {
            // Deliberately not wrapped
            throw outOfOrder;
        }
        catch (final Exception failure)
        {
            // Capture the item that failed, along with the exception
            throw new ItemImportException(item, failure);
        }
    }
    
    
    /**
     * Finds the node that corresponds to the given item, creating it if it doesn't exist yet.
     *
     * @param target          The node to import into; also used as the parent when the item has no parent path.
     * @param item            The item to find or create a node for.
     * @param replaceExisting Whether an already existing node should be returned (for replacement) rather than skipped.
     * @param dryRun          Whether this is a dry run, in which case nothing is created and fake NodeRefs stand in for missing nodes.
     * @return The NodeRef to import the item into, or <code>null</code> if the node already exists and
     *         <code>replaceExisting</code> is false (the "nodes skipped" counter is incremented in that case).
     * @throws OutOfOrderBatchException If the item's parent could not be found and this is not a dry run.
     */
    private final NodeRef findOrCreateNode(final NodeRef                               target,
                                           final BulkImportItem<BulkImportItemVersion> item,
                                           final boolean                               replaceExisting,
                                           final boolean                               dryRun)
    {
        NodeRef result           = null;
        String  nodeName         = item.getName();
        String  nodeNamespace    = item.getNamespace();
        QName   nodeQName        = QName.createQName(nodeNamespace == null ? NamespaceService.CONTENT_MODEL_1_0_URI : nodeNamespace,
                                                     QName.createValidLocalName(nodeName));
        boolean isDirectory      = item.isDirectory();
        String  parentAssoc      = item.getParentAssoc();
        QName   parentAssocQName = parentAssoc == null ? ContentModel.ASSOC_CONTAINS : createQName(serviceRegistry, parentAssoc);
        NodeRef parentNodeRef    = null;
        
        try
        {
            parentNodeRef = getParent(target, item);
        
            // No parent path means the item lives directly beneath the target
            if (parentNodeRef == null)
            {
                parentNodeRef = target;
            }
            
            // Find the node
            if (trace(log)) trace(log, "Searching for node with name '" + nodeName + "' within node '" + String.valueOf(parentNodeRef) + "' with parent association '" + String.valueOf(parentAssocQName) + "'.");
            result = nodeService.getChildByName(parentNodeRef, parentAssocQName, nodeName);
        }
        catch (final OutOfOrderBatchException oobe)
        {
            // In a dry run the parent may legitimately not exist (nothing is ever created), so fake it
            if (dryRun)
            {
                parentNodeRef = new NodeRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, "dry-run-fake-parent-node-ref");
            }
            else
            {
                throw oobe;
            }
        }
        
        if (result == null)    // We didn't find it, so create a new node in the repo. 
        {
            // Take the type from the first version, if there is one.  Items without any versions are
            // tolerated here (importDirectory explicitly supports them) and get the default type.
            String itemType = null;

            if (item.getVersions() != null && item.getVersions().size() > 0)
            {
                itemType = item.getVersions().first().getType();
            }

            QName itemTypeQName = itemType == null ? (isDirectory ? ContentModel.TYPE_FOLDER : ContentModel.TYPE_CONTENT) : createQName(serviceRegistry, itemType);

            if (dryRun)
            {
                if (info(log)) info(log, "[DRY RUN] Would have created new node of type '" + String.valueOf(itemTypeQName) + "' with qname '" + String.valueOf(nodeQName) + "' within node '" + String.valueOf(parentNodeRef) + "' with parent association '" + String.valueOf(parentAssocQName) + "'.");
                result = new NodeRef(StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, "dry-run-fake-created-node-ref");
            }
            else
            {
                if (trace(log)) trace(log, "Creating new node of type '" + String.valueOf(itemTypeQName) + "' with qname '" + String.valueOf(nodeQName) + "' within node '" + String.valueOf(parentNodeRef) + "' with parent association '" + String.valueOf(parentAssocQName) + "'.");
                Map<QName, Serializable> props = new HashMap<>();
                props.put(ContentModel.PROP_NAME, nodeName);
                result = nodeService.createNode(parentNodeRef, parentAssocQName, nodeQName, itemTypeQName, props).getChildRef();
            }
        }
        else if (replaceExisting)
        {
            if (trace(log)) trace(log, "Found content node '" + String.valueOf(result) + "', replacing it.");
        }
        else
        {
            if (info(log)) info(log, "Skipping '" + item.getName() + "' as it already exists in the repository and 'replace existing' is false.");
            result = null;
            importStatus.incrementTargetCounter(BulkImportStatus.TARGET_COUNTER_NODES_SKIPPED);
        }
        
        return(result);
    }
    
    
    /**
     * Resolves the NodeRef of the given item's parent folder, relative to the target.
     *
     * @param target The node that the item's relative parent path is resolved against.
     * @param item   The item whose parent is being looked up.
     * @return The parent's NodeRef, or <code>null</code> if the item has no parent path
     *         (null, empty, or consisting solely of path separators).
     * @throws OutOfOrderBatchException If the parent path could not be resolved (e.g. the child arrived before its parent).
     */
    private NodeRef getParent(final NodeRef target, final BulkImportItem<BulkImportItemVersion> item)
    {
        NodeRef result = null;
        
        final String itemParentPath = item.getRelativePathOfParent();
        
        if (debug(log)) debug(log, "Finding parent folder '" + itemParentPath + "'.");
        
        // String.split keeps a leading empty element when the input starts with a separator
        // (e.g. "/a/b" -> ["", "a", "b"]), which could never be resolved as a folder name.
        // Stripping leading separators first guarantees every path element is non-empty.
        final String normalisedParentPath = itemParentPath == null ? "" : itemParentPath.replaceFirst("^" + REGEX_SPLIT_PATH_ELEMENTS, "");
        
        if (normalisedParentPath.length() > 0)
        {
            final List<String> itemParentPathElements = Arrays.asList(normalisedParentPath.split(REGEX_SPLIT_PATH_ELEMENTS));
            FileInfo           fileInfo               = null;
                
            try
            {
                //####TODO: I THINK THIS WILL FAIL IN THE PRESENCE OF CUSTOM NAMESPACES / PARENT ASSOC QNAMES!!!!
                fileInfo = serviceRegistry.getFileFolderService().resolveNamePath(target, itemParentPathElements, false);
            }
            catch (final FileNotFoundException fnfe)  // This should never be triggered due to the last parameter in the resolveNamePath call, but just in case
            {
                throw new OutOfOrderBatchException(itemParentPath, fnfe);
            }
            
            // Out of order batch submission (child arrived before parent)
            if (fileInfo == null)
            {
                throw new OutOfOrderBatchException(itemParentPath);
            }
            
            result = fileInfo.getNodeRef();
        }
        
        return(result);
    }
    
    

    /**
     * Imports a directory item.  Only the metadata of the item's last version is imported;
     * additional versions and any content are skipped with a warning.
     *
     * @param nodeRef The node to import the directory's metadata into.
     * @param item    The directory item being imported.
     * @param dryRun  Whether this is a dry run.
     * @throws InterruptedException If the import was interrupted while importing metadata.
     */
    private final void importDirectory(final NodeRef                               nodeRef,
                                       final BulkImportItem<BulkImportItemVersion> item,
                                       final boolean                               dryRun)
        throws InterruptedException
    {
        final boolean hasVersions = item.getVersions() != null && item.getVersions().size() > 0;

        if (!hasVersions)
        {
            if (trace(log))
            {
                trace(log, "No metadata to import for directory '" + item.getName() + "'.");
            }
        }
        else
        {
            final boolean multipleVersions = item.getVersions().size() > 1;

            if (multipleVersions)
            {
                warn(log, "Skipping versions for directory '" + item.getName() + "' - Alfresco does not support versioned spaces.");
            }

            final BulkImportItemVersion latest = item.getVersions().last();

            if (latest.hasContent())
            {
                warn(log, "Skipping content for directory '" + item.getName() + "' - Alfresco doesn't support content in spaces.");
            }

            // Only the most recent version's metadata gets applied to the folder
            importVersionMetadata(nodeRef, latest, dryRun);
        }

        if (trace(log))
        {
            trace(log, "Finished importing metadata for directory " + item.getName() + ".");
        }
    }


    /**
     * Imports a file item, including all of its versions.
     * <p>
     * A file with a single version is imported without creating an Alfresco version.  A file with
     * several versions gets the cm:versionable aspect (unless the first version's metadata already
     * declares it) and each version is then imported in iteration order.
     *
     * @param nodeRef The node to import the file into.
     * @param item    The file item being imported.
     * @param dryRun  Whether this is a dry run, in which case the repository is not modified.
     * @throws InterruptedException  If the import is stopping or the current thread has been interrupted.
     * @throws IllegalStateException If the item has no versions.
     */
    private final void importFile(final NodeRef                               nodeRef,
                                  final BulkImportItem<BulkImportItemVersion> item,
                                  final boolean                               dryRun)
        throws InterruptedException
    {
        // A null version set is treated the same as an empty one, rather than failing with an NPE
        final int numberOfVersions = item.getVersions() == null ? 0 : item.getVersions().size();
        
        if (numberOfVersions == 0)
        {
            throw new IllegalStateException(item.getName() + " (being imported into " + String.valueOf(nodeRef) + ") has no versions.");
        }
        else if (numberOfVersions == 1)
        {
            importVersion(nodeRef, null, item.getVersions().first(), dryRun, true);
        }
        else
        {
            final BulkImportItemVersion firstVersion    = item.getVersions().first();
            final Set<String>           declaredAspects = firstVersion.getAspects();
            BulkImportItemVersion       previousVersion = null;
            
            // Does the first version's metadata already declare cm:versionable (in either QName notation)?
            final boolean declaresVersionable = declaredAspects != null &&
                                                (declaredAspects.contains(ContentModel.ASPECT_VERSIONABLE.toString()) ||
                                                 declaredAspects.contains(ContentModel.ASPECT_VERSIONABLE.toPrefixString()));
            
            // Add the cm:versionable aspect if it isn't already there
            if (!declaresVersionable)
            {
                if (dryRun)
                {
                    // A dry run must never touch the repository (and nodeRef may be a fake dry-run NodeRef)
                    if (info(log)) info(log, "[DRY RUN] Would have added the cm:versionable aspect to '" + String.valueOf(nodeRef) + "'.");
                }
                else
                {
                    if (debug(log)) debug(log, item.getName() + " has versions but is missing the cm:versionable aspect. Adding it.");
                    nodeService.addAspect(nodeRef, ContentModel.ASPECT_VERSIONABLE, null);
                }
            }
        
            for (final BulkImportItemVersion version : item.getVersions())
            {
                if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
                
                importVersion(nodeRef, previousVersion, version, dryRun, false);
                previousVersion = version;
            }
        }
        
        if (trace(log)) trace(log, "Finished importing " + numberOfVersions + " version" + (numberOfVersions == 1 ? "" : "s") + " of file " + item.getName() + ".");
    }
    
    
    /**
     * Imports a single version of a file: its content and metadata and, unless the file
     * only has one version, a new Alfresco version on top.
     * <p>
     * A version is major unless both it and the previous version have a version number and
     * the number increased by less than 1, in which case it is minor.
     *
     * @param nodeRef         The node to import the version into.
     * @param previousVersion The version imported immediately before this one <i>(null for the first version)</i>.
     * @param version         The version to import <i>(must not be null)</i>.
     * @param dryRun          Whether this is a dry run.
     * @param onlyOneVersion  Whether the file has just this one version, in which case no Alfresco version is created.
     * @throws InterruptedException  If the import was interrupted while importing content or metadata.
     * @throws IllegalStateException If <code>version</code> is null.
     */
    private final void importVersion(final NodeRef               nodeRef,
                                     final BulkImportItemVersion previousVersion,
                                     final BulkImportItemVersion version,
                                     final boolean               dryRun,
                                     final boolean               onlyOneVersion)
        throws InterruptedException
    {
        if (version == null)
        {
            throw new IllegalStateException("version was null. This is indicative of a bug in the chosen import source.");
        }
        
        importVersionContentAndMetadata(nodeRef, version, dryRun);
        
        boolean isMajor = true;
        
        // Both version numbers are needed to compute the increment; if either is missing we default to major
        // (previously a previous version without a version number caused a NullPointerException here).
        if (previousVersion                    != null &&
            previousVersion.getVersionNumber() != null &&
            version.getVersionNumber()         != null)
        {
            final BigDecimal difference = version.getVersionNumber().subtract(previousVersion.getVersionNumber());
            
            // NOTE(review): an increment such as 1.9 -> 2.0 is less than 1 and is therefore treated as minor - confirm this is intended.
            isMajor = difference.compareTo(BigDecimal.ONE) >= 0;
        }

        if (dryRun)
        {
            if (info(log)) info(log, "[DRY RUN] Would have created " + (isMajor ? "major" : "minor") + " version of node '" + String.valueOf(nodeRef) + "'.");
        }
        else if (onlyOneVersion)
        {
            // Only create versions if we have to - this is an exceptionally expensive operation in Alfresco
            if (trace(log)) trace(log, "Skipping creation of a version for node '" + String.valueOf(nodeRef) + "' as it only has one version.");
        }
        else
        {
            // Note: PROP_VERSION_LABEL is a "reserved" property, and cannot be modified by custom code.
            // In other words, we can't use the source's version label as the version label in Alfresco.  :-(
            // See: https://github.com/pmonks/alfresco-bulk-import/issues/13
            final Map<String, Serializable> versionProperties = new HashMap<>();

            versionProperties.put(VersionModel.PROP_VERSION_TYPE, isMajor ? VersionType.MAJOR : VersionType.MINOR);

            if (version.getVersionComment() != null)
            {
                versionProperties.put(Version.PROP_DESCRIPTION, version.getVersionComment());
            }
            
            if (trace(log)) trace(log, "Creating " + (isMajor ? "major" : "minor") + " version of node '" + String.valueOf(nodeRef) + "'.");
            versionService.createVersion(nodeRef, versionProperties);
        }
    }
    
    
    /**
     * Imports whichever of metadata and content the given version has, metadata first.
     *
     * @param nodeRef The node to import into.
     * @param version The version whose metadata and/or content is imported.
     * @param dryRun  Whether this is a dry run.
     * @throws InterruptedException If the import was interrupted.
     */
    private final void importVersionContentAndMetadata(final NodeRef               nodeRef,
                                                       final BulkImportItemVersion version,
                                                       final boolean               dryRun)
        throws InterruptedException
    {
        // Step 1: metadata (skipped entirely when the version reports none)
        if (version.hasMetadata()) importVersionMetadata(nodeRef, version, dryRun);

        // Step 2: content (skipped entirely when the version reports none)
        if (version.hasContent()) importVersionContent(nodeRef, version, dryRun);
    }
    
    
    /**
     * Applies a version's type, aspects and metadata properties to the given node.
     * In a dry run the changes are only logged.
     *
     * @param nodeRef The node to apply the type, aspects and properties to.
     * @param version The version that supplies the type, aspects and metadata.
     * @param dryRun  Whether this is a dry run.
     * @throws InterruptedException  If the import is stopping or the current thread has been interrupted.
     * @throws IllegalStateException If the version reports that it has metadata but the metadata is null, or
     *                               if adding the properties fails because of an invalid NodeRef other than <code>nodeRef</code>.
     */
    private final void importVersionMetadata(final NodeRef               nodeRef,
                                             final BulkImportItemVersion version,
                                             final boolean               dryRun)
        throws InterruptedException
    {
        final String                    typeName    = version.getType();
        final Set<String>               aspectNames = version.getAspects();
        final Map<String, Serializable> rawMetadata = version.getMetadata();

        // 1. Type
        if (typeName != null)
        {
            if (!dryRun)
            {
                if (trace(log)) trace(log, "Setting type of '" + String.valueOf(nodeRef) + "' to '" + typeName + "'.");
                nodeService.setType(nodeRef, createQName(serviceRegistry, typeName));
            }
            else if (info(log))
            {
                info(log, "[DRY RUN] Would have set type of '" + String.valueOf(nodeRef) + "' to '" + typeName + "'.");
            }
        }

        // 2. Aspects
        if (aspectNames != null)
        {
            for (final String aspectName : aspectNames)
            {
                if (importStatus.isStopping() || Thread.currentThread().isInterrupted())
                {
                    throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
                }

                if (!dryRun)
                {
                    if (trace(log)) trace(log, "Adding aspect '" + aspectName + "' to '" + String.valueOf(nodeRef) + "'.");
                    nodeService.addAspect(nodeRef, createQName(serviceRegistry, aspectName), null);
                }
                else if (info(log))
                {
                    info(log, "[DRY RUN] Would have added aspect '" + aspectName + "' to '" + String.valueOf(nodeRef) + "'.");
                }
            }
        }

        // 3. Properties
        if (!version.hasMetadata())
        {
            return;
        }

        if (rawMetadata == null)
        {
            throw new IllegalStateException("The import source has logic errors - it says it has metadata, but the metadata is null.");
        }

        // NodeService wants QName keys, so convert every String key
        final Map<QName, Serializable> properties = new HashMap<>(rawMetadata.size());

        for (final Map.Entry<String, Serializable> entry : rawMetadata.entrySet())
        {
            if (importStatus.isStopping() || Thread.currentThread().isInterrupted())
            {
                throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
            }

            properties.put(createQName(serviceRegistry, entry.getKey()), entry.getValue());
        }

        if (dryRun)
        {
            if (info(log))
            {
                info(log, "[DRY RUN] Would have added the following properties to '" + String.valueOf(nodeRef) +
                          "':\n" + Arrays.toString(properties.entrySet().toArray()));
            }

            return;
        }

        try
        {
            if (trace(log))
            {
                trace(log, "Adding the following properties to '" + String.valueOf(nodeRef) +
                           "':\n" + Arrays.toString(properties.entrySet().toArray()));
            }

            nodeService.addProperties(nodeRef, properties);
        }
        catch (final InvalidNodeRefException inre)
        {
            if (nodeRef.equals(inre.getNodeRef()))
            {
                // The node being imported into is itself invalid - a logic bug in the import tool
                throw inre;
            }

            // Otherwise the invalid NodeRef came from the metadata (e.g. in an association)
            throw new IllegalStateException("Invalid nodeRef found in metadata file '" + version.getMetadataSource() + "'.  " +
                                            "Probable cause: an association is being populated via metadata, but the " +
                                            "NodeRef for the target of that association ('" + inre.getNodeRef() + "') is invalid.  " +
                                            "Please double check your metadata file and try again.", inre);
        }
    }
    

    /**
     * Imports a version's content into the given node.  In-place content is only validated
     * (its metadata must carry the cm:content property); other content is streamed into the
     * repository unless this is a dry run.  The matching target counter is incremented either way.
     *
     * @param nodeRef The node to import the content into.
     * @param version The version whose content is imported.
     * @param dryRun  Whether this is a dry run.
     * @throws InterruptedException  Declared for callers; not thrown directly by this method.
     * @throws IllegalStateException If the content is reported as in-place but the metadata lacks the cm:content property.
     */
    private final void importVersionContent(final NodeRef               nodeRef,
                                            final BulkImportItemVersion version,
                                            final boolean               dryRun)
        throws InterruptedException
    {
        // Nothing to do for versions without content
        if (!version.hasContent())
        {
            return;
        }

        if (!version.contentIsInPlace())
        {
            // Content needs to be streamed into the repository
            if (dryRun)
            {
                if (info(log)) info(log, "[DRY RUN] Would have streamed content from '" + version.getContentSource() + "' into node '" + String.valueOf(nodeRef) + "'.");
            }
            else
            {
                if (trace(log)) trace(log, "Streaming content from '" + version.getContentSource() + "' into node '" + String.valueOf(nodeRef) + "'.");

                final ContentWriter contentWriter = contentService.getWriter(nodeRef, ContentModel.PROP_CONTENT, true);
                version.putContent(contentWriter);

                if (trace(log)) trace(log, "Finished streaming content from '" + version.getContentSource() + "' into node '" + String.valueOf(nodeRef) + "'.");
            }

            importStatus.incrementTargetCounter(BulkImportStatus.TARGET_COUNTER_CONTENT_STREAMED);
            return;
        }

        // In-place content: nothing is streamed
        if (dryRun)
        {
            if (info(log)) info(log, "[DRY RUN] Content for node '" + String.valueOf(nodeRef) + "' is in-place.");
        }
        else
        {
            if (trace(log)) trace(log, "Content for node '" + String.valueOf(nodeRef) + "' is in-place.");
        }

        // The metadata must supply the cm:content property, in either QName notation
        final boolean metadataHasContentProperty = version.hasMetadata() &&
                                                   version.getMetadata() != null &&
                                                   (version.getMetadata().containsKey(ContentModel.PROP_CONTENT.toPrefixString()) ||
                                                    version.getMetadata().containsKey(ContentModel.PROP_CONTENT.toString()));

        if (!metadataHasContentProperty)
        {
            throw new IllegalStateException("The source system you selected is incorrectly implemented - it is reporting" +
                                            " that content is in place for '" + version.getContentSource() +
                                            "', but the metadata doesn't contain the '" + String.valueOf(ContentModel.PROP_CONTENT) +
                                            "' property.");
        }

        importStatus.incrementTargetCounter(BulkImportStatus.TARGET_COUNTER_IN_PLACE_CONTENT_LINKED);
    }
    
}