diff --git a/internal/db/collection.go b/internal/db/collection.go index 4b9c988288..19cb42cb86 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -13,18 +13,14 @@ package db import ( "bytes" "context" - "encoding/json" "fmt" - "reflect" "strconv" "strings" - jsonpatch "github.com/evanphx/json-patch/v5" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/lens-vm/lens/host-go/config/model" "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/acp" @@ -81,1119 +77,6 @@ func (c *collection) newFetcher() fetcher.Fetcher { return lens.NewFetcher(innerFetcher, c.db.LensRegistry()) } -// createCollection creates a collection and saves it to the database in its system store. -// Note: Collection.ID is an auto-incrementing value that is generated by the database. -func (db *db) createCollection( - ctx context.Context, - def client.CollectionDefinition, - newDefinitions []client.CollectionDefinition, -) (client.Collection, error) { - schema := def.Schema - desc := def.Description - txn := mustGetContextTxn(ctx) - - if desc.Name.HasValue() { - exists, err := description.HasCollectionByName(ctx, txn, desc.Name.Value()) - if err != nil { - return nil, err - } - if exists { - return nil, ErrCollectionAlreadyExists - } - } - - existingDefinitions, err := db.getAllActiveDefinitions(ctx) - if err != nil { - return nil, err - } - - schemaByName := map[string]client.SchemaDescription{} - for _, existingDefinition := range existingDefinitions { - schemaByName[existingDefinition.Schema.Name] = existingDefinition.Schema - } - for _, newDefinition := range newDefinitions { - schemaByName[newDefinition.Schema.Name] = newDefinition.Schema - } - - _, err = validateUpdateSchemaFields(schemaByName, client.SchemaDescription{}, schema) - if err != nil { - return nil, err - } - - definitionsByName := map[string]client.CollectionDefinition{} - for _, existingDefinition := range existingDefinitions { - definitionsByName[existingDefinition.GetName()] = existingDefinition - } - for _, newDefinition := range newDefinitions { - definitionsByName[newDefinition.GetName()] = newDefinition - } - err = db.validateNewCollection(def, definitionsByName) - if err != nil { - return nil, err - } - - colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{}) - if err != nil { - return nil, err - } - colID, err := colSeq.next(ctx) - if err != nil { - return nil, err - } - - fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(uint32(colID))) - if err != nil { - return nil, err - } - - desc.ID = uint32(colID) - desc.RootID = desc.ID - - schema, err = description.CreateSchemaVersion(ctx, txn, schema) - if err != nil { - return nil, err - } - desc.SchemaVersionID = schema.VersionID - for _, localField := range desc.Fields { - var fieldID uint64 - if localField.Name == request.DocIDFieldName { - // There is no hard technical requirement for this, we just think it looks nicer - // if the doc id is at the zero index. It makes it look a little nicer in commit - // queries too. - fieldID = 0 - } else { - fieldID, err = fieldSeq.next(ctx) - if err != nil { - return nil, err - } - } - - for i := range desc.Fields { - if desc.Fields[i].Name == localField.Name { - desc.Fields[i].ID = client.FieldID(fieldID) - break - } - } - } - - desc, err = description.SaveCollection(ctx, txn, desc) - if err != nil { - return nil, err - } - - col := db.newCollection(desc, schema) - - for _, index := range desc.Indexes { - if _, err := col.createIndex(ctx, index); err != nil { - return nil, err - } - } - - return db.getCollectionByID(ctx, desc.ID) -} - -// validateCollectionDefinitionPolicyDesc validates that the policy definition is valid, beyond syntax. -// -// Ensures that the information within the policy definition makes sense, -// this function might also make relevant remote calls using the acp system. -func (db *db) validateCollectionDefinitionPolicyDesc( - ctx context.Context, - policyDesc immutable.Option[client.PolicyDescription], -) error { - if !policyDesc.HasValue() { - // No policy validation needed, whether acp exists or not doesn't matter. - return nil - } - - // If there is a policy specified, but the database does not have - // acp enabled/available return an error, database must have an acp available - // to enable access control (inorder to adhere to the policy specified). - if !db.acp.HasValue() { - return ErrCanNotHavePolicyWithoutACP - } - - // If we have the policy specified on the collection, and acp is available/enabled, - // then using the acp system we need to ensure the policy id specified - // actually exists as a policy, and the resource name exists on that policy - // and that the resource is a valid DPI. - return db.acp.Value().ValidateResourceExistsOnValidDPI( - ctx, - policyDesc.Value().ID, - policyDesc.Value().ResourceName, - ) -} - -// updateSchema updates the persisted schema description matching the name of the given -// description, to the values in the given description. -// -// It will validate the given description using [validateUpdateSchema] before updating it. -// -// The schema (including the schema version ID) will only be updated if any changes have actually -// been made, if the given description matches the current persisted description then no changes will be -// applied. -func (db *db) updateSchema( - ctx context.Context, - existingSchemaByName map[string]client.SchemaDescription, - proposedDescriptionsByName map[string]client.SchemaDescription, - schema client.SchemaDescription, - migration immutable.Option[model.Lens], - setAsActiveVersion bool, -) error { - hasChanged, err := db.validateUpdateSchema( - existingSchemaByName, - proposedDescriptionsByName, - schema, - ) - if err != nil { - return err - } - - if !hasChanged { - return nil - } - - for _, field := range schema.Fields { - if field.Kind.IsObject() && !field.Kind.IsArray() { - idFieldName := field.Name + "_id" - if _, ok := schema.GetFieldByName(idFieldName); !ok { - schema.Fields = append(schema.Fields, client.SchemaFieldDescription{ - Name: idFieldName, - Kind: client.FieldKind_DocID, - }) - } - } - } - - for i, field := range schema.Fields { - if field.Typ == client.NONE_CRDT { - // If no CRDT Type has been provided, default to LWW_REGISTER. - field.Typ = client.LWW_REGISTER - schema.Fields[i] = field - } - } - - txn := mustGetContextTxn(ctx) - previousVersionID := schema.VersionID - schema, err = description.CreateSchemaVersion(ctx, txn, schema) - if err != nil { - return err - } - - // After creating the new schema version, we need to create new collection versions for - // any collection using the previous version. These will be inactive unless [setAsActiveVersion] - // is true. - - cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, previousVersionID) - if err != nil { - return err - } - - colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{}) - if err != nil { - return err - } - - for _, col := range cols { - previousID := col.ID - - existingCols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schema.VersionID) - if err != nil { - return err - } - - // The collection version may exist before the schema version was created locally. This is - // because migrations for the globally known schema version may have been registered locally - // (typically to handle documents synced over P2P at higher versions) before the local schema - // was updated. We need to check for them now, and update them instead of creating new ones - // if they exist. - var isExistingCol bool - existingColLoop: - for _, existingCol := range existingCols { - sources := existingCol.CollectionSources() - for _, source := range sources { - // Make sure that this collection is the parent of the current [col], and not part of - // another collection set that happens to be using the same schema. - if source.SourceCollectionID == previousID { - if existingCol.RootID == client.OrphanRootID { - existingCol.RootID = col.RootID - } - - fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(existingCol.RootID)) - if err != nil { - return err - } - - for _, globalField := range schema.Fields { - var fieldID client.FieldID - // We must check the source collection if the field already exists, and take its ID - // from there, otherwise the field must be generated by the sequence. - existingField, ok := col.GetFieldByName(globalField.Name) - if ok { - fieldID = existingField.ID - } else { - nextFieldID, err := fieldSeq.next(ctx) - if err != nil { - return err - } - fieldID = client.FieldID(nextFieldID) - } - - existingCol.Fields = append( - existingCol.Fields, - client.CollectionFieldDescription{ - Name: globalField.Name, - ID: fieldID, - }, - ) - } - existingCol, err = description.SaveCollection(ctx, txn, existingCol) - if err != nil { - return err - } - isExistingCol = true - break existingColLoop - } - } - } - - if !isExistingCol { - colID, err := colSeq.next(ctx) - if err != nil { - return err - } - - fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(col.RootID)) - if err != nil { - return err - } - - // Create any new collections without a name (inactive), if [setAsActiveVersion] is true - // they will be activated later along with any existing collection versions. - col.Name = immutable.None[string]() - col.ID = uint32(colID) - col.SchemaVersionID = schema.VersionID - col.Sources = []any{ - &client.CollectionSource{ - SourceCollectionID: previousID, - Transform: migration, - }, - } - - for _, globalField := range schema.Fields { - _, exists := col.GetFieldByName(globalField.Name) - if !exists { - fieldID, err := fieldSeq.next(ctx) - if err != nil { - return err - } - - col.Fields = append( - col.Fields, - client.CollectionFieldDescription{ - Name: globalField.Name, - ID: client.FieldID(fieldID), - }, - ) - } - } - - _, err = description.SaveCollection(ctx, txn, col) - if err != nil { - return err - } - - if migration.HasValue() { - err = db.LensRegistry().SetMigration(ctx, col.ID, migration.Value()) - if err != nil { - return err - } - } - } - } - - if setAsActiveVersion { - // activate collection versions using the new schema ID. This call must be made after - // all new collection versions have been saved. - err = db.setActiveSchemaVersion(ctx, schema.VersionID) - if err != nil { - return err - } - } - - return nil -} - -// validateUpdateSchema validates that the given schema description is a valid update. -// -// Will return true if the given description differs from the current persisted state of the -// schema. Will return an error if it fails validation. -func (db *db) validateUpdateSchema( - existingDescriptionsByName map[string]client.SchemaDescription, - proposedDescriptionsByName map[string]client.SchemaDescription, - proposedDesc client.SchemaDescription, -) (bool, error) { - if proposedDesc.Name == "" { - return false, ErrSchemaNameEmpty - } - - existingDesc, collectionExists := existingDescriptionsByName[proposedDesc.Name] - if !collectionExists { - return false, NewErrAddCollectionWithPatch(proposedDesc.Name) - } - - if proposedDesc.Root != existingDesc.Root { - return false, NewErrSchemaRootDoesntMatch( - proposedDesc.Name, - existingDesc.Root, - proposedDesc.Root, - ) - } - - if proposedDesc.Name != existingDesc.Name { - // There is actually little reason to not support this atm besides controlling the surface area - // of the new feature. Changing this should not break anything, but it should be tested first. - return false, NewErrCannotModifySchemaName(existingDesc.Name, proposedDesc.Name) - } - - if proposedDesc.VersionID != "" && proposedDesc.VersionID != existingDesc.VersionID { - // If users specify this it will be overwritten, an error is preferred to quietly ignoring it. - return false, ErrCannotSetVersionID - } - - hasChangedFields, err := validateUpdateSchemaFields(proposedDescriptionsByName, existingDesc, proposedDesc) - if err != nil { - return hasChangedFields, err - } - - return hasChangedFields, err -} - -func validateUpdateSchemaFields( - descriptionsByName map[string]client.SchemaDescription, - existingDesc client.SchemaDescription, - proposedDesc client.SchemaDescription, -) (bool, error) { - hasChanged := false - existingFieldsByName := map[string]client.SchemaFieldDescription{} - existingFieldIndexesByName := map[string]int{} - for i, field := range existingDesc.Fields { - existingFieldIndexesByName[field.Name] = i - existingFieldsByName[field.Name] = field - } - - newFieldNames := map[string]struct{}{} - for proposedIndex, proposedField := range proposedDesc.Fields { - existingField, fieldAlreadyExists := existingFieldsByName[proposedField.Name] - - // If the field is new, then the collection has changed - hasChanged = hasChanged || !fieldAlreadyExists - - if !fieldAlreadyExists && proposedField.Kind.IsObject() { - _, relatedDescFound := descriptionsByName[proposedField.Kind.Underlying()] - - if !relatedDescFound { - return false, NewErrFieldKindNotFound(proposedField.Name, proposedField.Kind.Underlying()) - } - - if proposedField.Kind.IsObject() && !proposedField.Kind.IsArray() { - idFieldName := proposedField.Name + request.RelatedObjectID - idField, idFieldFound := proposedDesc.GetFieldByName(idFieldName) - if idFieldFound { - if idField.Kind != client.FieldKind_DocID { - return false, NewErrRelationalFieldIDInvalidType(idField.Name, client.FieldKind_DocID, idField.Kind) - } - } - } - } - - if proposedField.Kind.IsObjectArray() { - return false, NewErrSecondaryFieldOnSchema(proposedField.Name) - } - - if _, isDuplicate := newFieldNames[proposedField.Name]; isDuplicate { - return false, NewErrDuplicateField(proposedField.Name) - } - - if fieldAlreadyExists && proposedField != existingField { - return false, NewErrCannotMutateField(proposedField.Name) - } - - if existingIndex := existingFieldIndexesByName[proposedField.Name]; fieldAlreadyExists && - proposedIndex != existingIndex { - return false, NewErrCannotMoveField(proposedField.Name, proposedIndex, existingIndex) - } - - if !proposedField.Typ.IsSupportedFieldCType() { - return false, client.NewErrInvalidCRDTType(proposedField.Name, proposedField.Typ.String()) - } - - if !proposedField.Typ.IsCompatibleWith(proposedField.Kind) { - return false, client.NewErrCRDTKindMismatch(proposedField.Typ.String(), proposedField.Kind.String()) - } - - newFieldNames[proposedField.Name] = struct{}{} - } - - for _, field := range existingDesc.Fields { - if _, stillExists := newFieldNames[field.Name]; !stillExists { - return false, NewErrCannotDeleteField(field.Name) - } - } - return hasChanged, nil -} - -func (db *db) patchCollection( - ctx context.Context, - patchString string, -) error { - patch, err := jsonpatch.DecodePatch([]byte(patchString)) - if err != nil { - return err - } - txn := mustGetContextTxn(ctx) - cols, err := description.GetCollections(ctx, txn) - if err != nil { - return err - } - - existingColsByID := map[uint32]client.CollectionDescription{} - for _, col := range cols { - existingColsByID[col.ID] = col - } - - existingDescriptionJson, err := json.Marshal(existingColsByID) - if err != nil { - return err - } - - newDescriptionJson, err := patch.Apply(existingDescriptionJson) - if err != nil { - return err - } - - var newColsByID map[uint32]client.CollectionDescription - decoder := json.NewDecoder(strings.NewReader(string(newDescriptionJson))) - decoder.DisallowUnknownFields() - err = decoder.Decode(&newColsByID) - if err != nil { - return err - } - - err = db.validateCollectionChanges(existingColsByID, newColsByID) - if err != nil { - return err - } - - for _, col := range newColsByID { - _, err := description.SaveCollection(ctx, txn, col) - if err != nil { - return err - } - - existingCol, ok := existingColsByID[col.ID] - if ok { - // Clear any existing migrations in the registry, using this semi-hacky way - // to avoid adding more functions to a public interface that we wish to remove. - - for _, src := range existingCol.CollectionSources() { - if src.Transform.HasValue() { - err = db.LensRegistry().SetMigration(ctx, existingCol.ID, model.Lens{}) - if err != nil { - return err - } - } - } - for _, src := range existingCol.QuerySources() { - if src.Transform.HasValue() { - err = db.LensRegistry().SetMigration(ctx, existingCol.ID, model.Lens{}) - if err != nil { - return err - } - } - } - } - - for _, src := range col.CollectionSources() { - if src.Transform.HasValue() { - err = db.LensRegistry().SetMigration(ctx, col.ID, src.Transform.Value()) - if err != nil { - return err - } - } - } - - for _, src := range col.QuerySources() { - if src.Transform.HasValue() { - err = db.LensRegistry().SetMigration(ctx, col.ID, src.Transform.Value()) - if err != nil { - return err - } - } - } - } - - return db.loadSchema(ctx) -} - -var patchCollectionValidators = []func( - map[uint32]client.CollectionDescription, - map[uint32]client.CollectionDescription, -) error{ - validateCollectionNameUnique, - validateSingleVersionActive, - validateSourcesNotRedefined, - validateIndexesNotModified, - validateFieldsNotModified, - validatePolicyNotModified, - validateIDNotZero, - validateIDUnique, - validateIDExists, - validateRootIDNotMutated, - validateSchemaVersionIDNotMutated, - validateCollectionNotRemoved, -} - -func (db *db) validateCollectionChanges( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, validators := range patchCollectionValidators { - err := validators(oldColsByID, newColsByID) - if err != nil { - return err - } - } - - return nil -} - -var newCollectionValidators = []func( - client.CollectionDefinition, - map[string]client.CollectionDefinition, -) error{ - validateSecondaryFieldsPairUp, - validateRelationPointsToValidKind, - validateSingleSidePrimary, -} - -func (db *db) validateNewCollection( - def client.CollectionDefinition, - defsByName map[string]client.CollectionDefinition, -) error { - for _, validators := range newCollectionValidators { - err := validators(def, defsByName) - if err != nil { - return err - } - } - - return nil -} - -func validateRelationPointsToValidKind( - def client.CollectionDefinition, - defsByName map[string]client.CollectionDefinition, -) error { - for _, field := range def.Description.Fields { - if !field.Kind.HasValue() { - continue - } - - if !field.Kind.Value().IsObject() { - continue - } - - underlying := field.Kind.Value().Underlying() - _, ok := defsByName[underlying] - if !ok { - return NewErrFieldKindNotFound(field.Name, underlying) - } - } - - return nil -} - -func validateSecondaryFieldsPairUp( - def client.CollectionDefinition, - defsByName map[string]client.CollectionDefinition, -) error { - for _, field := range def.Description.Fields { - if !field.Kind.HasValue() { - continue - } - - if !field.Kind.Value().IsObject() { - continue - } - - if !field.RelationName.HasValue() { - continue - } - - _, hasSchemaField := def.Schema.GetFieldByName(field.Name) - if hasSchemaField { - continue - } - - underlying := field.Kind.Value().Underlying() - otherDef, ok := defsByName[underlying] - if !ok { - continue - } - - if len(otherDef.Description.Fields) == 0 { - // Views/embedded objects do not require both sides of the relation to be defined. - continue - } - - otherField, ok := otherDef.Description.GetFieldByRelation( - field.RelationName.Value(), - def.GetName(), - field.Name, - ) - if !ok { - return NewErrRelationMissingField(underlying, field.RelationName.Value()) - } - - _, ok = otherDef.Schema.GetFieldByName(otherField.Name) - if !ok { - // This secondary is paired with another secondary, which is invalid - return NewErrRelationMissingField(underlying, field.RelationName.Value()) - } - } - - return nil -} - -func validateSingleSidePrimary( - def client.CollectionDefinition, - defsByName map[string]client.CollectionDefinition, -) error { - for _, field := range def.Description.Fields { - if !field.Kind.HasValue() { - continue - } - - if !field.Kind.Value().IsObject() { - continue - } - - if !field.RelationName.HasValue() { - continue - } - - _, hasSchemaField := def.Schema.GetFieldByName(field.Name) - if !hasSchemaField { - // This is a secondary field and thus passes this rule - continue - } - - underlying := field.Kind.Value().Underlying() - otherDef, ok := defsByName[underlying] - if !ok { - continue - } - - otherField, ok := otherDef.Description.GetFieldByRelation( - field.RelationName.Value(), - def.GetName(), - field.Name, - ) - if !ok { - // This must be a one-sided relation, in which case it passes this rule - continue - } - - _, ok = otherDef.Schema.GetFieldByName(otherField.Name) - if ok { - // This primary is paired with another primary, which is invalid - return ErrMultipleRelationPrimaries - } - } - - return nil -} - -func validateCollectionNameUnique( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - names := map[string]struct{}{} - for _, col := range newColsByID { - if !col.Name.HasValue() { - continue - } - - if _, ok := names[col.Name.Value()]; ok { - return NewErrCollectionAlreadyExists(col.Name.Value()) - } - names[col.Name.Value()] = struct{}{} - } - - return nil -} - -func validateSingleVersionActive( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - rootsWithActiveCol := map[uint32]struct{}{} - for _, col := range newColsByID { - if !col.Name.HasValue() { - continue - } - - if _, ok := rootsWithActiveCol[col.RootID]; ok { - return NewErrMultipleActiveCollectionVersions(col.Name.Value(), col.RootID) - } - rootsWithActiveCol[col.RootID] = struct{}{} - } - - return nil -} - -// validateSourcesNotRedefined specifies the limitations on how the collection sources -// can be mutated. -// -// Currently new sources cannot be added, existing cannot be removed, and CollectionSources -// cannot be redirected to other collections. -func validateSourcesNotRedefined( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - newColSources := newCol.CollectionSources() - oldColSources := oldCol.CollectionSources() - - if len(newColSources) != len(oldColSources) { - return NewErrCollectionSourcesCannotBeAddedRemoved(newCol.ID) - } - - for i := range newColSources { - if newColSources[i].SourceCollectionID != oldColSources[i].SourceCollectionID { - return NewErrCollectionSourceIDMutated( - newCol.ID, - newColSources[i].SourceCollectionID, - oldColSources[i].SourceCollectionID, - ) - } - } - - newQuerySources := newCol.QuerySources() - oldQuerySources := oldCol.QuerySources() - - if len(newQuerySources) != len(oldQuerySources) { - return NewErrCollectionSourcesCannotBeAddedRemoved(newCol.ID) - } - } - - return nil -} - -func validateIndexesNotModified( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - // DeepEqual is temporary, as this validation is temporary - if !reflect.DeepEqual(oldCol.Indexes, newCol.Indexes) { - return NewErrCollectionIndexesCannotBeMutated(newCol.ID) - } - } - - return nil -} - -func validateFieldsNotModified( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - // DeepEqual is temporary, as this validation is temporary - if !reflect.DeepEqual(oldCol.Fields, newCol.Fields) { - return NewErrCollectionFieldsCannotBeMutated(newCol.ID) - } - } - - return nil -} - -func validatePolicyNotModified( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - // DeepEqual is temporary, as this validation is temporary - if !reflect.DeepEqual(oldCol.Policy, newCol.Policy) { - return NewErrCollectionPolicyCannotBeMutated(newCol.ID) - } - } - - return nil -} - -func validateIDNotZero( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - if newCol.ID == 0 { - return ErrCollectionIDCannotBeZero - } - } - - return nil -} - -func validateIDUnique( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - colIds := map[uint32]struct{}{} - for _, newCol := range newColsByID { - if _, ok := colIds[newCol.ID]; ok { - return NewErrCollectionIDAlreadyExists(newCol.ID) - } - colIds[newCol.ID] = struct{}{} - } - - return nil -} - -func validateIDExists( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - if _, ok := oldColsByID[newCol.ID]; !ok { - return NewErrAddCollectionIDWithPatch(newCol.ID) - } - } - - return nil -} - -func validateRootIDNotMutated( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - if newCol.RootID != oldCol.RootID { - return NewErrCollectionRootIDCannotBeMutated(newCol.ID) - } - } - - return nil -} - -func validateSchemaVersionIDNotMutated( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { - for _, newCol := range newColsByID { - oldCol, ok := oldColsByID[newCol.ID] - if !ok { - continue - } - - if newCol.SchemaVersionID != oldCol.SchemaVersionID { - return NewErrCollectionSchemaVersionIDCannotBeMutated(newCol.ID) - } - } - - return nil -} - -func validateCollectionNotRemoved( - oldColsByID map[uint32]client.CollectionDescription, - newColsByID map[uint32]client.CollectionDescription, -) error { -oldLoop: - for _, oldCol := range oldColsByID { - for _, newCol := range newColsByID { - // It is not enough to just match by the map index, in case the index does not pair - // up with the ID (this can happen if a user moves the collection within the map) - if newCol.ID == oldCol.ID { - continue oldLoop - } - } - - return NewErrCollectionsCannotBeDeleted(oldCol.ID) - } - - return nil -} - -// SetActiveSchemaVersion activates all collection versions with the given schema version, and deactivates all -// those without it (if they share the same schema root). -// -// This will affect all operations interacting with the schema where a schema version is not explicitly -// provided. This includes GQL queries and Collection operations. -// -// It will return an error if the provided schema version ID does not exist. -func (db *db) setActiveSchemaVersion( - ctx context.Context, - schemaVersionID string, -) error { - if schemaVersionID == "" { - return ErrSchemaVersionIDEmpty - } - txn := mustGetContextTxn(ctx) - cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schemaVersionID) - if err != nil { - return err - } - - schema, err := description.GetSchemaVersion(ctx, txn, schemaVersionID) - if err != nil { - return err - } - - colsWithRoot, err := description.GetCollectionsBySchemaRoot(ctx, txn, schema.Root) - if err != nil { - return err - } - - colsBySourceID := map[uint32][]client.CollectionDescription{} - colsByID := make(map[uint32]client.CollectionDescription, len(colsWithRoot)) - for _, col := range colsWithRoot { - colsByID[col.ID] = col - - sources := col.CollectionSources() - if len(sources) > 0 { - // For now, we assume that each collection can only have a single source. This will likely need - // to change later. - slice := colsBySourceID[sources[0].SourceCollectionID] - slice = append(slice, col) - colsBySourceID[sources[0].SourceCollectionID] = slice - } - } - - for _, col := range cols { - if col.Name.HasValue() { - // The collection is already active, so we can skip it and continue - continue - } - sources := col.CollectionSources() - - var activeCol client.CollectionDescription - var rootCol client.CollectionDescription - var isActiveFound bool - if len(sources) > 0 { - // For now, we assume that each collection can only have a single source. This will likely need - // to change later. - activeCol, rootCol, isActiveFound = db.getActiveCollectionDown(ctx, colsByID, sources[0].SourceCollectionID) - } - if !isActiveFound { - // We need to look both down and up for the active version - the most recent is not necessarily the active one. - activeCol, isActiveFound = db.getActiveCollectionUp(ctx, colsBySourceID, rootCol.ID) - } - - var newName string - if isActiveFound { - newName = activeCol.Name.Value() - } else { - // If there are no active versions in the collection set, take the name of the schema to be the name of the - // collection. - newName = schema.Name - } - col.Name = immutable.Some(newName) - - _, err = description.SaveCollection(ctx, txn, col) - if err != nil { - return err - } - - if isActiveFound { - // Deactivate the currently active collection by setting its name to none. - activeCol.Name = immutable.None[string]() - _, err = description.SaveCollection(ctx, txn, activeCol) - if err != nil { - return err - } - } - } - - // Load the schema into the clients (e.g. GQL) - return db.loadSchema(ctx) -} - -func (db *db) getActiveCollectionDown( - ctx context.Context, - colsByID map[uint32]client.CollectionDescription, - id uint32, -) (client.CollectionDescription, client.CollectionDescription, bool) { - col, ok := colsByID[id] - if !ok { - return client.CollectionDescription{}, client.CollectionDescription{}, false - } - - if col.Name.HasValue() { - return col, client.CollectionDescription{}, true - } - - sources := col.CollectionSources() - if len(sources) == 0 { - // If a collection has zero sources it is likely the initial collection version, or - // this collection set is currently orphaned (can happen when setting migrations that - // do not yet link all the way back to a non-orphaned set) - return client.CollectionDescription{}, col, false - } - - // For now, we assume that each collection can only have a single source. This will likely need - // to change later. - return db.getActiveCollectionDown(ctx, colsByID, sources[0].SourceCollectionID) -} - -func (db *db) getActiveCollectionUp( - ctx context.Context, - colsBySourceID map[uint32][]client.CollectionDescription, - id uint32, -) (client.CollectionDescription, bool) { - cols, ok := colsBySourceID[id] - if !ok { - // We have reached the top of the set, and have not found an active collection - return client.CollectionDescription{}, false - } - - for _, col := range cols { - if col.Name.HasValue() { - return col, true - } - activeCol, isFound := db.getActiveCollectionUp(ctx, colsBySourceID, col.ID) - if isFound { - return activeCol, isFound - } - } - - return client.CollectionDescription{}, false -} - func (db *db) getCollectionByID(ctx context.Context, id uint32) (client.Collection, error) { txn := mustGetContextTxn(ctx) diff --git a/internal/db/collection_define.go b/internal/db/collection_define.go new file mode 100644 index 0000000000..4712911399 --- /dev/null +++ b/internal/db/collection_define.go @@ -0,0 +1,373 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + "encoding/json" + "strings" + + jsonpatch "github.com/evanphx/json-patch/v5" + "github.com/lens-vm/lens/host-go/config/model" + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/db/description" +) + +func (db *db) createCollection( + ctx context.Context, + def client.CollectionDefinition, + newDefinitions []client.CollectionDefinition, +) (client.Collection, error) { + schema := def.Schema + desc := def.Description + txn := mustGetContextTxn(ctx) + + if desc.Name.HasValue() { + exists, err := description.HasCollectionByName(ctx, txn, desc.Name.Value()) + if err != nil { + return nil, err + } + if exists { + return nil, ErrCollectionAlreadyExists + } + } + + existingDefinitions, err := db.getAllActiveDefinitions(ctx) + if err != nil { + return nil, err + } + + schemaByName := map[string]client.SchemaDescription{} + for _, existingDefinition := range existingDefinitions { + schemaByName[existingDefinition.Schema.Name] = existingDefinition.Schema + } + for _, newDefinition := range newDefinitions { + schemaByName[newDefinition.Schema.Name] = newDefinition.Schema + } + + _, err = validateUpdateSchemaFields(schemaByName, client.SchemaDescription{}, schema) + if err != nil { + return nil, err + } + + definitionsByName := map[string]client.CollectionDefinition{} + for _, existingDefinition := range existingDefinitions { + definitionsByName[existingDefinition.GetName()] = existingDefinition + } + for _, newDefinition := range newDefinitions { + definitionsByName[newDefinition.GetName()] = newDefinition + } + err = db.validateNewCollection(def, definitionsByName) + if err != nil { + return nil, err + } + + colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{}) + if err != nil { + return nil, err + } + colID, err := colSeq.next(ctx) + if err != nil { + return nil, err + } + + fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(uint32(colID))) + if err != nil { + return nil, err + } + + desc.ID = uint32(colID) + desc.RootID = desc.ID + + schema, err = description.CreateSchemaVersion(ctx, txn, schema) + if err != nil { + return nil, err + } + desc.SchemaVersionID = schema.VersionID + for _, localField := range desc.Fields { + var fieldID uint64 + if localField.Name == request.DocIDFieldName { + // There is no hard technical requirement for this, we just think it looks nicer + // if the doc id is at the zero index. It makes it look a little nicer in commit + // queries too. + fieldID = 0 + } else { + fieldID, err = fieldSeq.next(ctx) + if err != nil { + return nil, err + } + } + + for i := range desc.Fields { + if desc.Fields[i].Name == localField.Name { + desc.Fields[i].ID = client.FieldID(fieldID) + break + } + } + } + + desc, err = description.SaveCollection(ctx, txn, desc) + if err != nil { + return nil, err + } + + col := db.newCollection(desc, schema) + + for _, index := range desc.Indexes { + if _, err := col.createIndex(ctx, index); err != nil { + return nil, err + } + } + + return db.getCollectionByID(ctx, desc.ID) +} + +func (db *db) patchCollection( + ctx context.Context, + patchString string, +) error { + patch, err := jsonpatch.DecodePatch([]byte(patchString)) + if err != nil { + return err + } + txn := mustGetContextTxn(ctx) + cols, err := description.GetCollections(ctx, txn) + if err != nil { + return err + } + + existingColsByID := map[uint32]client.CollectionDescription{} + for _, col := range cols { + existingColsByID[col.ID] = col + } + + existingDescriptionJson, err := json.Marshal(existingColsByID) + if err != nil { + return err + } + + newDescriptionJson, err := patch.Apply(existingDescriptionJson) + if err != nil { + return err + } + + var newColsByID map[uint32]client.CollectionDescription + decoder := json.NewDecoder(strings.NewReader(string(newDescriptionJson))) + decoder.DisallowUnknownFields() + err = decoder.Decode(&newColsByID) + if err != nil { + return err + } + + err = db.validateCollectionChanges(existingColsByID, newColsByID) + if err != nil { + return err + } + + for _, col := range newColsByID { + _, err := description.SaveCollection(ctx, txn, col) + if err != nil { + return err + } + + existingCol, ok := existingColsByID[col.ID] + if ok { + // Clear any existing migrations in the registry, using this semi-hacky way + // to avoid adding more functions to a public interface that we wish to remove. + + for _, src := range existingCol.CollectionSources() { + if src.Transform.HasValue() { + err = db.LensRegistry().SetMigration(ctx, existingCol.ID, model.Lens{}) + if err != nil { + return err + } + } + } + for _, src := range existingCol.QuerySources() { + if src.Transform.HasValue() { + err = db.LensRegistry().SetMigration(ctx, existingCol.ID, model.Lens{}) + if err != nil { + return err + } + } + } + } + + for _, src := range col.CollectionSources() { + if src.Transform.HasValue() { + err = db.LensRegistry().SetMigration(ctx, col.ID, src.Transform.Value()) + if err != nil { + return err + } + } + } + + for _, src := range col.QuerySources() { + if src.Transform.HasValue() { + err = db.LensRegistry().SetMigration(ctx, col.ID, src.Transform.Value()) + if err != nil { + return err + } + } + } + } + + return db.loadSchema(ctx) +} + +// SetActiveSchemaVersion activates all collection versions with the given schema version, and deactivates all +// those without it (if they share the same schema root). +// +// This will affect all operations interacting with the schema where a schema version is not explicitly +// provided. This includes GQL queries and Collection operations. +// +// It will return an error if the provided schema version ID does not exist. +func (db *db) setActiveSchemaVersion( + ctx context.Context, + schemaVersionID string, +) error { + if schemaVersionID == "" { + return ErrSchemaVersionIDEmpty + } + txn := mustGetContextTxn(ctx) + cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schemaVersionID) + if err != nil { + return err + } + + schema, err := description.GetSchemaVersion(ctx, txn, schemaVersionID) + if err != nil { + return err + } + + colsWithRoot, err := description.GetCollectionsBySchemaRoot(ctx, txn, schema.Root) + if err != nil { + return err + } + + colsBySourceID := map[uint32][]client.CollectionDescription{} + colsByID := make(map[uint32]client.CollectionDescription, len(colsWithRoot)) + for _, col := range colsWithRoot { + colsByID[col.ID] = col + + sources := col.CollectionSources() + if len(sources) > 0 { + // For now, we assume that each collection can only have a single source. This will likely need + // to change later. + slice := colsBySourceID[sources[0].SourceCollectionID] + slice = append(slice, col) + colsBySourceID[sources[0].SourceCollectionID] = slice + } + } + + for _, col := range cols { + if col.Name.HasValue() { + // The collection is already active, so we can skip it and continue + continue + } + sources := col.CollectionSources() + + var activeCol client.CollectionDescription + var rootCol client.CollectionDescription + var isActiveFound bool + if len(sources) > 0 { + // For now, we assume that each collection can only have a single source. This will likely need + // to change later. + activeCol, rootCol, isActiveFound = db.getActiveCollectionDown(ctx, colsByID, sources[0].SourceCollectionID) + } + if !isActiveFound { + // We need to look both down and up for the active version - the most recent is not necessarily the active one. + activeCol, isActiveFound = db.getActiveCollectionUp(ctx, colsBySourceID, rootCol.ID) + } + + var newName string + if isActiveFound { + newName = activeCol.Name.Value() + } else { + // If there are no active versions in the collection set, take the name of the schema to be the name of the + // collection. + newName = schema.Name + } + col.Name = immutable.Some(newName) + + _, err = description.SaveCollection(ctx, txn, col) + if err != nil { + return err + } + + if isActiveFound { + // Deactivate the currently active collection by setting its name to none. + activeCol.Name = immutable.None[string]() + _, err = description.SaveCollection(ctx, txn, activeCol) + if err != nil { + return err + } + } + } + + // Load the schema into the clients (e.g. GQL) + return db.loadSchema(ctx) +} + +func (db *db) getActiveCollectionDown( + ctx context.Context, + colsByID map[uint32]client.CollectionDescription, + id uint32, +) (client.CollectionDescription, client.CollectionDescription, bool) { + col, ok := colsByID[id] + if !ok { + return client.CollectionDescription{}, client.CollectionDescription{}, false + } + + if col.Name.HasValue() { + return col, client.CollectionDescription{}, true + } + + sources := col.CollectionSources() + if len(sources) == 0 { + // If a collection has zero sources it is likely the initial collection version, or + // this collection set is currently orphaned (can happen when setting migrations that + // do not yet link all the way back to a non-orphaned set) + return client.CollectionDescription{}, col, false + } + + // For now, we assume that each collection can only have a single source. This will likely need + // to change later. + return db.getActiveCollectionDown(ctx, colsByID, sources[0].SourceCollectionID) +} + +func (db *db) getActiveCollectionUp( + ctx context.Context, + colsBySourceID map[uint32][]client.CollectionDescription, + id uint32, +) (client.CollectionDescription, bool) { + cols, ok := colsBySourceID[id] + if !ok { + // We have reached the top of the set, and have not found an active collection + return client.CollectionDescription{}, false + } + + for _, col := range cols { + if col.Name.HasValue() { + return col, true + } + activeCol, isFound := db.getActiveCollectionUp(ctx, colsBySourceID, col.ID) + if isFound { + return activeCol, isFound + } + } + + return client.CollectionDescription{}, false +} diff --git a/internal/db/definition_validation.go b/internal/db/definition_validation.go new file mode 100644 index 0000000000..988ebeb15c --- /dev/null +++ b/internal/db/definition_validation.go @@ -0,0 +1,585 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + "reflect" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" +) + +var patchCollectionValidators = []func( + map[uint32]client.CollectionDescription, + map[uint32]client.CollectionDescription, +) error{ + validateCollectionNameUnique, + validateSingleVersionActive, + validateSourcesNotRedefined, + validateIndexesNotModified, + validateFieldsNotModified, + validatePolicyNotModified, + validateIDNotZero, + validateIDUnique, + validateIDExists, + validateRootIDNotMutated, + validateSchemaVersionIDNotMutated, + validateCollectionNotRemoved, +} + +var newCollectionValidators = []func( + client.CollectionDefinition, + map[string]client.CollectionDefinition, +) error{ + validateSecondaryFieldsPairUp, + validateRelationPointsToValidKind, + validateSingleSidePrimary, +} + +func (db *db) validateCollectionChanges( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, validators := range patchCollectionValidators { + err := validators(oldColsByID, newColsByID) + if err != nil { + return err + } + } + + return nil +} + +func (db *db) validateNewCollection( + def client.CollectionDefinition, + defsByName map[string]client.CollectionDefinition, +) error { + for _, validators := range newCollectionValidators { + err := validators(def, defsByName) + if err != nil { + return err + } + } + + return nil +} + +func validateRelationPointsToValidKind( + def client.CollectionDefinition, + defsByName map[string]client.CollectionDefinition, +) error { + for _, field := range def.Description.Fields { + if !field.Kind.HasValue() { + continue + } + + if !field.Kind.Value().IsObject() { + continue + } + + underlying := field.Kind.Value().Underlying() + _, ok := defsByName[underlying] + if !ok { + return NewErrFieldKindNotFound(field.Name, underlying) + } + } + + return nil +} + +func validateSecondaryFieldsPairUp( + def client.CollectionDefinition, + defsByName map[string]client.CollectionDefinition, +) error { + for _, field := range def.Description.Fields { + if !field.Kind.HasValue() { + continue + } + + if !field.Kind.Value().IsObject() { + continue + } + + if !field.RelationName.HasValue() { + continue + } + + _, hasSchemaField := def.Schema.GetFieldByName(field.Name) + if hasSchemaField { + continue + } + + underlying := field.Kind.Value().Underlying() + otherDef, ok := defsByName[underlying] + if !ok { + continue + } + + if len(otherDef.Description.Fields) == 0 { + // Views/embedded objects do not require both sides of the relation to be defined. + continue + } + + otherField, ok := otherDef.Description.GetFieldByRelation( + field.RelationName.Value(), + def.GetName(), + field.Name, + ) + if !ok { + return NewErrRelationMissingField(underlying, field.RelationName.Value()) + } + + _, ok = otherDef.Schema.GetFieldByName(otherField.Name) + if !ok { + // This secondary is paired with another secondary, which is invalid + return NewErrRelationMissingField(underlying, field.RelationName.Value()) + } + } + + return nil +} + +func validateSingleSidePrimary( + def client.CollectionDefinition, + defsByName map[string]client.CollectionDefinition, +) error { + for _, field := range def.Description.Fields { + if !field.Kind.HasValue() { + continue + } + + if !field.Kind.Value().IsObject() { + continue + } + + if !field.RelationName.HasValue() { + continue + } + + _, hasSchemaField := def.Schema.GetFieldByName(field.Name) + if !hasSchemaField { + // This is a secondary field and thus passes this rule + continue + } + + underlying := field.Kind.Value().Underlying() + otherDef, ok := defsByName[underlying] + if !ok { + continue + } + + otherField, ok := otherDef.Description.GetFieldByRelation( + field.RelationName.Value(), + def.GetName(), + field.Name, + ) + if !ok { + // This must be a one-sided relation, in which case it passes this rule + continue + } + + _, ok = otherDef.Schema.GetFieldByName(otherField.Name) + if ok { + // This primary is paired with another primary, which is invalid + return ErrMultipleRelationPrimaries + } + } + + return nil +} + +func validateCollectionNameUnique( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + names := map[string]struct{}{} + for _, col := range newColsByID { + if !col.Name.HasValue() { + continue + } + + if _, ok := names[col.Name.Value()]; ok { + return NewErrCollectionAlreadyExists(col.Name.Value()) + } + names[col.Name.Value()] = struct{}{} + } + + return nil +} + +func validateSingleVersionActive( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + rootsWithActiveCol := map[uint32]struct{}{} + for _, col := range newColsByID { + if !col.Name.HasValue() { + continue + } + + if _, ok := rootsWithActiveCol[col.RootID]; ok { + return NewErrMultipleActiveCollectionVersions(col.Name.Value(), col.RootID) + } + rootsWithActiveCol[col.RootID] = struct{}{} + } + + return nil +} + +// validateSourcesNotRedefined specifies the limitations on how the collection sources +// can be mutated. +// +// Currently new sources cannot be added, existing cannot be removed, and CollectionSources +// cannot be redirected to other collections. +func validateSourcesNotRedefined( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + newColSources := newCol.CollectionSources() + oldColSources := oldCol.CollectionSources() + + if len(newColSources) != len(oldColSources) { + return NewErrCollectionSourcesCannotBeAddedRemoved(newCol.ID) + } + + for i := range newColSources { + if newColSources[i].SourceCollectionID != oldColSources[i].SourceCollectionID { + return NewErrCollectionSourceIDMutated( + newCol.ID, + newColSources[i].SourceCollectionID, + oldColSources[i].SourceCollectionID, + ) + } + } + + newQuerySources := newCol.QuerySources() + oldQuerySources := oldCol.QuerySources() + + if len(newQuerySources) != len(oldQuerySources) { + return NewErrCollectionSourcesCannotBeAddedRemoved(newCol.ID) + } + } + + return nil +} + +func validateIndexesNotModified( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + // DeepEqual is temporary, as this validation is temporary + if !reflect.DeepEqual(oldCol.Indexes, newCol.Indexes) { + return NewErrCollectionIndexesCannotBeMutated(newCol.ID) + } + } + + return nil +} + +func validateFieldsNotModified( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + // DeepEqual is temporary, as this validation is temporary + if !reflect.DeepEqual(oldCol.Fields, newCol.Fields) { + return NewErrCollectionFieldsCannotBeMutated(newCol.ID) + } + } + + return nil +} + +func validatePolicyNotModified( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + // DeepEqual is temporary, as this validation is temporary + if !reflect.DeepEqual(oldCol.Policy, newCol.Policy) { + return NewErrCollectionPolicyCannotBeMutated(newCol.ID) + } + } + + return nil +} + +func validateIDNotZero( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + if newCol.ID == 0 { + return ErrCollectionIDCannotBeZero + } + } + + return nil +} + +func validateIDUnique( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + colIds := map[uint32]struct{}{} + for _, newCol := range newColsByID { + if _, ok := colIds[newCol.ID]; ok { + return NewErrCollectionIDAlreadyExists(newCol.ID) + } + colIds[newCol.ID] = struct{}{} + } + + return nil +} + +func validateIDExists( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + if _, ok := oldColsByID[newCol.ID]; !ok { + return NewErrAddCollectionIDWithPatch(newCol.ID) + } + } + + return nil +} + +func validateRootIDNotMutated( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + if newCol.RootID != oldCol.RootID { + return NewErrCollectionRootIDCannotBeMutated(newCol.ID) + } + } + + return nil +} + +func validateSchemaVersionIDNotMutated( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { + for _, newCol := range newColsByID { + oldCol, ok := oldColsByID[newCol.ID] + if !ok { + continue + } + + if newCol.SchemaVersionID != oldCol.SchemaVersionID { + return NewErrCollectionSchemaVersionIDCannotBeMutated(newCol.ID) + } + } + + return nil +} + +func validateCollectionNotRemoved( + oldColsByID map[uint32]client.CollectionDescription, + newColsByID map[uint32]client.CollectionDescription, +) error { +oldLoop: + for _, oldCol := range oldColsByID { + for _, newCol := range newColsByID { + // It is not enough to just match by the map index, in case the index does not pair + // up with the ID (this can happen if a user moves the collection within the map) + if newCol.ID == oldCol.ID { + continue oldLoop + } + } + + return NewErrCollectionsCannotBeDeleted(oldCol.ID) + } + + return nil +} + +// validateCollectionDefinitionPolicyDesc validates that the policy definition is valid, beyond syntax. +// +// Ensures that the information within the policy definition makes sense, +// this function might also make relevant remote calls using the acp system. +func (db *db) validateCollectionDefinitionPolicyDesc( + ctx context.Context, + policyDesc immutable.Option[client.PolicyDescription], +) error { + if !policyDesc.HasValue() { + // No policy validation needed, whether acp exists or not doesn't matter. + return nil + } + + // If there is a policy specified, but the database does not have + // acp enabled/available return an error, database must have an acp available + // to enable access control (inorder to adhere to the policy specified). + if !db.acp.HasValue() { + return ErrCanNotHavePolicyWithoutACP + } + + // If we have the policy specified on the collection, and acp is available/enabled, + // then using the acp system we need to ensure the policy id specified + // actually exists as a policy, and the resource name exists on that policy + // and that the resource is a valid DPI. + return db.acp.Value().ValidateResourceExistsOnValidDPI( + ctx, + policyDesc.Value().ID, + policyDesc.Value().ResourceName, + ) +} + +// validateUpdateSchema validates that the given schema description is a valid update. +// +// Will return true if the given description differs from the current persisted state of the +// schema. Will return an error if it fails validation. +func (db *db) validateUpdateSchema( + existingDescriptionsByName map[string]client.SchemaDescription, + proposedDescriptionsByName map[string]client.SchemaDescription, + proposedDesc client.SchemaDescription, +) (bool, error) { + if proposedDesc.Name == "" { + return false, ErrSchemaNameEmpty + } + + existingDesc, collectionExists := existingDescriptionsByName[proposedDesc.Name] + if !collectionExists { + return false, NewErrAddCollectionWithPatch(proposedDesc.Name) + } + + if proposedDesc.Root != existingDesc.Root { + return false, NewErrSchemaRootDoesntMatch( + proposedDesc.Name, + existingDesc.Root, + proposedDesc.Root, + ) + } + + if proposedDesc.Name != existingDesc.Name { + // There is actually little reason to not support this atm besides controlling the surface area + // of the new feature. Changing this should not break anything, but it should be tested first. + return false, NewErrCannotModifySchemaName(existingDesc.Name, proposedDesc.Name) + } + + if proposedDesc.VersionID != "" && proposedDesc.VersionID != existingDesc.VersionID { + // If users specify this it will be overwritten, an error is preferred to quietly ignoring it. + return false, ErrCannotSetVersionID + } + + hasChangedFields, err := validateUpdateSchemaFields(proposedDescriptionsByName, existingDesc, proposedDesc) + if err != nil { + return hasChangedFields, err + } + + return hasChangedFields, err +} + +func validateUpdateSchemaFields( + descriptionsByName map[string]client.SchemaDescription, + existingDesc client.SchemaDescription, + proposedDesc client.SchemaDescription, +) (bool, error) { + hasChanged := false + existingFieldsByName := map[string]client.SchemaFieldDescription{} + existingFieldIndexesByName := map[string]int{} + for i, field := range existingDesc.Fields { + existingFieldIndexesByName[field.Name] = i + existingFieldsByName[field.Name] = field + } + + newFieldNames := map[string]struct{}{} + for proposedIndex, proposedField := range proposedDesc.Fields { + existingField, fieldAlreadyExists := existingFieldsByName[proposedField.Name] + + // If the field is new, then the collection has changed + hasChanged = hasChanged || !fieldAlreadyExists + + if !fieldAlreadyExists && proposedField.Kind.IsObject() { + _, relatedDescFound := descriptionsByName[proposedField.Kind.Underlying()] + + if !relatedDescFound { + return false, NewErrFieldKindNotFound(proposedField.Name, proposedField.Kind.Underlying()) + } + + if proposedField.Kind.IsObject() && !proposedField.Kind.IsArray() { + idFieldName := proposedField.Name + request.RelatedObjectID + idField, idFieldFound := proposedDesc.GetFieldByName(idFieldName) + if idFieldFound { + if idField.Kind != client.FieldKind_DocID { + return false, NewErrRelationalFieldIDInvalidType(idField.Name, client.FieldKind_DocID, idField.Kind) + } + } + } + } + + if proposedField.Kind.IsObjectArray() { + return false, NewErrSecondaryFieldOnSchema(proposedField.Name) + } + + if _, isDuplicate := newFieldNames[proposedField.Name]; isDuplicate { + return false, NewErrDuplicateField(proposedField.Name) + } + + if fieldAlreadyExists && proposedField != existingField { + return false, NewErrCannotMutateField(proposedField.Name) + } + + if existingIndex := existingFieldIndexesByName[proposedField.Name]; fieldAlreadyExists && + proposedIndex != existingIndex { + return false, NewErrCannotMoveField(proposedField.Name, proposedIndex, existingIndex) + } + + if !proposedField.Typ.IsSupportedFieldCType() { + return false, client.NewErrInvalidCRDTType(proposedField.Name, proposedField.Typ.String()) + } + + if !proposedField.Typ.IsCompatibleWith(proposedField.Kind) { + return false, client.NewErrCRDTKindMismatch(proposedField.Typ.String(), proposedField.Kind.String()) + } + + newFieldNames[proposedField.Name] = struct{}{} + } + + for _, field := range existingDesc.Fields { + if _, stillExists := newFieldNames[field.Name]; !stillExists { + return false, NewErrCannotDeleteField(field.Name) + } + } + return hasChanged, nil +} diff --git a/internal/db/schema.go b/internal/db/schema.go index eca05f2a1f..8c0ba074dc 100644 --- a/internal/db/schema.go +++ b/internal/db/schema.go @@ -23,6 +23,7 @@ import ( "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/description" ) @@ -323,3 +324,203 @@ func containsLetter(s string) bool { } return false } + +// updateSchema updates the persisted schema description matching the name of the given +// description, to the values in the given description. +// +// It will validate the given description using [validateUpdateSchema] before updating it. +// +// The schema (including the schema version ID) will only be updated if any changes have actually +// been made, if the given description matches the current persisted description then no changes will be +// applied. +func (db *db) updateSchema( + ctx context.Context, + existingSchemaByName map[string]client.SchemaDescription, + proposedDescriptionsByName map[string]client.SchemaDescription, + schema client.SchemaDescription, + migration immutable.Option[model.Lens], + setAsActiveVersion bool, +) error { + hasChanged, err := db.validateUpdateSchema( + existingSchemaByName, + proposedDescriptionsByName, + schema, + ) + if err != nil { + return err + } + + if !hasChanged { + return nil + } + + for _, field := range schema.Fields { + if field.Kind.IsObject() && !field.Kind.IsArray() { + idFieldName := field.Name + "_id" + if _, ok := schema.GetFieldByName(idFieldName); !ok { + schema.Fields = append(schema.Fields, client.SchemaFieldDescription{ + Name: idFieldName, + Kind: client.FieldKind_DocID, + }) + } + } + } + + for i, field := range schema.Fields { + if field.Typ == client.NONE_CRDT { + // If no CRDT Type has been provided, default to LWW_REGISTER. + field.Typ = client.LWW_REGISTER + schema.Fields[i] = field + } + } + + txn := mustGetContextTxn(ctx) + previousVersionID := schema.VersionID + schema, err = description.CreateSchemaVersion(ctx, txn, schema) + if err != nil { + return err + } + + // After creating the new schema version, we need to create new collection versions for + // any collection using the previous version. These will be inactive unless [setAsActiveVersion] + // is true. + + cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, previousVersionID) + if err != nil { + return err + } + + colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{}) + if err != nil { + return err + } + + for _, col := range cols { + previousID := col.ID + + existingCols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schema.VersionID) + if err != nil { + return err + } + + // The collection version may exist before the schema version was created locally. This is + // because migrations for the globally known schema version may have been registered locally + // (typically to handle documents synced over P2P at higher versions) before the local schema + // was updated. We need to check for them now, and update them instead of creating new ones + // if they exist. + var isExistingCol bool + existingColLoop: + for _, existingCol := range existingCols { + sources := existingCol.CollectionSources() + for _, source := range sources { + // Make sure that this collection is the parent of the current [col], and not part of + // another collection set that happens to be using the same schema. + if source.SourceCollectionID == previousID { + if existingCol.RootID == client.OrphanRootID { + existingCol.RootID = col.RootID + } + + fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(existingCol.RootID)) + if err != nil { + return err + } + + for _, globalField := range schema.Fields { + var fieldID client.FieldID + // We must check the source collection if the field already exists, and take its ID + // from there, otherwise the field must be generated by the sequence. + existingField, ok := col.GetFieldByName(globalField.Name) + if ok { + fieldID = existingField.ID + } else { + nextFieldID, err := fieldSeq.next(ctx) + if err != nil { + return err + } + fieldID = client.FieldID(nextFieldID) + } + + existingCol.Fields = append( + existingCol.Fields, + client.CollectionFieldDescription{ + Name: globalField.Name, + ID: fieldID, + }, + ) + } + existingCol, err = description.SaveCollection(ctx, txn, existingCol) + if err != nil { + return err + } + isExistingCol = true + break existingColLoop + } + } + } + + if !isExistingCol { + colID, err := colSeq.next(ctx) + if err != nil { + return err + } + + fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(col.RootID)) + if err != nil { + return err + } + + // Create any new collections without a name (inactive), if [setAsActiveVersion] is true + // they will be activated later along with any existing collection versions. + col.Name = immutable.None[string]() + col.ID = uint32(colID) + col.SchemaVersionID = schema.VersionID + col.Sources = []any{ + &client.CollectionSource{ + SourceCollectionID: previousID, + Transform: migration, + }, + } + + for _, globalField := range schema.Fields { + _, exists := col.GetFieldByName(globalField.Name) + if !exists { + fieldID, err := fieldSeq.next(ctx) + if err != nil { + return err + } + + col.Fields = append( + col.Fields, + client.CollectionFieldDescription{ + Name: globalField.Name, + ID: client.FieldID(fieldID), + }, + ) + } + } + + _, err = description.SaveCollection(ctx, txn, col) + if err != nil { + return err + } + + if migration.HasValue() { + err = db.LensRegistry().SetMigration(ctx, col.ID, migration.Value()) + if err != nil { + return err + } + } + } + } + + if setAsActiveVersion { + // activate collection versions using the new schema ID. This call must be made after + // all new collection versions have been saved. + err = db.setActiveSchemaVersion(ctx, schema.VersionID) + if err != nil { + return err + } + } + + return nil +}