Skip to content

Commit

Permalink
feat: add eventmessage for emitting models & start refactoring emit m…
Browse files Browse the repository at this point in the history
…acro (starkware-libs#1656)

* feat: add eventmessage for emitting models & start refactoring emit macro

* refactor: emit multiple models event

* feat: event message processor

* feat: emit_message world func emit event mssage

* refactor: catch all event processor

* refactor: check model key as name

* feat: model name keccak as id & rework model events

* fix: storing entities with model hash

* refactor: catch all event message  and store

* feat: emit model evrnt from spawn and move spawn

* fix: pass keys as array

* feat: fix emit macro and correctly index model events

* feat: event messages migrations & set

* feat: store events messages and new id system

* chore: fmt

* fix: keys array

* feat: add grpc endpoint for event messages

* feat: graphql schema for event messages

* cadd comments for moel name hash

* revert world.cario changes

* fix: graphql entity and model connection

* refactor: only test models name ordering

* fix: entity/modeldata relation

* refactor: remove event testing

* fix: cairo code

* refactor: add back event rxample

* fix: merge

* chore: migration

* fix: subscription test

* fix: tests

* fix: subscription test

* fix: sql test

* chore: format model id correctly in test

* fix: model subscription with id

* fix: schema for model

* chore: revert modelmmeber type

* fix: event message query
  • Loading branch information
Larkooo authored Mar 21, 2024
1 parent f4c8111 commit 1bed704
Show file tree
Hide file tree
Showing 27 changed files with 911 additions and 105 deletions.
2 changes: 2 additions & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, Processors};
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
use torii_core::processors::register_model::RegisterModelProcessor;
use torii_core::processors::store_del_record::StoreDelRecordProcessor;
Expand Down Expand Up @@ -160,6 +161,7 @@ async fn main() -> anyhow::Result<()> {
Box::new(StoreSetRecordProcessor),
Box::new(MetadataUpdateProcessor),
Box::new(StoreDelRecordProcessor),
Box::new(EventMessageProcessor),
],
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
Expand Down
94 changes: 76 additions & 18 deletions crates/dojo-lang/src/inline_macros/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use cairo_lang_diagnostics::Severity;
use cairo_lang_semantic::inline_macros::unsupported_bracket_diagnostic;
use cairo_lang_syntax::node::{ast, TypedSyntaxNode};

use super::unsupported_arg_diagnostic;

#[derive(Debug, Default)]
pub struct EmitMacro;

Expand All @@ -23,11 +25,7 @@ impl InlineMacroExprPlugin for EmitMacro {
return unsupported_bracket_diagnostic(db, syntax);
};
let mut builder = PatchBuilder::new(db);
builder.add_str(
"{
let mut keys = Default::<core::array::Array>::default();
let mut data = Default::<core::array::Array>::default();",
);
builder.add_str("{");

let args = arg_list.arguments(db).elements(db);

Expand All @@ -36,25 +34,85 @@ impl InlineMacroExprPlugin for EmitMacro {
code: None,
diagnostics: vec![PluginDiagnostic {
stable_ptr: arg_list.arguments(db).stable_ptr().untyped(),
message: "Invalid arguments. Expected \"emit!(world, event)\"".to_string(),
message: "Invalid arguments. Expected \"emit!(world, models,)\"".to_string(),
severity: Severity::Error,
}],
};
}

let world = &args[0];
let event = &args[1];

builder.add_str(
"\n starknet::Event::append_keys_and_data(@core::traits::Into::<_, \
Event>::into(",
);
builder.add_node(event.as_syntax_node());
builder.add_str("), ref keys, ref data);");

builder.add_str("\n ");
builder.add_node(world.as_syntax_node());
builder.add_str(".emit(keys, data.span());");

let ast::ArgClause::Unnamed(models) = args[1].arg_clause(db) else {
return unsupported_arg_diagnostic(db, syntax);
};

let mut bundle = vec![];

match models.value(db) {
ast::Expr::Parenthesized(parens) => {
let syntax_node = parens.expr(db).as_syntax_node();
bundle.push((syntax_node.get_text(db), syntax_node));
}
ast::Expr::Tuple(list) => {
list.expressions(db).elements(db).into_iter().for_each(|expr| {
let syntax_node = expr.as_syntax_node();
bundle.push((syntax_node.get_text(db), syntax_node));
})
}
ast::Expr::StructCtorCall(ctor) => {
let syntax_node = ctor.as_syntax_node();
bundle.push((syntax_node.get_text(db), syntax_node));
}
_ => {
return InlinePluginResult {
code: None,
diagnostics: vec![PluginDiagnostic {
message: "Invalid arguments. Expected \"(world, (models,))\"".to_string(),
stable_ptr: arg_list.arguments(db).stable_ptr().untyped(),
severity: Severity::Error,
}],
};
}
}

if bundle.is_empty() {
return InlinePluginResult {
code: None,
diagnostics: vec![PluginDiagnostic {
message: "Invalid arguments: No models provided.".to_string(),
stable_ptr: arg_list.arguments(db).stable_ptr().untyped(),
severity: Severity::Error,
}],
};
}

for (event, _) in bundle {
builder.add_str("{");

builder.add_str(
"
let mut keys = Default::<core::array::Array>::default();
let mut data = Default::<core::array::Array>::default();",
);

builder.add_str(&format!(
"keys.append(selector!(\"{}\"));",
event.split_whitespace().next().unwrap()
));

builder.add_str(&format!(
"
starknet::Event::append_keys_and_data(@{event}, ref keys, ref data);",
event = event
));

builder.add_str("\n ");
builder.add_node(world.as_syntax_node());
builder.add_str(".emit(keys, data.span());");

builder.add_str("}");
}

builder.add_str("}");

InlinePluginResult {
Expand Down
5 changes: 5 additions & 0 deletions crates/dojo-world/src/contracts/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum ModelError {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait ModelReader<E> {
fn name(&self) -> String;
fn class_hash(&self) -> FieldElement;
fn contract_address(&self) -> FieldElement;
async fn schema(&self) -> Result<Ty, E>;
Expand Down Expand Up @@ -147,6 +148,10 @@ impl<'a, P> ModelReader<ModelError> for ModelRPCReader<'a, P>
where
P: Provider + Sync + Send,
{
fn name(&self) -> String {
self.name.to_string()
}

fn class_hash(&self) -> FieldElement {
self.class_hash
}
Expand Down
4 changes: 2 additions & 2 deletions crates/dojo-world/src/manifest_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,10 @@ async fn fetch_remote_manifest() {
let remote_manifest =
DeploymentManifest::load_from_remote(provider, world_address).await.unwrap();

assert_eq!(local_manifest.models.len(), 2);
assert_eq!(local_manifest.models.len(), 3);
assert_eq!(local_manifest.contracts.len(), 1);

assert_eq!(remote_manifest.models.len(), 2);
assert_eq!(remote_manifest.models.len(), 3);
assert_eq!(remote_manifest.contracts.len(), 1);

// compute diff from local and remote manifest
Expand Down
5 changes: 4 additions & 1 deletion crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ impl<P: Provider + Sync> Engine<P> {
};
self.db.store_event(event_id, event, transaction_hash);
for processor in &self.processors.event {
if get_selector_from_name(&processor.event_key())? == event.keys[0]
// If the processor has no event_key, means it's a catch-all processor.
// We also validate the event
if (processor.event_key().is_empty()
|| get_selector_from_name(&processor.event_key())? == event.keys[0])
&& processor.validate(event)
{
processor
Expand Down
4 changes: 3 additions & 1 deletion crates/torii/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::num::ParseIntError;
use dojo_types::primitive::PrimitiveError;
use dojo_types::schema::EnumError;
use starknet::core::types::{FromByteSliceError, FromStrError};
use starknet::core::utils::CairoShortStringToFeltError;
use starknet::core::utils::{CairoShortStringToFeltError, NonAsciiNameError};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -21,6 +21,8 @@ pub enum Error {

#[derive(Debug, thiserror::Error)]
pub enum ParseError {
#[error(transparent)]
NonAsciiName(#[from] NonAsciiNameError),
#[error(transparent)]
FromStr(#[from] FromStrError),
#[error(transparent)]
Expand Down
11 changes: 10 additions & 1 deletion crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dojo_world::contracts::model::ModelReader;
use sqlx::sqlite::SqliteRow;
use sqlx::{Pool, Row, Sqlite};
use starknet::core::types::FieldElement;
use starknet::core::utils::get_selector_from_name;

use super::error::{self, Error};
use crate::error::{ParseError, QueryError};
Expand Down Expand Up @@ -59,6 +60,10 @@ impl ModelSQLReader {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ModelReader<Error> for ModelSQLReader {
fn name(&self) -> String {
self.name.to_string()
}

fn class_hash(&self) -> FieldElement {
self.class_hash
}
Expand All @@ -68,11 +73,15 @@ impl ModelReader<Error> for ModelSQLReader {
}

async fn schema(&self) -> Result<Ty, Error> {
// this is temporary until the hash for the model name is precomputed
let model_selector =
get_selector_from_name(&self.name).map_err(error::ParseError::NonAsciiName)?;

let model_members: Vec<SqlModelMember> = sqlx::query_as(
"SELECT id, model_idx, member_idx, name, type, type_enum, enum_options, key FROM \
model_members WHERE model_id = ? ORDER BY model_idx ASC, member_idx ASC",
)
.bind(self.name.clone())
.bind(format!("{:#x}", model_selector))
.fetch_all(&self.pool)
.await?;

Expand Down
65 changes: 65 additions & 0 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::providers::Provider;
use tracing::info;

use super::EventProcessor;
use crate::processors::MODEL_INDEX;
use crate::sql::Sql;

#[derive(Default)]
pub struct EventMessageProcessor;

#[async_trait]
impl<P> EventProcessor<P> for EventMessageProcessor
where
P: Provider + Send + Sync,
{
fn event_key(&self) -> String {
"".to_string()
}

fn validate(&self, event: &Event) -> bool {
// we expect at least 3 keys
// 1: event selector
// 2: model keys, arbitrary length
// last key: system key
if event.keys.len() < 3 {
return false;
}

true
}

async fn process(
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
_transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error> {
// silently ignore if the model is not found
let model = match db.model(&format!("{:#x}", event.keys[MODEL_INDEX])).await {
Ok(model) => model,
Err(_) => return Ok(()),
};

info!("store event message: {}", model.name());

// skip the first key, as its the event selector
// and dont include last key as its the system key
let mut keys_and_unpacked =
[event.keys[1..event.keys.len() - 1].to_vec(), event.data.clone()].concat();

let mut entity = model.schema().await?;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_event_message(entity, event_id).await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use starknet_crypto::FieldElement;

use crate::sql::Sql;

pub mod event_message;
pub mod metadata_update;
pub mod register_model;
pub mod store_del_record;
Expand Down
5 changes: 3 additions & 2 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
use tracing::info;

Expand Down Expand Up @@ -47,7 +47,8 @@ where
let name = parse_cairo_short_string(&event.data[MODEL_INDEX])?;
info!("store delete record: {}", name);

let model = db.model(&name).await?;
// this is temporary until the model name hash is precomputed
let model = db.model(&format!("{:#x}", get_selector_from_name(&name)?)).await?;

let keys_start = NUM_KEYS_INDEX + 1;
let keys = event.data[keys_start..].to_vec();
Expand Down
5 changes: 3 additions & 2 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
use tracing::info;

Expand Down Expand Up @@ -47,7 +47,8 @@ where
let name = parse_cairo_short_string(&event.data[MODEL_INDEX])?;
info!("store set record: {}", name);

let model = db.model(&name).await?;
// this is temporary until the model name hash is precomputed
let model = db.model(&format!("{:#x}", get_selector_from_name(&name)?)).await?;

let keys_start = NUM_KEYS_INDEX + 1;
let keys_end: usize = keys_start + usize::from(u8::try_from(event.data[NUM_KEYS_INDEX])?);
Expand Down
Loading

0 comments on commit 1bed704

Please sign in to comment.