Skip to content

Commit

Permalink
feat(vms): update all ops to use retry client and effects
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-hm committed Nov 5, 2024
1 parent 7b21397 commit 6ed7666
Show file tree
Hide file tree
Showing 12 changed files with 720 additions and 234 deletions.
33 changes: 19 additions & 14 deletions client-vms/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
export const collapse = <T>(list: T[]): T | never => {
if (list.length === 0) {
throw new Error("Cannot collapse empty list");
}
import { Effect as E, pipe } from "effect";
import { UnknownException } from "effect/Cause";

return list.reduce((acc, cur) => {
if (acc === undefined) {
return cur;
}
// TODO: serialized to account for objects but could be improved
if (JSON.stringify(acc) !== JSON.stringify(cur)) {
throw new Error(`Element mismatch: ${JSON.stringify(list)}`);
}
return cur;
});
export const collapse = <T>(list: T[]): E.Effect<T, UnknownException> => {
return pipe(
E.succeed(list),
E.filterOrFail(
(ls) => ls.length > 0,
() => new UnknownException("Empty list"),
),
E.map((ls) => ({
first: ls[0],
asStrings: ls.map((e) => JSON.stringify(e)),
})),
E.filterOrFail(
({ asStrings }) => asStrings.every((str) => str === asStrings[0]),
() => new UnknownException("Not all elements are equal"),
),
E.map(() => list[0] as T),
);
};
85 changes: 63 additions & 22 deletions client-vms/src/vm/operation/delete-values.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import { create } from "@bufbuild/protobuf";
import { createClient } from "@connectrpc/connect";
import { parse as parseUuid } from "uuid";
import { type Client, createClient } from "@connectrpc/connect";
import { Effect as E, pipe } from "effect";
import type { UnknownException } from "effect/Cause";
import { parse } from "uuid";
import { z } from "zod";
import { DeleteValuesRequestSchema } from "#/gen-proto/nillion/values/v1/delete_pb";
import {
type DeleteValuesRequest,
DeleteValuesRequestSchema,
} from "#/gen-proto/nillion/values/v1/delete_pb";
import { Values } from "#/gen-proto/nillion/values/v1/service_pb";
import { Uuid } from "#/types/types";
import { Log } from "#/logger";
import { type PartyId, Uuid } from "#/types/types";
import { collapse } from "#/util";
import type { VmClient } from "#/vm/client";
import type { Operation } from "#/vm/operation/operation";
import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client";

export const DeleteValuesConfig = z.object({
// due to import resolution order we cannot use instanceof because VmClient isn't defined first
Expand All @@ -15,32 +23,65 @@ export const DeleteValuesConfig = z.object({
});
export type DeleteValuesConfig = z.infer<typeof DeleteValuesConfig>;

type NodeRequestOptions = {
nodeId: PartyId;
client: Client<typeof Values>;
request: DeleteValuesRequest;
};

export class DeleteValues implements Operation<Uuid> {
private constructor(private readonly config: DeleteValuesConfig) {}

async invoke(): Promise<Uuid> {
const {
vm: { nodes },
id,
} = this.config;
invoke(): Promise<Uuid> {
return pipe(
this.prepareRequestPerNode(),
E.all,
E.map((requests) =>
requests.map((request) =>
retryGrpcRequestIfRecoverable<Uuid>(
"DeleteValues",
this.invokeNodeRequest(request),
),
),
),
E.flatMap((effects) =>
E.all(effects, { concurrency: this.config.vm.nodes.length }),
),
E.flatMap(collapse),
E.tapBoth({
onFailure: (e) =>
E.sync(() => Log.error("Values delete failed: %O", e)),
onSuccess: (id) => E.sync(() => Log.info(`Values deleted: ${id}`)),
}),
E.runPromise,
);
}

const valuesId = parseUuid(id);
prepareRequestPerNode(): E.Effect<NodeRequestOptions, UnknownException>[] {
const valuesId = parse(this.config.id);

const promises = nodes.map((node) => {
const client = createClient(Values, node.transport);
return client.deleteValues(
create(DeleteValuesRequestSchema, {
return this.config.vm.nodes.map((node) =>
E.succeed({
nodeId: node.id,
client: createClient(Values, node.transport),
request: create(DeleteValuesRequestSchema, {
valuesId,
}),
);
});

const results = await Promise.all(promises);
if (results.length !== nodes.length) {
throw new Error("Results length does not match nodes length");
}
}),
);
}

return id;
invokeNodeRequest(
options: NodeRequestOptions,
): E.Effect<Uuid, UnknownException> {
const { nodeId, client, request } = options;
return pipe(
E.tryPromise(() => client.deleteValues(request)),
E.map((_response) => this.config.id),
E.tap((id) =>
Log.debug(`Values deleted: node=${nodeId.toBase64()} values=${id} `),
),
);
}

static new(config: DeleteValuesConfig): DeleteValues {
Expand Down
91 changes: 70 additions & 21 deletions client-vms/src/vm/operation/invoke-compute.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import { create } from "@bufbuild/protobuf";
import { createClient } from "@connectrpc/connect";
import { type Client, createClient } from "@connectrpc/connect";
import {
type NadaValue,
NadaValues,
compute_values_size,
encode_values,
} from "@nillion/client-wasm";
import { parse as parseUuid, stringify as stringifyUuid } from "uuid";
import { Effect as E, pipe } from "effect";
import { UnknownException } from "effect/Cause";
import { parse, stringify } from "uuid";
import { z } from "zod";
import {
InputPartyBindingSchema,
type InvokeComputeRequest,
InvokeComputeRequestSchema,
OutputPartyBindingSchema,
} from "#/gen-proto/nillion/compute/v1/invoke_pb";
import { Compute } from "#/gen-proto/nillion/compute/v1/service_pb";
import { PriceQuoteRequestSchema } from "#/gen-proto/nillion/payments/v1/quote_pb";
import type { SignedReceipt } from "#/gen-proto/nillion/payments/v1/receipt_pb";
import { Log } from "#/logger";
import {
InputBindings,
OutputBindings,
Expand All @@ -27,6 +31,7 @@ import type { UserId } from "#/types/user-id";
import { collapse } from "#/util";
import type { VmClient } from "#/vm/client";
import type { Operation } from "#/vm/operation/operation";
import { retryGrpcRequestIfRecoverable } from "#/vm/operation/retry-client";

export const InvokeComputeConfig = z.object({
// due to import resolution order we cannot use instanceof because VmClient isn't defined first
Expand All @@ -39,18 +44,46 @@ export const InvokeComputeConfig = z.object({
});
export type InvokeComputeConfig = z.infer<typeof InvokeComputeConfig>;

type NodeRequestOptions = {
nodeId: PartyId;
client: Client<typeof Compute>;
request: InvokeComputeRequest;
};

export class InvokeCompute implements Operation<Uuid> {
private constructor(private readonly config: InvokeComputeConfig) {}

async invoke(): Promise<Uuid> {
const { nodes, masker } = this.config.vm;
return pipe(
E.tryPromise(() => this.pay()),
E.map((receipt) => this.prepareRequestPerNode(receipt)),
E.flatMap(E.all),
E.map((requests) =>
requests.map((request) =>
retryGrpcRequestIfRecoverable<Uuid>(
"InvokeCompute",
this.invokeNodeRequest(request),
),
),
),
E.flatMap((effects) =>
E.all(effects, { concurrency: this.config.vm.nodes.length }),
),
E.flatMap(collapse),
E.tapBoth({
onFailure: (e) =>
E.sync(() => Log.error("Invoke compute failed: %O", e)),
onSuccess: (id) => E.sync(() => Log.info(`Invoke compute: ${id}`)),
}),
E.runPromise,
);
}

const signedReceipt = await this.pay();
const shares = masker.mask(this.config.computeTimeValues).map((share) => ({
node: PartyId.from(share.party.to_byte_array()),
bincodeValues: encode_values(share.shares),
}));
const valueIds = this.config.valueIds.map(parseUuid);
prepareRequestPerNode(
signedReceipt: SignedReceipt,
): E.Effect<NodeRequestOptions, UnknownException>[] {
const shares = this.config.vm.masker.mask(this.config.computeTimeValues);
const valueIds = this.config.valueIds.map(parse);

const inputBindings = this.config.inputBindings.map((bindings) =>
create(InputPartyBindingSchema, {
Expand All @@ -66,29 +99,45 @@ export class InvokeCompute implements Operation<Uuid> {
}),
);

const promises = nodes.map((node) => {
const client = createClient(Compute, node.transport);
const share = shares.find(
(share) => share.node.toBase64() === node.id.toBase64(),
return shares.map((share) => {
const nodeId = PartyId.from(share.party.to_byte_array());
const node = this.config.vm.nodes.find(
(n) => n.id.toBase64() === nodeId.toBase64(),
);

if (!share) {
throw new Error("Failed to match share.party with a known node.id");
if (!node) {
return E.fail(
new UnknownException(
`Failed to match configured nodes with share's party id:${nodeId}`,
),
);
}

return client.invokeCompute(
create(InvokeComputeRequestSchema, {
return E.succeed({
nodeId: node.id,
client: createClient(Compute, node.transport),
request: create(InvokeComputeRequestSchema, {
signedReceipt,
valueIds,
bincodeValues: share.bincodeValues,
bincodeValues: encode_values(share.shares),
inputBindings,
outputBindings,
}),
);
});
});
}

const results = (await Promise.all(promises)).map((e) => e.computeId);
return stringifyUuid(collapse(results));
invokeNodeRequest(
options: NodeRequestOptions,
): E.Effect<Uuid, UnknownException> {
const { nodeId, client, request } = options;
return pipe(
E.tryPromise(() => client.invokeCompute(request)),
E.map((response) => stringify(response.computeId)),
E.tap((id) =>
Log.debug(`Invoked compute: node=${nodeId.toBase64()} id=${id}`),
),
);
}

private async pay(): Promise<SignedReceipt> {
Expand Down
Loading

0 comments on commit 6ed7666

Please sign in to comment.