diff --git a/src/Oro/Bundle/TrackingBundle/Command/ImportLogsCommand.php b/src/Oro/Bundle/TrackingBundle/Command/ImportLogsCommand.php index 815b212240f..f810a5fca9e 100644 --- a/src/Oro/Bundle/TrackingBundle/Command/ImportLogsCommand.php +++ b/src/Oro/Bundle/TrackingBundle/Command/ImportLogsCommand.php @@ -3,7 +3,9 @@ namespace Oro\Bundle\TrackingBundle\Command; use Akeneo\Bundle\BatchBundle\Job\BatchStatus; +use Akeneo\Bundle\BatchBundle\Job\DoctrineJobRepository; +use Doctrine\DBAL\Types\Type; use Doctrine\ORM\QueryBuilder; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; @@ -85,19 +87,23 @@ public function execute(InputInterface $input, OutputInterface $output) $fileName = $file->getFilename(); $options = [ - 'entityName' => $this->getContainer()->getParameter('oro_tracking.tracking_data.class'), - 'processorAlias' => 'oro_tracking.processor.data', - 'file' => $pathName + ProcessorRegistry::TYPE_IMPORT => [ + 'entityName' => $this->getContainer()->getParameter('oro_tracking.tracking_data.class'), + 'processorAlias' => 'oro_tracking.processor.data', + 'file' => $pathName, + ], ]; if ($this->isFileProcessed($options)) { + $output->writeln(sprintf('"%s" already processed', $fileName)); + continue; } $jobResult = $this->getJobExecutor()->executeJob( ProcessorRegistry::TYPE_IMPORT, 'import_log_to_database', - ['import' => $options] + $options ); if ($jobResult->isSuccessful()) { @@ -132,26 +138,26 @@ protected function getJobExecutor() */ protected function isFileProcessed(array $options) { - $className = 'Akeneo\Bundle\BatchBundle\Entity\JobExecution'; + /** @var DoctrineJobRepository $repo */ + $repo = $this->getContainer()->get('akeneo_batch.job_repository'); + + $manager = $repo->getJobManager(); - $qb = $this - ->getContainer() - ->get('doctrine') - ->getManagerForClass($className) - ->getRepository($className) + $qb = $manager + ->getRepository('Akeneo\Bundle\BatchBundle\Entity\JobExecution') ->createQueryBuilder('je'); /** @var QueryBuilder $qb */ $result = $qb ->select('COUNT(je) as jobs') ->leftJoin('je.jobInstance', 'ji') - ->where('je.status NOT IN (:statuses)') + ->where($qb->expr()->lt('je.status', ':status')) + ->setParameter('status', BatchStatus::FAILED) + ->andWhere('ji.rawConfiguration = :rawConfiguration') ->setParameter( - 'statuses', - [BatchStatus::STARTING, BatchStatus::STARTED] + 'rawConfiguration', + $manager->getConnection()->convertToDatabaseValue($options, Type::TARRAY) ) - ->andWhere('ji.rawConfiguration = :rawConfiguration') - ->setParameter('rawConfiguration', serialize($options)) ->getQuery() ->getOneOrNullResult(); diff --git a/src/Oro/Bundle/TrackingBundle/Controller/TrackingDataController.php b/src/Oro/Bundle/TrackingBundle/Controller/TrackingDataController.php index 624adfe113e..2d7a0c9277f 100644 --- a/src/Oro/Bundle/TrackingBundle/Controller/TrackingDataController.php +++ b/src/Oro/Bundle/TrackingBundle/Controller/TrackingDataController.php @@ -29,7 +29,7 @@ public function createAction(Request $request) ProcessorRegistry::TYPE_IMPORT, 'import_request_to_database', [ - 'import' => [ + ProcessorRegistry::TYPE_IMPORT => [ 'entityName' => $this->container->getParameter('oro_tracking.tracking_data.class'), 'processorAlias' => 'oro_tracking.processor.data', 'data' => $request->query->all(), @@ -44,7 +44,8 @@ public function createAction(Request $request) $response['errors'] = $jobResult->getFailureExceptions(); } - if ($validationErrors = $jobResult->getContext()->getErrors()) { + $validationErrors = $jobResult->getContext()->getErrors(); + if ($validationErrors) { $isSuccessful = false; $response['validation'] = $validationErrors; diff --git a/src/Oro/Bundle/TrackingBundle/Tests/Functional/Command/ImportLogsCommandTest.php b/src/Oro/Bundle/TrackingBundle/Tests/Functional/Command/ImportLogsCommandTest.php index 84465a94f5f..ef247fc5930 100644 --- a/src/Oro/Bundle/TrackingBundle/Tests/Functional/Command/ImportLogsCommandTest.php +++ b/src/Oro/Bundle/TrackingBundle/Tests/Functional/Command/ImportLogsCommandTest.php @@ -4,6 +4,7 @@ use Symfony\Component\Filesystem\Filesystem; +use Oro\Bundle\ImportExportBundle\Processor\ProcessorRegistry; use Oro\Bundle\TestFrameworkBundle\Test\WebTestCase; /** @@ -33,10 +34,17 @@ protected function setUp() $this->fs = new Filesystem(); $this->directory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . time(); + + $this->getContainer()->get('akeneo_batch.job_repository')->getJobManager()->beginTransaction(); } protected function tearDown() { + // clear DB from separate connection + $manager = $this->getContainer()->get('akeneo_batch.job_repository')->getJobManager(); + $manager->rollback(); + $manager->getConnection()->close(); + $this->fs->remove($this->directory); } @@ -87,4 +95,33 @@ public function testCurrentFileNotProcessed() $this->assertFileExists($this->directory . DIRECTORY_SEPARATOR . $file); $this->assertNotContains(sprintf('Successful: "%s"', $file), $result); } + + public function testIsFileProcessed() + { + $date = new \DateTime('now', new \DateTimeZone('UTC')); + $fileName = $date->modify('-1 day')->format('Ymd-H') . '-60-1.log'; + $file = $this->directory . DIRECTORY_SEPARATOR . $fileName; + + $this->fs->dumpFile($file, json_encode(['prop' => 'value'])); + + $jobResult = $this->getContainer()->get('oro_importexport.job_executor')->executeJob( + ProcessorRegistry::TYPE_IMPORT, + 'import_log_to_database', + [ + ProcessorRegistry::TYPE_IMPORT => [ + 'entityName' => $this->getContainer()->getParameter('oro_tracking.tracking_data.class'), + 'processorAlias' => 'oro_tracking.processor.data', + 'file' => $file, + ], + ] + ); + $this->assertTrue($jobResult->isSuccessful()); + + $result = $this->runCommand( + 'oro:cron:import-tracking', + ['--directory' => $this->directory] + ); + $this->assertFileNotExists($this->directory . DIRECTORY_SEPARATOR . $file); + $this->assertContains(sprintf('"%s" already processed', $fileName), $result); + } } diff --git a/src/Oro/Bundle/WorkflowBundle/Tests/Functional/Entity/Repository/ProcessJobRepositoryTest.php b/src/Oro/Bundle/WorkflowBundle/Tests/Functional/Entity/Repository/ProcessJobRepositoryTest.php index bbaaeac3e44..931b3819b91 100644 --- a/src/Oro/Bundle/WorkflowBundle/Tests/Functional/Entity/Repository/ProcessJobRepositoryTest.php +++ b/src/Oro/Bundle/WorkflowBundle/Tests/Functional/Entity/Repository/ProcessJobRepositoryTest.php @@ -37,30 +37,32 @@ class ProcessJobRepositoryTest extends WebTestCase protected function setUp() { $this->initClient(); + + $this->getContainer()->get('akeneo_batch.job_repository')->getJobManager()->beginTransaction(); + $this->dropJobsRecords(); $this->registry = $this->getContainer()->get('doctrine'); $this->entityManager = $this->registry->getManagerForClass('OroWorkflowBundle:ProcessJob'); $this->repository = $this->registry->getRepository('OroWorkflowBundle:ProcessJob'); - $this->loadFixtures(array('Oro\Bundle\WorkflowBundle\Tests\Functional\DataFixtures\LoadProcessEntities')); + $this->loadFixtures(['Oro\Bundle\WorkflowBundle\Tests\Functional\DataFixtures\LoadProcessEntities']); } protected function tearDown() { - parent::tearDown(); + // clear DB from separate connection + $manager = $this->getContainer()->get('akeneo_batch.job_repository')->getJobManager(); + $manager->rollback(); + $manager->getConnection()->close(); $this->dropJobsRecords(); + + parent::tearDown(); } protected function dropJobsRecords() { - // clear DB from separate connection - $batchJobManager = $this->getContainer()->get('akeneo_batch.job_repository')->getJobManager(); - $batchJobManager->createQuery('DELETE AkeneoBatchBundle:JobInstance')->execute(); - $batchJobManager->createQuery('DELETE AkeneoBatchBundle:JobExecution')->execute(); - $batchJobManager->createQuery('DELETE AkeneoBatchBundle:StepExecution')->execute(); - $this->getContainer() ->get('doctrine') ->getManager() @@ -89,7 +91,7 @@ public function testFindByIds() $this->assertCount($count, $expectedJobs); - $ids = array(); + $ids = []; /** @var ProcessJob $job */ foreach ($expectedJobs as $job) { $ids[] = $job->getId(); @@ -146,10 +148,10 @@ protected function createProcessJobs($count) ); $trigger = $this->entityManager->getRepository('OroWorkflowBundle:ProcessTrigger') - ->findOneBy(array('definition' => $definition)); + ->findOneBy(['definition' => $definition]); $entity = $this->getUser(); - $entityHashes = array(); + $entityHashes = []; for ($i = 0; $i < $count; $i++) { $processData = new ProcessData();