diff --git a/core/src/main/java/org/polypheny/db/catalog/Catalog.java b/core/src/main/java/org/polypheny/db/catalog/Catalog.java index 9f1a88989c..b4518bdcd5 100644 --- a/core/src/main/java/org/polypheny/db/catalog/Catalog.java +++ b/core/src/main/java/org/polypheny/db/catalog/Catalog.java @@ -24,11 +24,9 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; -import java.util.function.Supplier; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.linq4j.tree.Expressions; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.pf4j.ExtensionPoint; import org.polypheny.db.adapter.AbstractAdapterSetting; import org.polypheny.db.adapter.Adapter; @@ -51,7 +49,6 @@ import org.polypheny.db.catalog.snapshot.Snapshot; import org.polypheny.db.iface.QueryInterfaceManager.QueryInterfaceTemplate; import org.polypheny.db.transaction.Transaction; -import org.polypheny.db.util.Pair; import org.polypheny.db.util.RunMode; public abstract class Catalog implements ExtensionPoint { @@ -111,14 +108,8 @@ public static void afterInit( Runnable action ) { public abstract String getJson(); - public abstract void executeCommitActions(); - - public abstract void clearCommitActions(); - public abstract void commit(); - public abstract Pair<@NotNull Boolean, @Nullable String> checkIntegrity(); - public abstract void rollback(); public abstract LogicalRelationalCatalog getLogicalRel( long namespaceId ); @@ -282,9 +273,4 @@ public static Snapshot snapshot() { public abstract void restore( Transaction transaction ); - - public abstract void attachCommitConstraint( Supplier constraintChecker, String description ); - - public abstract void attachCommitAction( Runnable action ); - } diff --git a/core/src/main/java/org/polypheny/db/catalog/impl/PolyCatalog.java b/core/src/main/java/org/polypheny/db/catalog/impl/PolyCatalog.java index 4a97ed4515..67fae1924d 100644 --- a/core/src/main/java/org/polypheny/db/catalog/impl/PolyCatalog.java +++ b/core/src/main/java/org/polypheny/db/catalog/impl/PolyCatalog.java @@ -26,20 +26,15 @@ import io.activej.serializer.annotations.Deserialize; import io.activej.serializer.annotations.Serialize; import java.beans.PropertyChangeListener; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.polypheny.db.adapter.AbstractAdapterSetting; import org.polypheny.db.adapter.Adapter; import org.polypheny.db.adapter.AdapterManager; @@ -79,7 +74,6 @@ import org.polypheny.db.catalog.persistance.Persister; import org.polypheny.db.catalog.snapshot.Snapshot; import org.polypheny.db.catalog.snapshot.impl.SnapshotBuilder; -import org.polypheny.db.catalog.util.ConstraintCondition; import org.polypheny.db.config.RuntimeConfig; import org.polypheny.db.iface.QueryInterfaceManager.QueryInterfaceTemplate; import org.polypheny.db.nodes.Node; @@ -109,12 +103,6 @@ public class PolyCatalog extends Catalog implements PolySerializable { @Getter private final BinarySerializer serializer = PolySerializable.buildSerializer( PolyCatalog.class ); - /** - * Constraints which have to be met before a commit can be executed. - */ - private final Collection commitConstraints = new ConcurrentLinkedDeque<>(); - private final Collection commitActions = new ConcurrentLinkedDeque<>(); - // indicates if the state has advanced and the snapshot has to be recreated or can be reused // trx without ddl private long lastCommitSnapshotId = 0; @@ -243,19 +231,6 @@ public String getJson() { } - @Override - public void executeCommitActions() { - // execute physical changes - commitActions.forEach( Runnable::run ); - } - - - @Override - public void clearCommitActions() { - commitActions.clear(); - } - - public synchronized void commit() { if ( !this.dirty.get() ) { log.debug( "Nothing changed" ); @@ -275,33 +250,12 @@ public synchronized void commit() { updateSnapshot(); persister.write( backup ); this.dirty.set( false ); - this.commitConstraints.clear(); - this.commitActions.clear(); this.lastCommitSnapshotId = snapshot.id(); } - @Override - public Pair<@NotNull Boolean, @Nullable String> checkIntegrity() { - // check constraints e.g. primary key constraints - List> fails = commitConstraints - .stream() - .map( c -> Pair.of( c.condition().get(), c.errorMessage() ) ) - .filter( c -> !c.left ) - .toList(); - - if ( !fails.isEmpty() ) { - commitConstraints.clear(); - return Pair.of( false, "DDL constraints not met: \n" + fails.stream().map( f -> f.right ).collect( Collectors.joining( ",\n " ) ) + "." ); - } - return Pair.of( true, null ); - } - - public void rollback() { - commitActions.clear(); - commitConstraints.clear(); restoreLastState(); @@ -570,18 +524,6 @@ private void restoreViews( Transaction transaction ) { } - @Override - public void attachCommitConstraint( Supplier constraintChecker, String description ) { - commitConstraints.add( new ConstraintCondition( constraintChecker, description ) ); - } - - - @Override - public void attachCommitAction( Runnable action ) { - commitActions.add( action ); - } - - @Override public void close() { log.error( "closing" ); diff --git a/core/src/main/java/org/polypheny/db/ddl/DdlManager.java b/core/src/main/java/org/polypheny/db/ddl/DdlManager.java index c4c8653198..951b5e4ccc 100644 --- a/core/src/main/java/org/polypheny/db/ddl/DdlManager.java +++ b/core/src/main/java/org/polypheny/db/ddl/DdlManager.java @@ -45,6 +45,7 @@ import org.polypheny.db.nodes.Node; import org.polypheny.db.partition.raw.RawPartitionInformation; import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; import org.polypheny.db.transaction.TransactionException; import org.polypheny.db.type.PolyType; import org.polypheny.db.type.entity.PolyValue; @@ -116,6 +117,7 @@ public static DdlManager getInstance() { /** * Adds a new data source(adapter) * + * @param transaction current transaction * @param uniqueName unique name of the newly created source * @param adapterName name of source, which is used to create the source * @param namespace the target namespace for the adapter @@ -123,7 +125,7 @@ public static DdlManager getInstance() { * @param config configuration for the source * @param mode the deploy mode */ - public abstract void createSource( String uniqueName, String adapterName, long namespace, AdapterType adapterType, Map config, DeployMode mode ); + public abstract void createSource( Transaction transaction, String uniqueName, String adapterName, long namespace, AdapterType adapterType, Map config, DeployMode mode ); /** @@ -157,6 +159,7 @@ public static DdlManager getInstance() { /** * Add a column to an existing table * + * @param transaction current transaction * @param columnName the name of the new column * @param table the table * @param beforeColumnName the column before which the new column should be positioned; can be null @@ -166,7 +169,7 @@ public static DdlManager getInstance() { * @param defaultValue a default value for the column; can be null * @param statement the query statement */ - public abstract void createColumn( String columnName, LogicalTable table, String beforeColumnName, String afterColumnName, ColumnTypeInformation type, boolean nullable, PolyValue defaultValue, Statement statement ); + public abstract void createColumn( Transaction transaction, String columnName, LogicalTable table, String beforeColumnName, String afterColumnName, ColumnTypeInformation type, boolean nullable, PolyValue defaultValue, Statement statement ); /** * Add a foreign key to a table @@ -248,10 +251,11 @@ public static DdlManager getInstance() { /** * Drop a specific constraint from a table * + * @param transaction current transaction * @param table the table * @param constraintName the name of the constraint to be dropped */ - public abstract void dropConstraint( LogicalTable table, String constraintName ); + public abstract void dropConstraint( Transaction transaction, LogicalTable table, String constraintName ); /** * Drop a foreign key of a table diff --git a/core/src/main/java/org/polypheny/db/transaction/Transaction.java b/core/src/main/java/org/polypheny/db/transaction/Transaction.java index fc08399d50..d8e264999d 100644 --- a/core/src/main/java/org/polypheny/db/transaction/Transaction.java +++ b/core/src/main/java/org/polypheny/db/transaction/Transaction.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.jetbrains.annotations.Nullable; import org.polypheny.db.adapter.Adapter; import org.polypheny.db.adapter.java.JavaTypeFactory; @@ -44,6 +45,10 @@ public interface Transaction { LogicalUser getUser(); + void attachCommitAction( Runnable action ); + + void attachCommitConstraint( Supplier constraintChecker, String description ); + void commit() throws TransactionException; /** diff --git a/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java b/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java index 7a3f583fa3..e509d8e174 100644 --- a/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java +++ b/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java @@ -20,10 +20,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Supplier; import org.apache.commons.lang3.NotImplementedException; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.polypheny.db.adapter.DeployMode; import org.polypheny.db.catalog.catalogs.AllocationDocumentCatalog; import org.polypheny.db.catalog.catalogs.AllocationGraphCatalog; @@ -37,7 +34,6 @@ import org.polypheny.db.catalog.entity.LogicalUser; import org.polypheny.db.catalog.logistic.DataModel; import org.polypheny.db.catalog.snapshot.Snapshot; -import org.polypheny.db.util.Pair; /** @@ -212,34 +208,4 @@ public void clear() { throw new NotImplementedException(); } - - @Override - public void executeCommitActions() { - this.commitActions.forEach( Runnable::run ); - } - - - @Override - public void clearCommitActions() { - this.commitActions.clear(); - } - - - @Override - public void attachCommitConstraint( Supplier constraintChecker, String description ) { - // empty on purpose - } - - - @Override - public void attachCommitAction( Runnable action ) { - commitActions.add( action ); - } - - - @Override - public Pair<@NotNull Boolean, @Nullable String> checkIntegrity() { - return Pair.of( true, null ); - } - } diff --git a/dbms/src/main/java/org/polypheny/db/PolyphenyDb.java b/dbms/src/main/java/org/polypheny/db/PolyphenyDb.java index bc175d51aa..b9f47f9b1b 100644 --- a/dbms/src/main/java/org/polypheny/db/PolyphenyDb.java +++ b/dbms/src/main/java/org/polypheny/db/PolyphenyDb.java @@ -559,7 +559,7 @@ private void restore( Authenticator authenticator, Catalog catalog ) { Catalog.defaultStore = AdapterTemplate.fromString( defaultStoreName, AdapterType.STORE ); Catalog.defaultSource = AdapterTemplate.fromString( defaultSourceName, AdapterType.SOURCE ); - restoreDefaults( catalog, mode ); + restoreDefaults( trx, catalog, mode ); QueryInterfaceManager.getInstance().restoreInterfaces( catalog.getSnapshot() ); @@ -585,9 +585,9 @@ private void commitRestore( Transaction trx ) { * @param catalog the current catalog * @param mode the current mode */ - private static void restoreDefaults( Catalog catalog, RunMode mode ) { + private static void restoreDefaults( Transaction transaction, Catalog catalog, RunMode mode ) { catalog.updateSnapshot(); - DefaultInserter.resetData( DdlManager.getInstance(), mode ); + DefaultInserter.resetData( transaction, DdlManager.getInstance(), mode ); DefaultInserter.restoreInterfacesIfNecessary( catalog ); } diff --git a/dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java b/dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java index 81f973d537..5d3d50c873 100644 --- a/dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java +++ b/dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java @@ -109,6 +109,7 @@ import org.polypheny.db.processing.DataMigrator; import org.polypheny.db.routing.RoutingManager; import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; import org.polypheny.db.transaction.TransactionException; import org.polypheny.db.type.ArrayType; import org.polypheny.db.type.PolyType; @@ -209,7 +210,7 @@ public void createStore( String uniqueName, String adapterName, AdapterType adap @Override - public void createSource( String uniqueName, String adapterName, long namespace, AdapterType adapterType, Map config, DeployMode mode ) { + public void createSource( Transaction transaction, String uniqueName, String adapterName, long namespace, AdapterType adapterType, Map config, DeployMode mode ) { uniqueName = uniqueName.toLowerCase(); DataSource adapter = (DataSource) AdapterManager.getInstance().addAdapter( adapterName, uniqueName, adapterType, mode, config ); @@ -270,7 +271,7 @@ public void createSource( String uniqueName, String adapterName, long namespace, buildNamespace( Catalog.defaultNamespaceId, logical, adapter ); - catalog.attachCommitAction( () -> + transaction.attachCommitAction( () -> // we can execute with initial logical and allocation data as this is a source and this will not change adapter.createTable( null, LogicalTableWrapper.of( logical, columns, List.of() ), AllocationTableWrapper.of( allocation.unwrapOrThrow( AllocationTable.class ), aColumns ) ) ); catalog.updateSnapshot(); @@ -455,7 +456,7 @@ private void updateColumnPosition( LogicalTable table, LogicalColumn column, int @Override - public void createColumn( String columnName, LogicalTable table, String beforeColumnName, String afterColumnName, ColumnTypeInformation type, boolean nullable, PolyValue defaultValue, Statement statement ) { + public void createColumn( Transaction transaction, String columnName, LogicalTable table, String beforeColumnName, String afterColumnName, ColumnTypeInformation type, boolean nullable, PolyValue defaultValue, Statement statement ) { columnName = adjustNameIfNeeded( columnName, table.namespaceId ); // Check if the column either allows null values or has a default value defined. if ( defaultValue == null && !nullable ) { @@ -494,7 +495,7 @@ public void createColumn( String columnName, LogicalTable table, String beforeCo List> stores = RoutingManager.getInstance().getCreatePlacementStrategy().getDataStoresForNewRelField( addedColumn ); // Add column on underlying data stores and insert default value - catalog.attachCommitAction( () -> { + transaction.attachCommitAction( () -> { for ( DataStore store : stores ) { AllocationPlacement placement = catalog.getSnapshot().alloc().getPlacement( store.getAdapterId(), table.id ).orElseThrow(); @@ -760,7 +761,7 @@ public void createAllocationPlacement( LogicalTable table, List n catalog.updateSnapshot(); List finalColumns = adjustedColumns; - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { // Copy data to the newly added placements DataMigrator dataMigrator = statement.getTransaction().getDataMigrator(); dataMigrator.copyData( statement.getTransaction(), catalog.getSnapshot().getAdapter( dataStore.getAdapterId() ).orElseThrow(), table, finalColumns, placement ); @@ -792,7 +793,7 @@ public void createPrimaryKey( LogicalTable table, List columnNames, Stat } if ( oldPk != null ) { - dropConstraint( table, ConstraintType.PRIMARY.name() ); + dropConstraint( statement.getTransaction(), table, ConstraintType.PRIMARY.name() ); } catalog.getLogicalRel( table.namespaceId ).addPrimaryKey( table.id, columnIds, statement ); @@ -812,7 +813,7 @@ public void createPrimaryKey( LogicalTable table, List columnNames, Stat placement.adapterId, PlacementType.AUTOMATIC, 0 ); - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( AllocationPartition partition : catalog.getSnapshot().alloc().getPartitionsFromLogical( table.id ) ) { AllocationEntity entity = catalog.getSnapshot().alloc().getAlloc( placement.id, partition.id ).orElseThrow(); AdapterManager.getInstance().getStore( placement.adapterId ).orElseThrow().addColumn( @@ -905,7 +906,7 @@ private void deleteAllocationColumn( LogicalTable table, Statement statement, Al if ( table.entityType == EntityType.ENTITY ) { // we use closure to cache the physical we have to delete later List allocsOfPlacement = catalog.getSnapshot().alloc().getAllocsOfPlacement( allocationColumn.placementId ); - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( AllocationEntity allocation : allocsOfPlacement ) { AdapterManager.getInstance().getStore( allocationColumn.adapterId ) .orElseThrow() @@ -937,7 +938,7 @@ private void checkModelLogic( LogicalTable catalogTable, String columnName ) { @Override - public void dropConstraint( LogicalTable table, String constraintName ) { + public void dropConstraint( Transaction transaction, LogicalTable table, String constraintName ) { // Make sure that this is a table of type TABLE (and not SOURCE) checkIfDdlPossible( table.entityType ); @@ -946,7 +947,7 @@ public void dropConstraint( LogicalTable table, String constraintName ) { Supplier stillUsed = () -> getKeyUniqueCount( constraint.keyId ) < 2; if ( constraint.type == ConstraintType.UNIQUE && isForeignKey( constraint.key.id ) && stillUsed.get() ) { // maybe we delete multiple constraints in this transaction, so we need to check again - catalog.attachCommitConstraint( + transaction.attachCommitConstraint( stillUsed, "The constraint " + constraintName + " is used on a key which is referenced by at least one foreign key which requires this key to be unique. Unable to drop unique constraint." ); } @@ -1013,7 +1014,7 @@ public void dropIndex( LogicalTable table, String indexName, Statement statement DataStore store = AdapterManager.getInstance().getStore( index.location ).orElseThrow(); AllocationPlacement placement = catalog.getSnapshot().alloc().getPlacement( store.getAdapterId(), table.id ).orElseThrow(); - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { catalog.getSnapshot().alloc().getAllocsOfPlacement( placement.id ).forEach( allocation -> { store.dropIndex( statement.getPrepareContext(), index, List.of( allocation.id ) ); } ); @@ -1042,7 +1043,7 @@ public void dropPlacement( LogicalTable table, DataStore store, Statement sta // Delete polystore index IndexManager.getInstance().deleteIndex( index ); } else { - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { // Delete index on storeId AdapterManager.getInstance().getStore( index.location ) .orElseThrow() @@ -1074,7 +1075,7 @@ public void dropPlacement( LogicalTable table, DataStore store, Statement sta private void dropAllocation( long namespaceId, DataStore store, Statement statement, long allocId ) { // Physically delete the data from the storeId - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { store.dropTable( statement.getPrepareContext(), allocId ); } ); @@ -1116,7 +1117,7 @@ public void setColumnType( LogicalTable table, String columnName, ColumnTypeInfo type.cardinality() ); catalog.updateSnapshot(); for ( AllocationColumn allocationColumn : catalog.getSnapshot().alloc().getColumnFromLogical( logicalColumn.id ).orElseThrow() ) { - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( AllocationEntity allocation : catalog.getSnapshot().alloc().getAllocsOfPlacement( allocationColumn.placementId ) ) { AdapterManager.getInstance().getStore( allocationColumn.adapterId ) .orElseThrow() @@ -1433,13 +1434,13 @@ public void modifyPartitionPlacement( LogicalTable table, List partitionId for ( long partitionId : addedPartitions ) { AllocationTable allocation = addAllocationTable( table.namespaceId, statement, table, placement.id, partitionId, store, true ); - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { dataMigrator.copyData( statement.getTransaction(), catalog.getSnapshot().getAdapter( storeId ).orElseThrow(), table, columns, allocation ); } ); } // Add indexes on this new Partition Placement if there is already an index - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( LogicalIndex currentIndex : catalog.getSnapshot().rel().getIndexes( table.id, false ) ) { if ( currentIndex.location == storeId ) { store.addIndex( statement.getPrepareContext(), currentIndex, catalog.getSnapshot().alloc().getAllocsOfPlacement( placement.id ).stream().map( a -> a.unwrapOrThrow( AllocationTable.class ) ).toList() ); @@ -1451,7 +1452,7 @@ public void modifyPartitionPlacement( LogicalTable table, List partitionId if ( !removedPartitions.isEmpty() ) { // Remove indexes - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( LogicalIndex currentIndex : catalog.getSnapshot().rel().getIndexes( table.id, false ) ) { if ( currentIndex.location == storeId ) { store.dropIndex( null, currentIndex, removedPartitions.stream().map( p -> p.partitionId ).toList() ); @@ -1497,7 +1498,7 @@ public void createColumnPlacement( LogicalTable table, LogicalColumn logicalColu PlacementType.MANUAL ); } else { - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { // Create column placement catalog.getAllocRel( table.namespaceId ).addColumn( placement.id, @@ -1557,7 +1558,7 @@ public void dropColumnPlacement( LogicalTable table, LogicalColumn column, DataS if ( primaryKey.fieldIds.contains( column.id ) ) { throw new GenericRuntimeException( "Cannot drop primary key" ); } - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( AllocationPartition partition : catalog.getSnapshot().alloc().getPartitionsFromLogical( table.id ) ) { AllocationEntity allocation = catalog.getSnapshot().alloc().getAlloc( placement.id, partition.id ).orElseThrow(); // Drop Column on store @@ -1771,7 +1772,7 @@ public void createMaterializedView( String viewName, long namespaceId, AlgRoot a catalog.updateSnapshot(); List> finalStores = stores; - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { // Selected data from tables is added into the newly crated materialized view MaterializedViewManager materializedManager = MaterializedViewManager.getInstance(); materializedManager.addData( statement.getTransaction(), finalStores, algRoot, view ); @@ -2075,7 +2076,7 @@ public void createTable( long namespaceId, String name, List f if ( constraints.stream().noneMatch( c -> c.type == ConstraintType.PRIMARY ) ) { // no primary was set for now, we attach condition to check on commit - catalog.attachCommitConstraint( + statement.getTransaction().attachCommitConstraint( () -> logical.primaryKey != null && catalog.getSnapshot().rel().getPrimaryKey( logical.primaryKey ).isPresent(), "No primary key defined for table: " + name ); } @@ -2160,7 +2161,7 @@ private AllocationTable addAllocationTable( long namespaceId, Statement statemen if ( postpone ) { // normally we want to postpone - catalog.attachCommitAction( action ); + statement.getTransaction().attachCommitAction( action ); } else { // we are already committing currently action.run(); @@ -2361,7 +2362,7 @@ public void createTablePartition( PartitionInformation partitionInfo, List sourceAllocs = new ArrayList<>( catalog.getSnapshot().alloc().getFromLogical( unPartitionedTable.id ).stream().filter( a -> initialProperty.partitionIds.contains( a.partitionId ) ).toList() ); List> finalStores = stores; - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { for ( AllocationPlacement placement : placements ) { if ( !fillStores ) { continue; @@ -2401,7 +2402,7 @@ public void createTablePartition( PartitionInformation partitionInfo, List ds = AdapterManager.getInstance().getStore( index.location ).orElseThrow(); - catalog.attachCommitAction( () -> ds.dropIndex( statement.getPrepareContext(), index, result.right.partitionIds ) ); + statement.getTransaction().attachCommitAction( () -> ds.dropIndex( statement.getPrepareContext(), index, result.right.partitionIds ) ); catalog.getLogicalRel( partitionInfo.table.namespaceId ).deleteIndex( index.id ); // Add new index LogicalIndex newIndex = catalog.getLogicalRel( partitionInfo.table.namespaceId ).addIndex( @@ -2416,7 +2417,7 @@ public void createTablePartition( PartitionInformation partitionInfo, List { + statement.getTransaction().attachCommitAction( () -> { String physicalName = ds.addIndex( statement.getPrepareContext(), index, newAllocations.entrySet().stream().filter( e -> e.getKey().adapterId == ds.adapterId ).findFirst().orElseThrow().getValue() );//catalog.getSnapshot().alloc().getPartitionsOnDataPlacement( ds.getAdapterId(), unPartitionedTable.id ) ); @@ -2425,7 +2426,7 @@ public void createTablePartition( PartitionInformation partitionInfo, List { + statement.getTransaction().attachCommitAction( () -> { // Remove old tables sourceAllocs.forEach( s -> deleteAllocation( statement, s ) ); @@ -2686,7 +2687,7 @@ public void dropTablePartition( LogicalTable table, Statement statement ) throws AllocationTable targetTable = addAllocationTable( table.namespaceId, statement, table, placement.id, partitionProperty.left.id, store, true ); catalog.updateSnapshot(); - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { dataMigrator.copyAllocationData( statement.getTransaction(), catalog.getSnapshot().getAdapter( store.getAdapterId() ).orElseThrow(), @@ -2703,7 +2704,7 @@ public void dropTablePartition( LogicalTable table, Statement statement ) throws for ( LogicalIndex index : indexes ) { // Remove old index DataStore ds = AdapterManager.getInstance().getStore( index.location ).orElseThrow(); - catalog.attachCommitAction( () -> ds.dropIndex( statement.getPrepareContext(), index, property.partitionIds ) ); + statement.getTransaction().attachCommitAction( () -> ds.dropIndex( statement.getPrepareContext(), index, property.partitionIds ) ); catalog.getLogicalRel( table.namespaceId ).deleteIndex( index.id ); // Add new index LogicalIndex newIndex = catalog.getLogicalRel( table.namespaceId ).addIndex( @@ -2718,7 +2719,7 @@ public void dropTablePartition( LogicalTable table, Statement statement ) throws if ( index.location < 0 ) { IndexManager.getInstance().addIndex( newIndex, statement ); } else { - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { AllocationPlacement placement = catalog.getSnapshot().alloc().getPlacement( ds.adapterId, tableId ).orElseThrow(); ds.addIndex( statement.getPrepareContext(), @@ -2728,7 +2729,7 @@ public void dropTablePartition( LogicalTable table, Statement statement ) throws } } - catalog.attachCommitAction( () -> { + statement.getTransaction().attachCommitAction( () -> { // Needs to be separated from loop above. Otherwise, we loose data sources.forEach( s -> deleteAllocation( statement, s ) ); property.partitionIds.forEach( id -> catalog.getAllocRel( table.namespaceId ).deletePartition( id ) ); @@ -2924,7 +2925,7 @@ public void dropTable( LogicalTable table, Statement statement ) { // delete constraints for ( LogicalConstraint constraint : snapshot.rel().getConstraints( table.id ) ) { - dropConstraint( table, constraint.name ); + dropConstraint( statement.getTransaction(), table, constraint.name ); } // delete keys @@ -2959,7 +2960,7 @@ public void dropTable( LogicalTable table, Statement statement ) { private void deleteAllocation( Statement statement, AllocationEntity allocation ) { AdapterManager manager = AdapterManager.getInstance(); - catalog.attachCommitAction( () -> manager.getStore( allocation.adapterId ).orElseThrow().dropTable( statement.getPrepareContext(), allocation.id ) ); + statement.getTransaction().attachCommitAction( () -> manager.getStore( allocation.adapterId ).orElseThrow().dropTable( statement.getPrepareContext(), allocation.id ) ); catalog.getAllocRel( allocation.namespaceId ).deleteAllocation( allocation.id ); diff --git a/dbms/src/main/java/org/polypheny/db/ddl/DefaultInserter.java b/dbms/src/main/java/org/polypheny/db/ddl/DefaultInserter.java index d2c9a8a868..0f8bbf2aa9 100644 --- a/dbms/src/main/java/org/polypheny/db/ddl/DefaultInserter.java +++ b/dbms/src/main/java/org/polypheny/db/ddl/DefaultInserter.java @@ -22,6 +22,7 @@ import org.polypheny.db.catalog.Catalog; import org.polypheny.db.catalog.entity.LogicalAdapter.AdapterType; import org.polypheny.db.catalog.logistic.DataModel; +import org.polypheny.db.transaction.Transaction; import org.polypheny.db.util.PolyphenyHomeDirManager; import org.polypheny.db.util.RunMode; @@ -35,7 +36,7 @@ public class DefaultInserter { /** * Fills the catalog database with default data, skips if data is already inserted */ - public static void resetData( DdlManager ddlManager, RunMode mode ) { + public static void resetData( Transaction transaction, DdlManager ddlManager, RunMode mode ) { final Catalog catalog = Catalog.getInstance(); restoreUsers( catalog ); @@ -49,15 +50,13 @@ public static void resetData( DdlManager ddlManager, RunMode mode ) { ////////////// // init adapters - restoreAdapters( ddlManager, catalog, mode ); - - catalog.executeCommitActions(); - catalog.commit(); + restoreAdapters( transaction, ddlManager, catalog, mode ); + transaction.commit(); } - private static void restoreAdapters( DdlManager ddlManager, Catalog catalog, RunMode mode ) { + private static void restoreAdapters( Transaction transaction, DdlManager ddlManager, Catalog catalog, RunMode mode ) { if ( !catalog.getAdapters().isEmpty() ) { catalog.commit(); return; @@ -75,9 +74,7 @@ private static void restoreAdapters( DdlManager ddlManager, Catalog catalog, Run // Deploy default source (CSV with HR data) AdapterTemplate sourceTemplate = Catalog.snapshot().getAdapterTemplate( Catalog.defaultSource.getAdapterName(), AdapterType.SOURCE ).orElseThrow(); - ddlManager.createSource( "hr", Catalog.defaultSource.getAdapterName(), Catalog.defaultNamespaceId, AdapterType.SOURCE, sourceTemplate.getDefaultSettings(), sourceTemplate.getDefaultMode() ); - - + ddlManager.createSource( transaction, "hr", Catalog.defaultSource.getAdapterName(), Catalog.defaultNamespaceId, AdapterType.SOURCE, sourceTemplate.getDefaultSettings(), sourceTemplate.getDefaultMode() ); } diff --git a/dbms/src/main/java/org/polypheny/db/transaction/TransactionImpl.java b/dbms/src/main/java/org/polypheny/db/transaction/TransactionImpl.java index 2bbcce0a06..bb474cfe0c 100644 --- a/dbms/src/main/java/org/polypheny/db/transaction/TransactionImpl.java +++ b/dbms/src/main/java/org/polypheny/db/transaction/TransactionImpl.java @@ -18,6 +18,7 @@ import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -25,14 +26,17 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.NonNull; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.polypheny.db.PolyImplementation; import org.polypheny.db.adapter.Adapter; @@ -49,6 +53,7 @@ import org.polypheny.db.catalog.entity.logical.LogicalTable; import org.polypheny.db.catalog.exceptions.GenericRuntimeException; import org.polypheny.db.catalog.snapshot.Snapshot; +import org.polypheny.db.catalog.util.ConstraintCondition; import org.polypheny.db.config.RuntimeConfig; import org.polypheny.db.information.InformationManager; import org.polypheny.db.languages.QueryLanguage; @@ -119,6 +124,10 @@ public class TransactionImpl implements Transaction, Comparable { private final Catalog catalog = Catalog.getInstance(); + private final Collection commitActions = new ConcurrentLinkedDeque<>(); + + private final Collection commitConstraints = new ConcurrentLinkedDeque<>(); + TransactionImpl( PolyXid xid, @@ -157,6 +166,34 @@ public void registerInvolvedAdapter( Adapter adapter ) { } + public Pair<@NotNull Boolean, @Nullable String> checkIntegrity() { + // check constraints e.g. primary key constraints + List> fails = commitConstraints + .stream() + .map( c -> Pair.of( c.condition().get(), c.errorMessage() ) ) + .filter( c -> !c.left ) + .toList(); + + if ( !fails.isEmpty() ) { + commitConstraints.clear(); + return Pair.of( false, "DDL constraints not met: \n" + fails.stream().map( f -> f.right ).collect( Collectors.joining( ",\n " ) ) + "." ); + } + return Pair.of( true, null ); + } + + + @Override + public void attachCommitAction( Runnable action ) { + commitActions.add( action ); + } + + + @Override + public void attachCommitConstraint( Supplier constraintChecker, String description ) { + commitConstraints.add( new ConstraintCondition( constraintChecker, description ) ); + } + + @Override public void commit() throws TransactionException { if ( !isActive() ) { @@ -164,13 +201,13 @@ public void commit() throws TransactionException { return; } - Pair isValid = catalog.checkIntegrity(); + Pair isValid = checkIntegrity(); if ( !isValid.left ) { throw new TransactionException( isValid.right + "\nThere are violated constraints, the transaction was rolled back!" ); } // physical changes - catalog.executeCommitActions(); + commitActions.forEach( Runnable::run ); // Prepare to commit changes on all involved adapters and the catalog boolean okToCommit = true; diff --git a/dbms/src/test/java/org/polypheny/db/transaction/MockTransaction.java b/dbms/src/test/java/org/polypheny/db/transaction/MockTransaction.java new file mode 100644 index 0000000000..ffa330666e --- /dev/null +++ b/dbms/src/test/java/org/polypheny/db/transaction/MockTransaction.java @@ -0,0 +1,260 @@ + /* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.polypheny.db.transaction; + + import java.util.List; + import java.util.Set; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.function.Supplier; + import org.jetbrains.annotations.Nullable; + import org.polypheny.db.adapter.Adapter; + import org.polypheny.db.adapter.java.JavaTypeFactory; + import org.polypheny.db.catalog.entity.LogicalConstraint; + import org.polypheny.db.catalog.entity.LogicalUser; + import org.polypheny.db.catalog.entity.logical.LogicalNamespace; + import org.polypheny.db.catalog.entity.logical.LogicalTable; + import org.polypheny.db.catalog.snapshot.Snapshot; + import org.polypheny.db.information.InformationManager; + import org.polypheny.db.languages.QueryLanguage; + import org.polypheny.db.processing.DataMigrator; + import org.polypheny.db.processing.Processor; + + public class MockTransaction implements Transaction { + + private long id; + + private boolean committed = false; + + + public MockTransaction( long id ) { + this.id = id; + } + + + @Override + public long getId() { + return id; + } + + + @Override + public PolyXid getXid() { + return null; + } + + + @Override + public Statement createStatement() { + return null; + } + + + @Override + public LogicalUser getUser() { + return null; + } + + + @Override + public void attachCommitAction( Runnable action ) { + } + + + @Override + public void attachCommitConstraint( Supplier constraintChecker, String description ) { + } + + + @Override + public void commit() throws TransactionException { + committed = true; + } + + + @Override + public void rollback( @Nullable String reason ) throws TransactionException { + } + + + @Override + public void registerInvolvedAdapter( Adapter adapter ) { + + } + + + @Override + public Set> getInvolvedAdapters() { + return Set.of(); + } + + + @Override + public Snapshot getSnapshot() { + return null; + } + + + @Override + public boolean isActive() { + return false; + } + + + @Override + public JavaTypeFactory getTypeFactory() { + return null; + } + + + @Override + public Processor getProcessor( QueryLanguage language ) { + return null; + } + + + @Override + public boolean isAnalyze() { + return false; + } + + + @Override + public void setAnalyze( boolean analyze ) { + + } + + + @Override + public InformationManager getQueryAnalyzer() { + return null; + } + + + @Override + public AtomicBoolean getCancelFlag() { + return null; + } + + + @Override + public LogicalNamespace getDefaultNamespace() { + return null; + } + + + @Override + public String getOrigin() { + return ""; + } + + + @Override + public MultimediaFlavor getFlavor() { + return null; + } + + + @Override + public long getNumberOfStatements() { + return 0; + } + + + @Override + public DataMigrator getDataMigrator() { + return null; + } + + + @Override + public void setUseCache( boolean useCache ) { + + } + + + @Override + public boolean getUseCache() { + return false; + } + + + @Override + public void addUsedTable( LogicalTable table ) { + + } + + + @Override + public void removeUsedTable( LogicalTable table ) { + + } + + + @Override + public void getNewEntityConstraints( long entity ) { + + } + + + @Override + public void addNewConstraint( long entityId, LogicalConstraint constraint ) { + + } + + + @Override + public void removeNewConstraint( long entityId, LogicalConstraint constraint ) { + + } + + + @Override + public void setAcceptsOutdated( boolean acceptsOutdated ) { + + } + + + @Override + public boolean acceptsOutdated() { + return false; + } + + + @Override + public AccessMode getAccessMode() { + return null; + } + + + @Override + public void updateAccessMode( AccessMode accessCandidate ) { + + } + + + @Override + public TransactionManager getTransactionManager() { + return null; + } + + + @Override + public List getUsedConstraints( long id ) { + return List.of(); + } + + } diff --git a/plugins/mql-language/src/test/java/org/polypheny/db/mql/mql2alg/MqlMockCatalog.java b/plugins/mql-language/src/test/java/org/polypheny/db/mql/mql2alg/MqlMockCatalog.java index 917e9ba1c8..6805a7016d 100644 --- a/plugins/mql-language/src/test/java/org/polypheny/db/mql/mql2alg/MqlMockCatalog.java +++ b/plugins/mql-language/src/test/java/org/polypheny/db/mql/mql2alg/MqlMockCatalog.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Supplier; import org.jetbrains.annotations.NotNull; import org.polypheny.db.adapter.AbstractAdapterSetting; import org.polypheny.db.adapter.Adapter; @@ -105,10 +104,4 @@ public void restore( Transaction transaction ) { } - - @Override - public void attachCommitConstraint( Supplier constraintChecker, String description ) { - - } - } diff --git a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/SqlAlterAdaptersAdd.java b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/SqlAlterAdaptersAdd.java index a54a398d04..0a99855b65 100644 --- a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/SqlAlterAdaptersAdd.java +++ b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/SqlAlterAdaptersAdd.java @@ -106,7 +106,7 @@ public void execute( Context context, Statement statement, ParsedQueryContext pa if ( type == AdapterType.STORE ) { DdlManager.getInstance().createStore( uniqueName, adapterName, type, configMap, mode ); } else if ( type == AdapterType.SOURCE ) { - DdlManager.getInstance().createSource( uniqueName, adapterName, Catalog.defaultNamespaceId, type, configMap, mode ); + DdlManager.getInstance().createSource( statement.getTransaction(), uniqueName, adapterName, Catalog.defaultNamespaceId, type, configMap, mode ); } else { log.error( "Unknown adapter type: {}", type ); throw new GenericRuntimeException( "Unknown adapter type: " + type ); diff --git a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableAddColumn.java b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableAddColumn.java index 8032afc309..a72e8c0412 100644 --- a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableAddColumn.java +++ b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableAddColumn.java @@ -141,6 +141,7 @@ public void execute( Context context, Statement statement, ParsedQueryContext pa } DdlManager.getInstance().createColumn( + statement.getTransaction(), column.getSimple(), logicalTable, beforeColumnName == null ? null : beforeColumnName.getSimple(), @@ -152,4 +153,3 @@ public void execute( Context context, Statement statement, ParsedQueryContext pa } } - diff --git a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableDropConstraint.java b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableDropConstraint.java index b71a348927..5aaeac1849 100644 --- a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableDropConstraint.java +++ b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/ddl/altertable/SqlAlterTableDropConstraint.java @@ -82,8 +82,7 @@ public void execute( Context context, Statement statement, ParsedQueryContext pa throw new GenericRuntimeException( "Not possible to use ALTER TABLE because " + logicalTable.name + " is not a table." ); } - DdlManager.getInstance().dropConstraint( logicalTable, constraintName.getSimple() ); + DdlManager.getInstance().dropConstraint( statement.getTransaction(), logicalTable, constraintName.getSimple() ); } } -