From 6ed76662b1ea5e8daf14483b4229acd9ab01f6c2 Mon Sep 17 00:00:00 2001 From: Tim Holmes-Mitra Date: Tue, 5 Nov 2024 17:26:52 +0000 Subject: [PATCH] feat(vms): update all ops to use retry client and effects --- client-vms/src/util.ts | 33 +++-- client-vms/src/vm/operation/delete-values.ts | 85 ++++++++---- client-vms/src/vm/operation/invoke-compute.ts | 91 ++++++++++--- .../src/vm/operation/overwrite-permissions.ts | 84 +++++++++--- .../src/vm/operation/query-pool-status.ts | 53 +++++++- .../vm/operation/retrieve-compute-result.ts | 123 +++++++++++++----- .../src/vm/operation/retrieve-permissions.ts | 85 +++++++++--- .../src/vm/operation/retrieve-values.ts | 98 ++++++++++---- client-vms/src/vm/operation/store-program.ts | 81 +++++++++--- client-vms/src/vm/operation/store-values.ts | 105 ++++++++++----- .../src/vm/operation/update-permissions.ts | 80 ++++++++++-- client-vms/tests/client.test.ts | 36 ++--- 12 files changed, 720 insertions(+), 234 deletions(-) diff --git a/client-vms/src/util.ts b/client-vms/src/util.ts index f799497..c8c4237 100644 --- a/client-vms/src/util.ts +++ b/client-vms/src/util.ts @@ -1,16 +1,21 @@ -export const collapse = (list: T[]): T | never => { - if (list.length === 0) { - throw new Error("Cannot collapse empty list"); - } +import { Effect as E, pipe } from "effect"; +import { UnknownException } from "effect/Cause"; - return list.reduce((acc, cur) => { - if (acc === undefined) { - return cur; - } - // TODO: serialized to account for objects but could be improved - if (JSON.stringify(acc) !== JSON.stringify(cur)) { - throw new Error(`Element mismatch: ${JSON.stringify(list)}`); - } - return cur; - }); +export const collapse = (list: T[]): E.Effect => { + return pipe( + E.succeed(list), + E.filterOrFail( + (ls) => ls.length > 0, + () => new UnknownException("Empty list"), + ), + E.map((ls) => ({ + first: ls[0], + asStrings: ls.map((e) => JSON.stringify(e)), + })), + E.filterOrFail( + ({ asStrings }) => asStrings.every((str) => str === asStrings[0]), + () => new UnknownException("Not all elements are equal"), + ), + E.map(() => list[0] as T), + ); }; diff --git a/client-vms/src/vm/operation/delete-values.ts b/client-vms/src/vm/operation/delete-values.ts index a69721d..187a582 100644 --- a/client-vms/src/vm/operation/delete-values.ts +++ b/client-vms/src/vm/operation/delete-values.ts @@ -1,12 +1,20 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; -import { parse as parseUuid } from "uuid"; +import { type Client, createClient } from "@connectrpc/connect"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; +import { parse } from "uuid"; import { z } from "zod"; -import { DeleteValuesRequestSchema } from "#/gen-proto/nillion/values/v1/delete_pb"; +import { + type DeleteValuesRequest, + DeleteValuesRequestSchema, +} from "#/gen-proto/nillion/values/v1/delete_pb"; import { Values } from "#/gen-proto/nillion/values/v1/service_pb"; -import { Uuid } from "#/types/types"; +import { Log } from "#/logger"; +import { type PartyId, Uuid } from "#/types/types"; +import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const DeleteValuesConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -15,32 +23,65 @@ export const DeleteValuesConfig = z.object({ }); export type DeleteValuesConfig = z.infer; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: DeleteValuesRequest; +}; + export class DeleteValues implements Operation { private constructor(private readonly config: DeleteValuesConfig) {} - async invoke(): Promise { - const { - vm: { nodes }, - id, - } = this.config; + invoke(): Promise { + return pipe( + this.prepareRequestPerNode(), + E.all, + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "DeleteValues", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Values delete failed: %O", e)), + onSuccess: (id) => E.sync(() => Log.info(`Values deleted: ${id}`)), + }), + E.runPromise, + ); + } - const valuesId = parseUuid(id); + prepareRequestPerNode(): E.Effect[] { + const valuesId = parse(this.config.id); - const promises = nodes.map((node) => { - const client = createClient(Values, node.transport); - return client.deleteValues( - create(DeleteValuesRequestSchema, { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(Values, node.transport), + request: create(DeleteValuesRequestSchema, { valuesId, }), - ); - }); - - const results = await Promise.all(promises); - if (results.length !== nodes.length) { - throw new Error("Results length does not match nodes length"); - } + }), + ); + } - return id; + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.deleteValues(request)), + E.map((_response) => this.config.id), + E.tap((id) => + Log.debug(`Values deleted: node=${nodeId.toBase64()} values=${id} `), + ), + ); } static new(config: DeleteValuesConfig): DeleteValues { diff --git a/client-vms/src/vm/operation/invoke-compute.ts b/client-vms/src/vm/operation/invoke-compute.ts index 619ad32..83b2fc8 100644 --- a/client-vms/src/vm/operation/invoke-compute.ts +++ b/client-vms/src/vm/operation/invoke-compute.ts @@ -1,21 +1,25 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; import { type NadaValue, NadaValues, compute_values_size, encode_values, } from "@nillion/client-wasm"; -import { parse as parseUuid, stringify as stringifyUuid } from "uuid"; +import { Effect as E, pipe } from "effect"; +import { UnknownException } from "effect/Cause"; +import { parse, stringify } from "uuid"; import { z } from "zod"; import { InputPartyBindingSchema, + type InvokeComputeRequest, InvokeComputeRequestSchema, OutputPartyBindingSchema, } from "#/gen-proto/nillion/compute/v1/invoke_pb"; import { Compute } from "#/gen-proto/nillion/compute/v1/service_pb"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; +import { Log } from "#/logger"; import { InputBindings, OutputBindings, @@ -27,6 +31,7 @@ import type { UserId } from "#/types/user-id"; import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const InvokeComputeConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -39,18 +44,46 @@ export const InvokeComputeConfig = z.object({ }); export type InvokeComputeConfig = z.infer; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: InvokeComputeRequest; +}; + export class InvokeCompute implements Operation { private constructor(private readonly config: InvokeComputeConfig) {} async invoke(): Promise { - const { nodes, masker } = this.config.vm; + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "InvokeCompute", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Invoke compute failed: %O", e)), + onSuccess: (id) => E.sync(() => Log.info(`Invoke compute: ${id}`)), + }), + E.runPromise, + ); + } - const signedReceipt = await this.pay(); - const shares = masker.mask(this.config.computeTimeValues).map((share) => ({ - node: PartyId.from(share.party.to_byte_array()), - bincodeValues: encode_values(share.shares), - })); - const valueIds = this.config.valueIds.map(parseUuid); + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { + const shares = this.config.vm.masker.mask(this.config.computeTimeValues); + const valueIds = this.config.valueIds.map(parse); const inputBindings = this.config.inputBindings.map((bindings) => create(InputPartyBindingSchema, { @@ -66,29 +99,45 @@ export class InvokeCompute implements Operation { }), ); - const promises = nodes.map((node) => { - const client = createClient(Compute, node.transport); - const share = shares.find( - (share) => share.node.toBase64() === node.id.toBase64(), + return shares.map((share) => { + const nodeId = PartyId.from(share.party.to_byte_array()); + const node = this.config.vm.nodes.find( + (n) => n.id.toBase64() === nodeId.toBase64(), ); - if (!share) { - throw new Error("Failed to match share.party with a known node.id"); + if (!node) { + return E.fail( + new UnknownException( + `Failed to match configured nodes with share's party id:${nodeId}`, + ), + ); } - return client.invokeCompute( - create(InvokeComputeRequestSchema, { + return E.succeed({ + nodeId: node.id, + client: createClient(Compute, node.transport), + request: create(InvokeComputeRequestSchema, { signedReceipt, valueIds, - bincodeValues: share.bincodeValues, + bincodeValues: encode_values(share.shares), inputBindings, outputBindings, }), - ); + }); }); + } - const results = (await Promise.all(promises)).map((e) => e.computeId); - return stringifyUuid(collapse(results)); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.invokeCompute(request)), + E.map((response) => stringify(response.computeId)), + E.tap((id) => + Log.debug(`Invoked compute: node=${nodeId.toBase64()} id=${id}`), + ), + ); } private async pay(): Promise { diff --git a/client-vms/src/vm/operation/overwrite-permissions.ts b/client-vms/src/vm/operation/overwrite-permissions.ts index 0233cfd..bf4f1db 100644 --- a/client-vms/src/vm/operation/overwrite-permissions.ts +++ b/client-vms/src/vm/operation/overwrite-permissions.ts @@ -1,16 +1,23 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { parse as parseUuid } from "uuid"; import { z } from "zod"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; -import { OverwritePermissionsRequestSchema } from "#/gen-proto/nillion/permissions/v1/overwrite_pb"; +import { + type OverwritePermissionsRequest, + OverwritePermissionsRequestSchema, +} from "#/gen-proto/nillion/permissions/v1/overwrite_pb"; import { Permissions as PermissionsService } from "#/gen-proto/nillion/permissions/v1/service_pb"; -import { Uuid } from "#/types/types"; +import { Log } from "#/logger"; +import { type PartyId, Uuid } from "#/types/types"; import type { ValuesPermissions } from "#/types/values-permissions"; import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const OverwritePermissionsConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -22,29 +29,70 @@ export type OverwritePermissionsConfig = z.infer< typeof OverwritePermissionsConfig >; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: OverwritePermissionsRequest; +}; + export class OverwritePermissions implements Operation { private constructor(private readonly config: OverwritePermissionsConfig) {} async invoke(): Promise { - const { - permissions, - vm: { nodes }, - } = this.config; - - const signedReceipt = await this.pay(); + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "OverwritePermissions", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Overwrite permissions failed: %O", e)), + onSuccess: (id) => + E.sync(() => Log.info(`Overwrote permissions: ${id}`)), + }), + E.runPromise, + ); + } - const promises = nodes.map((node) => { - const client = createClient(PermissionsService, node.transport); - return client.overwritePermissions( - create(OverwritePermissionsRequestSchema, { + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(PermissionsService, node.transport), + request: create(OverwritePermissionsRequestSchema, { signedReceipt, - permissions: permissions.toProto(), + permissions: this.config.permissions.toProto(), }), - ); - }); + }), + ); + } - collapse(await Promise.all(promises)); - return permissions; + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.overwritePermissions(request)), + E.map((_response) => this.config.permissions), + E.tap((_permissions) => + Log.debug( + `Overwrote permissions: node=${nodeId.toBase64()} values=${this.config.id} `, + ), + ), + ); } private pay(): Promise { diff --git a/client-vms/src/vm/operation/query-pool-status.ts b/client-vms/src/vm/operation/query-pool-status.ts index a971a39..747fe7c 100644 --- a/client-vms/src/vm/operation/query-pool-status.ts +++ b/client-vms/src/vm/operation/query-pool-status.ts @@ -1,29 +1,70 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { z } from "zod"; -import type { PoolStatusResponse } from "#/gen-proto/nillion/leader_queries/v1/pool_status_pb"; +import { + type PoolStatusRequest, + PoolStatusRequestSchema, + type PoolStatusResponse, +} from "#/gen-proto/nillion/leader_queries/v1/pool_status_pb"; import { LeaderQueries } from "#/gen-proto/nillion/leader_queries/v1/service_pb"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; +import { Log } from "#/logger"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const QueryPoolStatusConfig = z.object({ vm: z.custom(), }); export type QueryPoolStatusConfig = z.infer; +type NodeRequestOptions = { + client: Client; + request: PoolStatusRequest; +}; + export class QueryPoolStatus implements Operation { private constructor(private readonly config: QueryPoolStatusConfig) {} async invoke(): Promise { - const client = createClient(LeaderQueries, this.config.vm.leader.transport); - const signedReceipt = await this.pay(); - return client.poolStatus({ - signedReceipt, + return pipe( + E.tryPromise(() => this.pay()), + E.flatMap((receipt) => this.prepareLeaderRequest(receipt)), + E.flatMap((request) => + retryGrpcRequestIfRecoverable( + "QueryPoolStatus", + this.invokeNodeRequest(request), + ), + ), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Query pool status failed: %O", e)), + onSuccess: (data) => + E.sync(() => Log.info("Got pool status: %O", data)), + }), + E.runPromise, + ); + } + + prepareLeaderRequest( + signedReceipt: SignedReceipt, + ): E.Effect { + return E.succeed({ + client: createClient(LeaderQueries, this.config.vm.leader.transport), + request: create(PoolStatusRequestSchema, { signedReceipt }), }); } + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { client, request } = options; + return pipe(E.tryPromise(() => client.poolStatus(request))); + } + pay(): Promise { return this.config.vm.payer.payForOperation( create(PriceQuoteRequestSchema, { diff --git a/client-vms/src/vm/operation/retrieve-compute-result.ts b/client-vms/src/vm/operation/retrieve-compute-result.ts index 0a1c19b..23b0acd 100644 --- a/client-vms/src/vm/operation/retrieve-compute-result.ts +++ b/client-vms/src/vm/operation/retrieve-compute-result.ts @@ -1,12 +1,20 @@ -import { createClient } from "@connectrpc/connect"; +import { create } from "@bufbuild/protobuf"; +import { type Client, createClient } from "@connectrpc/connect"; import { PartyShares, decode_values } from "@nillion/client-wasm"; -import { parse as parseUuid } from "uuid"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; +import { parse } from "uuid"; import { z } from "zod"; +import { + type RetrieveResultsRequest, + RetrieveResultsRequestSchema, +} from "#/gen-proto/nillion/compute/v1/retrieve_pb"; import { Compute } from "#/gen-proto/nillion/compute/v1/service_pb"; import { Log } from "#/logger"; -import { NadaValuesRecord, Uuid } from "#/types/types"; +import { NadaValuesRecord, type PartyId, Uuid } from "#/types/types"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const RetrieveComputeResultConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -17,45 +25,92 @@ export type RetrieveComputeResultConfig = z.infer< typeof RetrieveComputeResultConfig >; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: RetrieveResultsRequest; +}; + export class RetrieveComputeResult implements Operation { private constructor(private readonly config: RetrieveComputeResultConfig) {} async invoke(): Promise { - const { nodes, masker } = this.config.vm; - const computeId = parseUuid(this.config.id); - - const promises = nodes.map(async (node) => { - const client = createClient(Compute, node.transport); - const asyncIterable = client.retrieveResults({ - computeId, - }); - - for await (const response of asyncIterable) { - const state = response.state.case; - - if (state !== "success" && state !== "waitingComputation") { - throw new Error("Compute result failure from node", { - cause: response, - }); - } + return pipe( + this.prepareRequestPerNode(), + E.all, + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "RetrieveComputeResults", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.map((results) => { + const shares = results.map((values) => values); + const values = this.config.vm.masker.unmask(shares); + const record = values.to_record() as unknown; + return NadaValuesRecord.parse(record); + }), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Retrieve compute results failed: %O", e)), + onSuccess: (data) => + E.sync(() => Log.info("Retrieved compute results: %O", data)), + }), + E.runPromise, + ); + } - if (response.state.case === "success") { - return new PartyShares( - node.id.toWasm(), - decode_values(response.state.value.bincodeValues), - ); - } + prepareRequestPerNode(): E.Effect[] { + const computeId = parse(this.config.id); - Log.info("Waiting for compute result from: %s", node.id.toBase64()); - } - }); + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(Compute, node.transport), + request: create(RetrieveResultsRequestSchema, { + computeId, + }), + }), + ); + } + + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(async () => { + const asyncIterable = client.retrieveResults(request); + + for await (const response of asyncIterable) { + const state = response.state.case; - const results = (await Promise.all(promises)) as PartyShares[]; - const shares = results.map((values) => values); - const values = masker.unmask(shares); + if (state !== "success" && state !== "waitingComputation") { + throw new Error("Compute result failure from node", { + cause: response, + }); + } - const record = values.to_record() as unknown; - return NadaValuesRecord.parse(record); + if (response.state.case === "success") { + return new PartyShares( + nodeId.toWasm(), + decode_values(response.state.value.bincodeValues), + ); + } + + Log.debug(`Compute result waiting on: node=${nodeId.toBase64()}`); + } + }), + E.map((shares) => shares as PartyShares), + E.tap(() => + Log.debug(`Compute result shares retrieved: node=${nodeId.toBase64()}`), + ), + ); } static new(config: RetrieveComputeResultConfig): RetrieveComputeResult { diff --git a/client-vms/src/vm/operation/retrieve-permissions.ts b/client-vms/src/vm/operation/retrieve-permissions.ts index 54afb86..6b5734c 100644 --- a/client-vms/src/vm/operation/retrieve-permissions.ts +++ b/client-vms/src/vm/operation/retrieve-permissions.ts @@ -1,17 +1,23 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { parse as parseUuid } from "uuid"; import { z } from "zod"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; -import type { Permissions as PermissionsProto } from "#/gen-proto/nillion/permissions/v1/permissions_pb"; -import { RetrievePermissionsRequestSchema } from "#/gen-proto/nillion/permissions/v1/retrieve_pb"; -import { Permissions } from "#/gen-proto/nillion/permissions/v1/service_pb"; -import { Uuid } from "#/types/types"; +import { + type RetrievePermissionsRequest, + RetrievePermissionsRequestSchema, +} from "#/gen-proto/nillion/permissions/v1/retrieve_pb"; +import { Permissions as PermissionsService } from "#/gen-proto/nillion/permissions/v1/service_pb"; +import { Log } from "#/logger"; +import { type PartyId, Uuid } from "#/types/types"; import { ValuesPermissions } from "#/types/values-permissions"; import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const RetrievePermissionsConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -22,26 +28,69 @@ export type RetrievePermissionsConfig = z.infer< typeof RetrievePermissionsConfig >; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: RetrievePermissionsRequest; +}; + export class RetrievePermissions implements Operation { private constructor(private readonly config: RetrievePermissionsConfig) {} - async invoke(): Promise { - const { nodes } = this.config.vm.config; - - const signedReceipt = await this.pay(); + invoke(): Promise { + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "RetrievePermissions", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Retrieve permissions failed: %O", e)), + onSuccess: (data) => + E.sync(() => Log.info("Retrieved permissions: %O", data)), + }), + E.runPromise, + ); + } - const promises = nodes.map((node) => { - const client = createClient(Permissions, node.transport); - return client.retrievePermissions( - create(RetrievePermissionsRequestSchema, { + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(PermissionsService, node.transport), + request: create(RetrievePermissionsRequestSchema, { signedReceipt, }), - ); - }); + }), + ); + } - const results = await Promise.all(promises); - const result = collapse(results); - return ValuesPermissions.from(result); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.retrievePermissions(request)), + E.map((response) => ValuesPermissions.from(response)), + E.tap((_permissions) => + Log.debug( + `Retrieved permissions: node=${nodeId.toBase64()} values=${this.config.id} `, + ), + ), + ); } private async pay(): Promise { diff --git a/client-vms/src/vm/operation/retrieve-values.ts b/client-vms/src/vm/operation/retrieve-values.ts index c863ad0..a843d14 100644 --- a/client-vms/src/vm/operation/retrieve-values.ts +++ b/client-vms/src/vm/operation/retrieve-values.ts @@ -1,15 +1,22 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; import { PartyShares, decode_values } from "@nillion/client-wasm"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { parse as parseUuid } from "uuid"; import { z } from "zod"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; -import { RetrieveValuesRequestSchema } from "#/gen-proto/nillion/values/v1/retrieve_pb"; +import { + type RetrieveValuesRequest, + RetrieveValuesRequestSchema, +} from "#/gen-proto/nillion/values/v1/retrieve_pb"; import { Values } from "#/gen-proto/nillion/values/v1/service_pb"; -import { NadaValuesRecord, Uuid } from "#/types/types"; +import { Log } from "#/logger"; +import { NadaValuesRecord, type PartyId, Uuid } from "#/types/types"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const RetrieveValuesConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -18,34 +25,79 @@ export const RetrieveValuesConfig = z.object({ }); export type RetrieveValuesConfig = z.infer; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: RetrieveValuesRequest; +}; + export class RetrieveValues implements Operation { private constructor(private readonly config: RetrieveValuesConfig) {} async invoke(): Promise { - const { nodes, masker } = this.config.vm; - const signedReceipt = await this.pay(); + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "RetrieveValues", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.map((shares) => { + const values = this.config.vm.masker.unmask(shares); + const record = values.to_record() as unknown; + return NadaValuesRecord.parse(record); + }), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Retrieve values failed: %O", e)), + onSuccess: (data) => + E.sync(() => Log.info("Retrieved values: %O", data)), + }), + E.runPromise, + ); + } - const promises = nodes.map(async (node) => { - const client = createClient(Values, node.transport); - const result = await client.retrieveValues( - create(RetrieveValuesRequestSchema, { + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(Values, node.transport), + request: create(RetrieveValuesRequestSchema, { signedReceipt, }), - ); - - return new PartyShares( - node.id.toWasm(), - decode_values(result.bincodeValues), - ); - }); - - const results = await Promise.all(promises); - if (results.length !== nodes.length) { - throw new Error("Results length does not match nodes length"); - } + }), + ); + } - const record = masker.unmask(results).to_record() as unknown; - return NadaValuesRecord.parse(record); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.retrieveValues(request)), + E.map( + (response) => + new PartyShares( + nodeId.toWasm(), + decode_values(response.bincodeValues), + ), + ), + E.tap((id) => + Log.debug( + `Retrieved values shares: node=${nodeId.toBase64()} values=${id}`, + ), + ), + ); } private pay(): Promise { diff --git a/client-vms/src/vm/operation/store-program.ts b/client-vms/src/vm/operation/store-program.ts index 2da3747..6a4f18f 100644 --- a/client-vms/src/vm/operation/store-program.ts +++ b/client-vms/src/vm/operation/store-program.ts @@ -1,7 +1,9 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; import { ProgramMetadata } from "@nillion/client-wasm"; import { sha256 } from "@noble/hashes/sha2"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { z } from "zod"; import { type PreprocessingRequirement, @@ -10,11 +12,16 @@ import { } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; import { Programs } from "#/gen-proto/nillion/programs/v1/service_pb"; -import { StoreProgramRequestSchema } from "#/gen-proto/nillion/programs/v1/store_pb"; +import { + type StoreProgramRequest, + StoreProgramRequestSchema, +} from "#/gen-proto/nillion/programs/v1/store_pb"; +import { Log } from "#/logger"; import type { PaymentClient } from "#/payment/client"; -import { ProgramId, ProgramName } from "#/types/types"; +import { type PartyId, ProgramId, ProgramName } from "#/types/types"; import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; import type { Operation } from "./operation"; export const StoreProgramConfig = z.object({ @@ -25,6 +32,12 @@ export const StoreProgramConfig = z.object({ }); export type StoreProgramConfig = z.infer; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: StoreProgramRequest; +}; + export class StoreProgram implements Operation { private constructor(private readonly config: StoreProgramConfig) {} @@ -33,25 +46,57 @@ export class StoreProgram implements Operation { } async invoke(): Promise { - const { - program, - vm: { nodes }, - } = this.config; - const signedReceipt = await this.pay(); + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "StoreProgram", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Store program failed: %O", e)), + onSuccess: (id) => E.sync(() => Log.info(`Stored program: ${id}`)), + }), + E.runPromise, + ); + } - const promises = nodes.map((node) => { - const client = createClient(Programs, node.transport); - return client.storeProgram( - create(StoreProgramRequestSchema, { + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(Programs, node.transport), + request: create(StoreProgramRequestSchema, { signedReceipt, - program, + program: this.config.program, }), - ); - }); + }), + ); + } - const results = (await Promise.all(promises)).map((e) => e.programId); - const value = collapse(results); - return ProgramId.parse(value); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.storeProgram(request)), + E.map((response) => ProgramId.parse(response.programId)), + E.tap((id) => + Log.debug(`Stored program: node=${nodeId.toBase64()} values=${id} `), + ), + ); } private pay(): Promise { diff --git a/client-vms/src/vm/operation/store-values.ts b/client-vms/src/vm/operation/store-values.ts index 5ee79ed..6abd1c8 100644 --- a/client-vms/src/vm/operation/store-values.ts +++ b/client-vms/src/vm/operation/store-values.ts @@ -1,17 +1,23 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; import { type NadaValue, NadaValues, compute_values_size, encode_values, } from "@nillion/client-wasm"; -import { stringify as stringifyUuid } from "uuid"; +import { Effect as E, pipe } from "effect"; +import { UnknownException } from "effect/Cause"; +import { stringify } from "uuid"; import { z } from "zod"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; import { Values } from "#/gen-proto/nillion/values/v1/service_pb"; -import { StoreValuesRequestSchema } from "#/gen-proto/nillion/values/v1/store_pb"; +import { + type StoreValuesRequest, + StoreValuesRequestSchema, +} from "#/gen-proto/nillion/values/v1/store_pb"; +import { Log } from "#/logger"; import { PartyId, TtlDays, Uuid } from "#/types/types"; import { type ValuesPermissions, @@ -20,6 +26,7 @@ import { import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const StoreValuesConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -31,54 +38,94 @@ export const StoreValuesConfig = z.object({ }); export type StoreValuesConfig = z.infer; +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: StoreValuesRequest; +}; + export class StoreValues implements Operation { private constructor(private readonly config: StoreValuesConfig) {} - async invoke(): Promise { - const signedReceipt = await this.pay(); + get isUpdate(): boolean { + return Boolean(this.config.id); + } + + invoke(): Promise { + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "StoreValues", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => E.sync(() => Log.error("Values store failed: %O", e)), + onSuccess: (id) => E.sync(() => Log.info(`Values stored: ${id}`)), + }), + E.runPromise, + ); + } + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { const { values, - vm: { masker, nodes }, + vm: { nodes, masker }, } = this.config; - const shares = masker.mask(values).map((share) => ({ - node: PartyId.from(share.party.to_byte_array()), - bincodeValues: encode_values(share.shares), - })); - + const permissions = this.config.permissions.toProto(); + const shares = masker.mask(values); const updateIdentifier = this.isUpdate ? new TextEncoder().encode(this.config.id ?? undefined) : undefined; - const permissions = this.config.permissions.toProto(); - - const promises = nodes.map((node) => { - const client = createClient(Values, node.transport); - const share = shares.find( - (share) => share.node.toBase64() === node.id.toBase64(), - ); + return shares.map((share) => { + const nodeId = PartyId.from(share.party.to_byte_array()); + const node = nodes.find((n) => n.id.toBase64() === nodeId.toBase64()); - if (!share) { - throw new Error("Failed to match share.party with a known node.id"); + if (!node) { + return E.fail( + new UnknownException( + `Failed to match configured nodes with share's party id:${nodeId}`, + ), + ); } - return client.storeValues( - create(StoreValuesRequestSchema, { + return E.succeed({ + nodeId, + client: createClient(Values, node.transport), + request: create(StoreValuesRequestSchema, { signedReceipt, - bincodeValues: share.bincodeValues, + bincodeValues: encode_values(share.shares), permissions, updateIdentifier, }), - ); + }); }); - - const results = (await Promise.all(promises)).map((e) => e.valuesId); - return stringifyUuid(collapse(results)); } - get isUpdate(): boolean { - return Boolean(this.config.id); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.storeValues(request)), + E.map((response) => stringify(response.valuesId)), + E.tap((id) => + Log.debug(`Values stored: node=${nodeId.toBase64()} values=${id}`), + ), + ); } private pay(): Promise { diff --git a/client-vms/src/vm/operation/update-permissions.ts b/client-vms/src/vm/operation/update-permissions.ts index 53c3a87..c5f1263 100644 --- a/client-vms/src/vm/operation/update-permissions.ts +++ b/client-vms/src/vm/operation/update-permissions.ts @@ -1,11 +1,17 @@ import { create } from "@bufbuild/protobuf"; -import { createClient } from "@connectrpc/connect"; +import { type Client, createClient } from "@connectrpc/connect"; +import { Effect as E, pipe } from "effect"; +import type { UnknownException } from "effect/Cause"; import { parse as parseUuid } from "uuid"; import { z } from "zod"; import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb"; import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb"; import { Permissions as PermissionsService } from "#/gen-proto/nillion/permissions/v1/service_pb"; -import { UpdatePermissionsRequestSchema } from "#/gen-proto/nillion/permissions/v1/update_pb"; +import { + type UpdatePermissionsRequest, + UpdatePermissionsRequestSchema, +} from "#/gen-proto/nillion/permissions/v1/update_pb"; +import { Log } from "#/logger"; import { ComputePermissionCommand, ComputePermissionCommandBuilder, @@ -14,11 +20,12 @@ import { PermissionCommand, PermissionCommandBuilder, } from "#/types/permission-command"; -import { type ProgramId, Uuid } from "#/types/types"; +import { type PartyId, type ProgramId, Uuid } from "#/types/types"; import type { UserId } from "#/types/user-id"; import { collapse } from "#/util"; import type { VmClient } from "#/vm/client"; import type { Operation } from "#/vm/operation/operation"; +import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client"; export const UpdatePermissionsConfig = z.object({ // due to import resolution order we cannot use instanceof because VmClient isn't defined first @@ -31,32 +38,77 @@ export const UpdatePermissionsConfig = z.object({ }); export type UpdatePermissionsConfig = z.infer; -export class UpdatePermissions implements Operation { +type NodeRequestOptions = { + nodeId: PartyId; + client: Client; + request: UpdatePermissionsRequest; +}; + +export class UpdatePermissions implements Operation { private constructor(private readonly config: UpdatePermissionsConfig) {} - async invoke(): Promise { - const signedReceipt = await this.pay(); + invoke(): Promise { + return pipe( + E.tryPromise(() => this.pay()), + E.map((receipt) => this.prepareRequestPerNode(receipt)), + E.flatMap(E.all), + E.map((requests) => + requests.map((request) => + retryGrpcRequestIfRecoverable( + "UpdatePermissions", + this.invokeNodeRequest(request), + ), + ), + ), + E.flatMap((effects) => + E.all(effects, { concurrency: this.config.vm.nodes.length }), + ), + E.flatMap(collapse), + E.tapBoth({ + onFailure: (e) => + E.sync(() => Log.error("Update permissions failed: %O", e)), + onSuccess: (id) => E.sync(() => Log.info(`Updated permissions: ${id}`)), + }), + E.runPromise, + ); + } - const { nodes } = this.config.vm; + prepareRequestPerNode( + signedReceipt: SignedReceipt, + ): E.Effect[] { const retrieve = this.config.retrieve.toProto(); const update = this.config.update.toProto(); const _delete = this.config._delete.toProto(); const compute = this.config.compute.toProto(); - const promises = nodes.map((node) => { - const client = createClient(PermissionsService, node.transport); - return client.updatePermissions( - create(UpdatePermissionsRequestSchema, { + return this.config.vm.nodes.map((node) => + E.succeed({ + nodeId: node.id, + client: createClient(PermissionsService, node.transport), + request: create(UpdatePermissionsRequestSchema, { signedReceipt, retrieve, update, delete: _delete, compute, }), - ); - }); + }), + ); + } - collapse(await Promise.all(promises)); + invokeNodeRequest( + options: NodeRequestOptions, + ): E.Effect { + const { nodeId, client, request } = options; + return pipe( + E.tryPromise(() => client.updatePermissions(request)), + E.map((_response) => this.config.id), + E.tap((id) => + Log.debug( + `Updated permissions: node=${nodeId.toBase64()} values=${id} `, + ), + ), + ); } private pay(): Promise { diff --git a/client-vms/tests/client.test.ts b/client-vms/tests/client.test.ts index bb906e1..80d57f5 100644 --- a/client-vms/tests/client.test.ts +++ b/client-vms/tests/client.test.ts @@ -1,6 +1,8 @@ import { NadaValue } from "@nillion/client-wasm"; -import { describe, expect, it } from "vitest"; +import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { ZodError } from "zod"; +import { Log } from "#/logger"; +import { createSignerFromKey } from "#/payment/wallet"; import type { ProgramId, Uuid } from "#/types/types"; import { type ValuesPermissions, @@ -8,25 +10,12 @@ import { } from "#/types/values-permissions"; import { VmClientBuilder } from "#/vm/builder"; import type { VmClient } from "#/vm/client"; - -import { createSignerFromKey } from "#/payment/wallet"; import { Env, PrivateKeyPerSuite, loadProgram } from "./helpers"; -describe("VmClient", () => { +describe("Client", () => { let client: VmClient; - it("builder rejects if missing values", async () => { - try { - const builder = new VmClientBuilder(); - await builder.build(); - } catch (e) { - expect(e).toBeInstanceOf(ZodError); - expect((e as ZodError).issues).toHaveLength(5); - } - expect.assertions(2); - }); - - it("builder can create client", async () => { + beforeAll(async () => { const signer = await createSignerFromKey(PrivateKeyPerSuite.VmClient); client = await new VmClientBuilder() @@ -36,8 +25,21 @@ describe("VmClient", () => { .chainUrl(Env.nilChainUrl) .signer(signer) .build(); + }); + + afterAll(async () => { + await new Promise((resolve) => Log.flush(resolve)); + }); - expect(client).toBeDefined(); + it("builder rejects if missing values", async () => { + try { + const builder = new VmClientBuilder(); + await builder.build(); + } catch (e) { + expect(e).toBeInstanceOf(ZodError); + expect((e as ZodError).issues).toHaveLength(5); + } + expect.assertions(2); }); it("can query pool status", async () => {