Skip to content

Commit

Permalink
event filters (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
ice-dionysos authored Nov 18, 2024
1 parent a77ef17 commit d72efde
Show file tree
Hide file tree
Showing 20 changed files with 1,337 additions and 155 deletions.
4 changes: 2 additions & 2 deletions database/query/.testdata/application.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: ice License 1.0


database:
query:
url: ":memory:"
url: ":memory:"
private-key: "eff8260efe5ffe4a73a757d794e673d7497a16621ab7efb387446c65ec8488a9"
30 changes: 27 additions & 3 deletions database/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
type (
dbClient struct {
*sqlx.DB

stmtCacheMx *sync.RWMutex
stmtCache map[string]*sqlx.NamedStmt
relayPrivateKey string
stmtCacheMx *sync.RWMutex
stmtCache map[string]*sqlx.NamedStmt
}
)

Expand Down Expand Up @@ -57,6 +57,21 @@ func init() {
Ptr: sqlAttestationUpdateIsAllowed,
Pure: true,
},
{
Name: "subzero_nostr_tag_a_get_kind",
Ptr: sqlTagAGetAt(0),
Pure: true,
},
{
Name: "subzero_nostr_tag_a_get_pk",
Ptr: sqlTagAGetAt(1),
Pure: true,
},
{
Name: "subzero_nostr_tag_a_get_dtag",
Ptr: sqlTagAGetAt(2),
Pure: true,
},
}

for idx := range funcTable {
Expand Down Expand Up @@ -109,6 +124,15 @@ func openDatabase(target string, runDDL bool) *dbClient {
return client
}

func (db *dbClient) WithPrivateKey(privateKey string) *dbClient {
if privateKey == "" {
panic("private key is empty")
}
db.relayPrivateKey = privateKey

return db
}

func (db *dbClient) exec(ctx context.Context, sql string, arg any) (rowsAffected int64, err error) {
var (
hash = hashSQL(sql)
Expand Down
12 changes: 12 additions & 0 deletions database/query/client_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package query

import (
"encoding/json"
"strings"

"github.com/cockroachdb/errors"

Expand Down Expand Up @@ -52,3 +53,14 @@ func sqlAttestationUpdateIsAllowed(oldTagsJSON, newTagsJSON string) (bool, error

return model.AttestationUpdateIsAllowed(oldTags, newTags), nil
}

func sqlTagAGetAt(pos int) func(string) string {
return func(tag string) string {
fields := strings.Split(tag, ":")
if len(fields) <= pos {
return ""
}

return fields[pos]
}
}
6 changes: 4 additions & 2 deletions database/query/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ var (

type (
config struct {
URL string `yaml:"url"`
URL string `yaml:"url"`
PrivateKey string `yaml:"private-key"`
}
)

func MustInit(ctx context.Context) {
globalDB.Once.Do(func() {
globalConfig = cfg.MustGet[config]()
globalDB.Client = openDatabase(globalConfig.URL, true)
globalDB.Client = openDatabase(globalConfig.URL, true).
WithPrivateKey(globalConfig.PrivateKey)

go globalDB.Client.StartExpiredEventsCleanup(ctx)
go func() {
Expand Down
70 changes: 64 additions & 6 deletions database/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ func (db *dbClient) executeBatch(ctx context.Context, req *databaseBatchRequest)
return err
}

func (db *dbClient) MustSignEvent(event *databaseEvent) {
if event.PubKey != "" {
event.Tags = append(event.Tags, model.Tag{"p", event.PubKey})
}
if event.ID != "" {
event.Tags = append(event.Tags, model.Tag{"i", event.ID, "event"})
}
if event.MasterPubKey != "" && event.MasterPubKey != event.PubKey {
event.Tags = append(event.Tags, model.Tag{model.CustomIONTagOnBehalfOf, event.MasterPubKey})
}

err := event.Sign(db.relayPrivateKey)
if err != nil {
panic(errors.Wrap(err, "failed to sign event"))
}
}

func (db *dbClient) SelectEvents(ctx context.Context, subscription *model.Subscription) EventIterator {
limit := int64(selectDefaultBatchLimit)
hasLimitFilter := subscription != nil && len(subscription.Filters) > 0 && subscription.Filters[0].Limit > 0
Expand All @@ -197,6 +214,7 @@ func (db *dbClient) SelectEvents(ctx context.Context, subscription *model.Subscr

it := &eventIterator{
oneShot: hasLimitFilter && limit <= selectDefaultBatchLimit,
signer: db,
fetch: func(pivot int64) (*sqlx.Rows, error) {
if limit <= 0 {
return nil, nil
Expand Down Expand Up @@ -268,7 +286,7 @@ func generateEventsCountClause(subscription *model.Subscription) (sqlQuery strin
}
}

where, params, err := generateEventsWhereClause(subscription)
where, _, params, err := generateEventsWhereClause(subscription)
if err != nil {
return "", nil, errors.Wrap(err, "failed to generate events where clause")
}
Expand Down Expand Up @@ -296,7 +314,7 @@ func (db *dbClient) CountEvents(ctx context.Context, subscription *model.Subscri
}

func generateSelectEventsSQL(subscription *model.Subscription, systemCreatedAtPivot, limit int64) (sql string, params map[string]any, err error) {
where, params, err := generateEventsWhereClause(subscription)
whereMain, depClause, params, err := generateEventsWhereClause(subscription)
if err != nil {
return "", nil, errors.Wrap(err, "failed to generate events where clause")
}
Expand All @@ -313,32 +331,72 @@ func generateSelectEventsSQL(subscription *model.Subscription, systemCreatedAtPi
limitQuery = " limit :mainlimit"
}

return `
if depClause == "" {
return `
select
e.kind,
e.created_at,
e.system_created_at,
e.id,
e.pubkey,
e.master_pubkey,
e.sig,
e.content,
tags as jtags
from
events e
where ` + systemCreatedAtFilter + `(` + where + `)
where ` + systemCreatedAtFilter + `(` + whereMain + `)
order by
system_created_at desc
` + limitQuery, params, nil
}

return `
with eventsmain as (
select
e.kind,
e.created_at,
e.system_created_at,
e.id,
e.pubkey,
e.master_pubkey,
e.sig,
e.content,
e.d_tag,
tags as jtags
from
events e
where ` + systemCreatedAtFilter + `(` + whereMain + `)
order by
system_created_at desc
` + limitQuery + `
)
select
*
from
eventsmain
` + depClause, params, nil
}

func generateEventsWhereClause(subscription *model.Subscription) (clause string, params map[string]any, err error) {
func generateEventsWhereClause(subscription *model.Subscription) (clauseMain, clauseDeps string, params map[string]any, err error) {
var filters []model.Filter

if subscription != nil {
filters = subscription.Filters
}

return newWhereBuilder().Build(filters...)
builder := newWhereBuilder()
clauseMain, params, err = builder.Build(filters...)
if err != nil {
return "", "", nil, err
}

clauseDeps, params, err = builder.BuildDependencies("eventsmain")
if err != nil {
return "", "", nil, err
}

return clauseMain, clauseDeps, params, nil
}

func (db *dbClient) deleteExpiredEvents(ctx context.Context) error {
Expand Down
21 changes: 17 additions & 4 deletions database/query/query_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ import (

type EventIterator iter.Seq2[*model.Event, error]

type eventIterator struct {
fetch func(pivot int64) (*sqlx.Rows, error)
oneShot bool
}
type (
eventSigner interface {
MustSignEvent(*databaseEvent)
}
eventIterator struct {
fetch func(pivot int64) (*sqlx.Rows, error)
signer eventSigner
oneShot bool
}
)

func (it *eventIterator) decodeTags(jtags string) (tags model.Tags, err error) {
if len(jtags) == 0 {
Expand All @@ -42,6 +48,13 @@ func (it *eventIterator) scanEvent(rows *sqlx.Rows) (_ *databaseEvent, err error
return nil, errors.Wrap(err, "failed to decode tags")
}

if ev.Sig == "" {
switch ev.Kind {
case model.KindDVMCount, model.CustomIONKindRelayListMetadata:
it.signer.MustSignEvent(&ev)
}
}

return &ev, nil
}

Expand Down
12 changes: 12 additions & 0 deletions database/query/query_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ import (
"github.com/ice-blockchain/subzero/model"
)

func helperSelectEvents(t *testing.T, db *dbClient, filters ...model.Filter) (events []*model.Event) {
t.Helper()

for ev, err := range db.SelectEvents(context.Background(), &model.Subscription{Filters: filters}) {
require.NoError(t, err)
require.NotNil(t, ev)
events = append(events, ev)
}

return events
}

func helperSelectEventsN(t *testing.T, db *dbClient, limit int) (events map[string]*model.Event) {
t.Helper()

Expand Down
3 changes: 2 additions & 1 deletion database/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func helperGetStoredEventsAll(t *testing.T, client *dbClient, ctx context.Contex
func helperNewDatabase(t interface{ Helper() }) *dbClient {
t.Helper()

return openDatabase(":memory:", true)
return openDatabase(":memory:", true).
WithPrivateKey(nostr.GeneratePrivateKey())
}

func TestMain(m *testing.M) {
Expand Down
Loading

0 comments on commit d72efde

Please sign in to comment.