Skip to content

Commit

Permalink
Refactor data storage for RecordsWrite (#364)
Browse files Browse the repository at this point in the history
* Refactor data storage for RecordsWrite

* Clean up DwnErrorCodes

* Lint

* PR comments

* Fix readme

* requester -> author

* Update src/interfaces/records/handlers/records-delete.ts

Co-authored-by: Henry Tsai <[email protected]>

* Update src/interfaces/records/handlers/records-write.ts

Co-authored-by: Henry Tsai <[email protected]>

---------

Co-authored-by: Henry Tsai <[email protected]>
  • Loading branch information
Diane Huxley and thehenrytsai authored May 23, 2023
1 parent 051e04e commit 1cb9d79
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 167 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Decentralized Web Node (DWN) SDK

Code Coverage
![Statements](https://img.shields.io/badge/statements-94.71%25-brightgreen.svg?style=flat) ![Branches](https://img.shields.io/badge/branches-94.16%25-brightgreen.svg?style=flat) ![Functions](https://img.shields.io/badge/functions-93.15%25-brightgreen.svg?style=flat) ![Lines](https://img.shields.io/badge/lines-94.71%25-brightgreen.svg?style=flat)
![Statements](https://img.shields.io/badge/statements-94.65%25-brightgreen.svg?style=flat) ![Branches](https://img.shields.io/badge/branches-94.16%25-brightgreen.svg?style=flat) ![Functions](https://img.shields.io/badge/functions-93.06%25-brightgreen.svg?style=flat) ![Lines](https://img.shields.io/badge/lines-94.65%25-brightgreen.svg?style=flat)

## Introduction

Expand Down
5 changes: 3 additions & 2 deletions src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ export enum DwnErrorCode {
RecordsProtocolsDerivationSchemeMissingProtocol = 'RecordsProtocolsDerivationSchemeMissingProtocol',
RecordsSchemasDerivationSchemeMissingSchema = 'RecordsSchemasDerivationSchemeMissingSchema',
RecordsWriteGetEntryIdUndefinedAuthor = 'RecordsWriteGetEntryIdUndefinedAuthor',
RecordsWriteDataCidMismatch = 'RecordsWriteDataCidMismatch',
RecordsWriteDataSizeMismatch = 'RecordsWriteDataSizeMismatch',
// thrown when `dataStream` is absent and the dataStore does not already hold the referenced `dataCid`
RecordsWriteMissingData = 'RecordsWriteMissingData',
RecordsWriteMissingDataStream = 'RecordsWriteMissingDataStream',
RecordsWriteValidateIntegrityEncryptionCidMismatch = 'RecordsWriteValidateIntegrityEncryptionCidMismatch',
Secp256k1KeyNotValid = 'Secp256k1KeyNotValid',
StorageControllerDataCidMismatch = 'StorageControllerDataCidMismatch',
StorageControllerDataSizeMismatch = 'StorageControllerDataSizeMismatch',
UrlProtocolNotNormalized = 'UrlProtocolNotNormalized',
UrlProtocolNotNormalizable = 'UrlProtocolNotNormalizable',
UrlSchemaNotNormalized = 'UrlSchemaNotNormalized',
Expand Down
16 changes: 8 additions & 8 deletions src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { EventLog } from './event-log/event-log.js';
import type { MessageStore } from './store/message-store.js';
import type { MethodHandler } from './interfaces/types.js';
import type { Readable } from 'readable-stream';
import type { RecordsWriteHandlerOptions } from './interfaces/records/handlers/records-write.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { MessagesGetMessage, MessagesGetReply } from './interfaces/messages/types.js';
import type { RecordsReadMessage, RecordsReadReply, RecordsWriteMessage } from './interfaces/records/types.js';
Expand All @@ -18,7 +19,6 @@ import { MessagesGetHandler } from './interfaces/messages/handlers/messages-get.
import { MessageStoreLevel } from './store/message-store-level.js';
import { ProtocolsConfigureHandler } from './interfaces/protocols/handlers/protocols-configure.js';
import { ProtocolsQueryHandler } from './interfaces/protocols/handlers/protocols-query.js';
import { PrunedInitialRecordsWriteHandler } from './interfaces/records/handlers/pruned-initial-records-write.js';
import { RecordsDeleteHandler } from './interfaces/records/handlers/records-delete.js';
import { RecordsQueryHandler } from './interfaces/records/handlers/records-query.js';
import { RecordsReadHandler } from './interfaces/records/handlers/records-read.js';
Expand All @@ -33,9 +33,6 @@ export class Dwn {
private eventLog: EventLog;
private tenantGate: TenantGate;

// used by sync
private prunedInitialRecordsWriteHandler: PrunedInitialRecordsWriteHandler;

private constructor(config: DwnConfig) {
this.didResolver = config.didResolver!;
this.messageStore = config.messageStore!;
Expand All @@ -55,8 +52,6 @@ export class Dwn {
[DwnInterfaceName.Records + DwnMethodName.Read] : new RecordsReadHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Write] : new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog),
};

this.prunedInitialRecordsWriteHandler = new PrunedInitialRecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog);
}

/**
Expand Down Expand Up @@ -137,13 +132,18 @@ export class Dwn {
/**
* Privileged method for writing a pruned initial `RecordsWrite` to a DWN without needing to supply associated data.
*/
public async synchronizePrunedInitialRecordsWrite(tenant: string, message: RecordsWriteMessage): Promise<MessagesGetReply> {
public async synchronizePrunedInitialRecordsWrite(tenant: string, message: RecordsWriteMessage): Promise<MessageReply> {
const errorMessageReply = await this.preprocessingChecks(tenant, message, DwnInterfaceName.Records, DwnMethodName.Write);
if (errorMessageReply !== undefined) {
return errorMessageReply;
}

const methodHandlerReply = await this.prunedInitialRecordsWriteHandler.handle({ tenant, message });
const options: RecordsWriteHandlerOptions = {
skipDataStorage: true,
};

const handler = new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog);
const methodHandlerReply = await handler.handle({ tenant, message, options });
return methodHandlerReply;
}

Expand Down
6 changes: 4 additions & 2 deletions src/interfaces/protocols/handlers/protocols-configure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class ProtocolsConfigureHandler implements MethodHandler {
public async handle({
tenant,
message,
dataStream
dataStream: _dataStream
}: {tenant: string, message: ProtocolsConfigureMessage, dataStream: _Readable.Readable}): Promise<MessageReply> {

let protocolsConfigure: ProtocolsConfigure;
Expand Down Expand Up @@ -55,7 +55,9 @@ export class ProtocolsConfigureHandler implements MethodHandler {
if (incomingMessageIsNewest) {
const indexes = ProtocolsConfigureHandler.constructProtocolsConfigureIndexes(protocolsConfigure);

await StorageController.put(this.messageStore, this.dataStore, this.eventLog, tenant, message, indexes, dataStream);
const messageCid = await Message.getCid(message);
await this.messageStore.put(tenant, message, indexes);
await this.eventLog.append(tenant, messageCid);

messageReply = new MessageReply({
status: { code: 202, detail: 'Accepted' }
Expand Down
40 changes: 0 additions & 40 deletions src/interfaces/records/handlers/pruned-initial-records-write.ts

This file was deleted.

111 changes: 72 additions & 39 deletions src/interfaces/records/handlers/records-write.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import type { EventLog } from '../../../event-log/event-log.js';
import type { MethodHandler } from '../../types.js';
import type { Readable } from 'readable-stream';
import type { RecordsWriteMessage } from '../types.js';
import type { BaseMessage, TimestampedMessage } from '../../../core/types.js';
import type { TimestampedMessage } from '../../../core/types.js';
import type { DataStore, DidResolver, MessageStore } from '../../../index.js';

import { authenticate } from '../../../core/auth.js';
import { deleteAllOlderMessagesButKeepInitialWrite } from '../records-interface.js';
import { MessageReply } from '../../../core/message-reply.js';
import { RecordsWrite } from '../messages/records-write.js';
import { StorageController } from '../../../store/storage-controller.js';
import { DwnError, DwnErrorCode } from '../../../core/dwn-error.js';
import { DwnInterfaceName, Message } from '../../../core/message.js';

export type RecordsWriteHandlerOptions = {
skipDataStorage?: boolean; // used for DWN sync
};

export class RecordsWriteHandler implements MethodHandler {

constructor(private didResolver: DidResolver, private messageStore: MessageStore,private dataStore: DataStore, private eventLog: EventLog) { }
constructor(private didResolver: DidResolver, private messageStore: MessageStore, private dataStore: DataStore, private eventLog: EventLog) { }

public async handle({
tenant,
message,
options,
dataStream
}: { tenant: string, message: RecordsWriteMessage, dataStream?: _Readable.Readable}): Promise<MessageReply> {

}: { tenant: string, message: RecordsWriteMessage, options?: RecordsWriteHandlerOptions, dataStream?: _Readable.Readable}): Promise<MessageReply> {
let recordsWrite: RecordsWrite;
try {
recordsWrite = await RecordsWrite.parse(message);
Expand Down Expand Up @@ -77,21 +79,26 @@ export class RecordsWriteHandler implements MethodHandler {
const indexes = await constructRecordsWriteIndexes(recordsWrite, isLatestBaseState);

try {
this.validateUndefinedDataStream(dataStream, newestExistingMessage, message);

await this.storeMessage(this.messageStore, this.dataStore, this.eventLog, tenant, message, indexes, dataStream);
// try to store data, unless options explicitly say to skip storage
if (options === undefined || !options.skipDataStorage) {
await this.putData(tenant, message, dataStream, newestExistingMessage);
}
} catch (error) {
const e = error as any;
if (e.code === DwnErrorCode.StorageControllerDataCidMismatch ||
e.code === DwnErrorCode.StorageControllerDataSizeMismatch ||
e.code === DwnErrorCode.RecordsWriteMissingDataStream) {
if (e.code === DwnErrorCode.RecordsWriteMissingDataStream ||
e.code === DwnErrorCode.RecordsWriteMissingData ||
e.code === DwnErrorCode.RecordsWriteDataCidMismatch ||
e.code === DwnErrorCode.RecordsWriteDataSizeMismatch) {
return MessageReply.fromError(error, 400);
}

// else throw
throw error;
}

await this.messageStore.put(tenant, message, indexes);
await this.eventLog.append(tenant, await Message.getCid(message));

const messageReply = new MessageReply({
status: { code: 202, detail: 'Accepted' }
});
Expand All @@ -103,41 +110,67 @@ export class RecordsWriteHandler implements MethodHandler {
};

/**
* Further validation if data stream is undefined.
* NOTE: if data stream is not be provided but `dataCid` is provided,
* then we need to make sure that the existing record state is referencing the same data as the incoming message.
* Without this check will lead to unauthorized access of data (https://github.com/TBD54566975/dwn-sdk-js/issues/359)
* Puts the given data in storage unless tenant already has that data for the given recordId
*
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingDataStream`
* if `dataStream` is absent AND the `dataCid` does not match the current data for the given recordId
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingData`
* if `dataStream` is absent AND dataStore does not contain the given `dataCid`
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
*/
protected validateUndefinedDataStream(
dataStream: _Readable.Readable | undefined,
newestExistingMessage: TimestampedMessage | undefined,
incomingMessage: RecordsWriteMessage): void {
if (dataStream === undefined && incomingMessage.descriptor.dataCid !== undefined) {
if (newestExistingMessage?.descriptor.dataCid !== incomingMessage.descriptor.dataCid) {
public async putData(
tenant: string,
message: RecordsWriteMessage,
dataStream?: _Readable.Readable,
newestExistingMessage?: TimestampedMessage
): Promise<void> {
let result: { dataCid: string, dataSize: number };
const messageCid = await Message.getCid(message);

if (dataStream === undefined) {
// dataStream must be included if message contains a new dataCid
if (newestExistingMessage?.descriptor.dataCid !== message.descriptor.dataCid) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataStream,
'Data stream is not provided.'
);
}

const associateResult = await this.dataStore.associate(tenant, messageCid, message.descriptor.dataCid);
if (associateResult === undefined) {
throw new DwnError(DwnErrorCode.RecordsWriteMissingData, `Unable to associate dataCid ${message.descriptor.dataCid} ` +
`to messageCid ${messageCid} because dataStream was not provided and data was not found in dataStore`);
}

result = associateResult;
} else {
result = await this.dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
}
}

/**
* Stores the given message and its data in the underlying database(s).
* NOTE: this method was created to allow a child class to override the default behavior for sync feature to work:
* ie. allow `RecordsWrite` to be written even if data stream is not provided to handle the case that:
* a `RecordsDelete` has happened, as a result a DWN would have pruned the data associated with the original write.
* This approach avoids the need to duplicate the entire handler.
*/
protected async storeMessage(
messageStore: MessageStore,
dataStore: DataStore,
eventLog: EventLog,
tenant: string,
message: BaseMessage,
indexes: Record<string, string>,
dataStream?: Readable): Promise<void> {
await StorageController.put(messageStore, dataStore, eventLog, tenant, message, indexes, dataStream);
// verify that given dataSize matches size of actual data
if (message.descriptor.dataSize !== result.dataSize) {
// there is an opportunity to improve here: handle the edge case where the delete fails...
await this.dataStore.delete(tenant, messageCid, message.descriptor.dataCid);

throw new DwnError(
DwnErrorCode.RecordsWriteDataSizeMismatch,
`actual data size ${result.dataSize} bytes does not match dataSize in descriptor: ${message.descriptor.dataSize}`
);
}

// verify that given dataCid matches CID of actual data
if (message.descriptor.dataCid !== result.dataCid) {
// there is an opportunity to improve here: handle the edge case where the delete fails...
await this.dataStore.delete(tenant, messageCid, message.descriptor.dataCid);

throw new DwnError(
DwnErrorCode.RecordsWriteDataCidMismatch,
`actual data CID ${result.dataCid} does not match dataCid in descriptor: ${message.descriptor.dataCid}`
);
}
}
}

Expand Down
65 changes: 1 addition & 64 deletions src/store/storage-controller.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,16 @@
import type { DataStore } from './data-store.js';
import type { EventLog } from '../event-log/event-log.js';
import type { MessageStore } from './message-store.js';
import type { Readable } from 'readable-stream';
import type { BaseMessage, Filter } from '../core/types.js';

import { DwnConstant } from '../core/dwn-constant.js';
import { Message } from '../core/message.js';
import type { RecordsWriteMessage } from '../index.js';
import { DataStream, Encoder } from '../index.js';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';

/**
* A class that provides an abstraction for the usage of BlockStore and DataStore.
* A class that provides an abstraction for the usage of MessageStore, DataStore, and EventLog.
*/
export class StorageController {
/**
* Puts the given message and data in storage.
* @throws {DwnError} with `DwnErrorCode.StorageControllerDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.StorageControllerDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
*/
public static async put(
messageStore: MessageStore,
dataStore: DataStore,
eventLog: EventLog,
tenant: string,
message: BaseMessage,
indexes: Record<string, string>,
dataStream?: Readable
): Promise<void> {
const messageCid = await Message.getCid(message);

// if `dataCid` is given, it means there is corresponding data associated with this message
if (message.descriptor.dataCid !== undefined) {
let result;
if (dataStream === undefined) {
// but NOTE: it is possible that a data stream is not given even when `dataCid` is given, for instance,
// a subsequent RecordsWrite that changes the `published` property, but the data hasn't changed,
// in this case requiring re-uploading of the data is extremely inefficient so we allow omission of data stream as a utility function,
// but this method assumes checks for the appropriate conditions already took place prior to calling this method.
result = await dataStore.associate(tenant, messageCid, message.descriptor.dataCid);
} else {
result = await dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
}

// MUST verify that the size of the actual data matches with the given `dataSize`
// if data size is wrong, delete the data we just stored
if (message.descriptor.dataSize !== result.dataSize) {
// there is an opportunity to improve here: handle the edge cae of if the delete fails...
await dataStore.delete(tenant, messageCid, message.descriptor.dataCid);

throw new DwnError(
DwnErrorCode.StorageControllerDataSizeMismatch,
`actual data size ${result.dataSize} bytes does not match dataSize in descriptor: ${message.descriptor.dataSize}`
);
}

// MUST verify that the CID of the actual data matches with the given `dataCid`
// if data CID is wrong, delete the data we just stored
if (message.descriptor.dataCid !== result.dataCid) {
// there is an opportunity to improve here: handle the edge cae of if the delete fails...
await dataStore.delete(tenant, messageCid, message.descriptor.dataCid);

throw new DwnError(
DwnErrorCode.StorageControllerDataCidMismatch,
`actual data CID ${result.dataCid} does not match dataCid in descriptor: ${message.descriptor.dataCid}`
);
}
}

await messageStore.put(tenant, message, indexes);
await eventLog.append(tenant, messageCid);
}

public static async query(
messageStore: MessageStore,
dataStore: DataStore,
Expand Down
Loading

0 comments on commit 1cb9d79

Please sign in to comment.