Skip to content

Commit

Permalink
fix comments, rollback messed events validations, removed tests check…
Browse files Browse the repository at this point in the history
…ing existence of event links in db
  • Loading branch information
ice-cronus committed Oct 10, 2024
1 parent f1bc473 commit 79e5d60
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 215 deletions.
7 changes: 4 additions & 3 deletions cmd/seeder/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
Tool to seed the relay with data
Sample call:
```bash
seeder --relays=wss://relay.damus.io/ --outputRelay=wss://localhost:9998 --profilesCount=1000 --threads=100 --perUser=100
seeder --relays=wss://relay.damus.io/ --outputRelay=wss://localhost:9998 --profilesCount=1000 --threads=100 --perUser=100 --uploadKey=...
```
Params:
* relays = relay list to fetch data from
* relays = relay list to fetch data from (multiple can be passed like --relays=wss://relay.damus.io/ --relays=wss://strfry.iris.to/)
* outputRelay = relay to write data to
* profilesCount = count of profiles to fetch
* perUser = count of posts / articles to fetch for each user
* perUser = count of posts / articles to fetch for each user
* uploadKey = imgbb free api key to host random webp images if user dont have any, if not provided - default image urls are rotated
150 changes: 122 additions & 28 deletions cmd/seeder/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/0x6flab/namegenerator"
"github.com/JohnNON/ImgBB"
"github.com/alitto/pond"
"github.com/gookit/goutil/errorx"
"github.com/cockroachdb/errors"
"github.com/nbd-wtf/go-nostr"

"github.com/ice-blockchain/subzero/model"
Expand All @@ -23,24 +26,29 @@ type (
StartFetching(ctx context.Context)
}
fetcher struct {
relays [][]*nostr.Relay
outputRelays []*nostr.Relay
outputLBIdx uint64
profiles int
perUser int
threads int
relays [][]*nostr.Relay
outputRelays []*nostr.Relay
outputLBIdx uint64
imgsLBIndex uint64
profiles int
perUser int
threads int
uploadKey string
webpUploadClient *imgbb.Client
}
)

const concurrentReqs = 10

func NewFetcher(ctx context.Context, relayUrls []string, threads, profiles, perUser int, output string) Fetcher {
func NewFetcher(ctx context.Context, relayUrls []string, threads, profiles, perUser int, output, uploadApiKey string) Fetcher {
f := &fetcher{
relays: make([][]*nostr.Relay, 0, len(relayUrls)),
outputRelays: make([]*nostr.Relay, 0, threads),
profiles: profiles,
perUser: perUser,
threads: threads,
relays: make([][]*nostr.Relay, 0, len(relayUrls)),
outputRelays: make([]*nostr.Relay, 0, threads),
profiles: profiles,
perUser: perUser,
threads: threads,
uploadKey: uploadApiKey,
webpUploadClient: imgbb.NewClient(http.DefaultClient, uploadApiKey),
}
for _ = range threads {
relay, err := connectToRelay(ctx, output)
Expand All @@ -49,7 +57,7 @@ func NewFetcher(ctx context.Context, relayUrls []string, threads, profiles, perU
}
f.outputRelays = append(f.outputRelays, relay)
}
log.Println(fmt.Sprintf("Established %v conns to %v", threads, output))
log.Printf("Established %v conns to %v", threads, output)

for _, relayUrl := range relayUrls {
rels := make([]*nostr.Relay, 0, threads/concurrentReqs)
Expand Down Expand Up @@ -128,11 +136,17 @@ func (f *fetcher) mustFetchUsers(ctx context.Context, profiles int, relayConns [
}
if profile, ok := <-eventAuthor; ok {
e := &model.Event{Event: *profile}
if e.Validate() == nil {
privKey := nostr.GeneratePrivateKey()
updated, pErr := f.populateProfile(ctx, e, privKey)
if e.Validate() == nil && pErr == nil {
if aErr := f.AcceptEvent(ctx, e); aErr != nil {
log.Fatal(aErr)
}
f.fetchUserContent(ctx, ev.PubKey, relayConns[idx])
var remapEventsToKey string
if updated {
remapEventsToKey = privKey
}
f.fetchUserContent(ctx, ev.PubKey, relayConns[idx], remapEventsToKey)
atomic.AddUint64(&profilesProcessed, 1)
}
}
Expand All @@ -141,7 +155,7 @@ func (f *fetcher) mustFetchUsers(ctx context.Context, profiles int, relayConns [
}
pool.StopAndWait()
}
func (f *fetcher) fetchUserContent(ctx context.Context, userKey string, relay *nostr.Relay) {
func (f *fetcher) fetchUserContent(ctx context.Context, userKey string, relay *nostr.Relay, remapEventsToKey string) {
events, err := relay.QueryEvents(ctx, nostr.Filter{
Kinds: []int{nostr.KindTextNote, nostr.KindArticle, nostr.KindReaction, nostr.KindRepost},
Authors: []string{userKey},
Expand All @@ -154,11 +168,11 @@ func (f *fetcher) fetchUserContent(ctx context.Context, userKey string, relay *n
for ev := range events {
evList = append(evList, ev)
}
eventsCount := f.fetchLinkedAndProcessEvents(ctx, evList, relay)
eventsCount := f.fetchLinkedAndProcessEvents(ctx, evList, relay, userKey, remapEventsToKey)
fmt.Println(relay.URL, userKey, eventsCount)
}

func (f *fetcher) fetchLinkedAndProcessEvents(ctx context.Context, events []*nostr.Event, relay *nostr.Relay) int {
func (f *fetcher) fetchLinkedAndProcessEvents(ctx context.Context, events []*nostr.Event, relay *nostr.Relay, user, remapEventsToKey string) int {
repliesAndReactionsGroupedByRelay := map[string][]string{}
repostedEventsGroupedByRelay := map[string][]string{}
eventsAndReplies := map[string]*nostr.Event{}
Expand All @@ -184,16 +198,34 @@ func (f *fetcher) fetchLinkedAndProcessEvents(ctx context.Context, events []*nos
if len(repliesAndReactionsGroupedByRelay) > 0 {
evCount += f.fetchLinkedEvents(ctx, relay, repliesAndReactionsGroupedByRelay, func(evID string) {
delete(eventsAndReplies, evID)
})
}, user, remapEventsToKey)
}
if len(repostedEventsGroupedByRelay) > 0 {
evCount += f.fetchLinkedEvents(ctx, relay, repostedEventsGroupedByRelay, func(evID string) {
delete(eventsAndReplies, evID)
})
}, user, remapEventsToKey)
}
var profileEventForUpdatedReactions model.Event
var privKey string
for _, ev := range eventsAndReplies {
e := &model.Event{Event: *ev}
if e.Validate() == nil {
if !validKind(e) {
continue
}
if remapEventsToKey != "" {
if e.PubKey == user {
e.ID = ""
if err := e.Sign(remapEventsToKey); err != nil {
log.Fatal(err)
}
}
}
if e.Kind == nostr.KindReaction && e.Content != "+" && e.Content != "-" && e.Content != "" {
if err := f.updateReaction(ctx, e, &profileEventForUpdatedReactions, &privKey); err != nil {
continue
}
}
if err := e.Validate(); err == nil {
if aErr := f.AcceptEvent(ctx, e); aErr != nil {
log.Fatal(aErr)
}
Expand All @@ -202,8 +234,10 @@ func (f *fetcher) fetchLinkedAndProcessEvents(ctx context.Context, events []*nos
}
return evCount
}

func (f *fetcher) fetchLinkedEvents(ctx context.Context, currentRelay *nostr.Relay, groupedByRelay map[string][]string, onFailure func(evID string)) int {
func validKind(e *model.Event) bool {
return e.Kind == nostr.KindTextNote || e.Kind == nostr.KindArticle || e.Kind == nostr.KindReaction || e.Kind == nostr.KindRepost
}
func (f *fetcher) fetchLinkedEvents(ctx context.Context, currentRelay *nostr.Relay, groupedByRelay map[string][]string, onFailure func(evID string), userKey, remapEventsToKey string) int {
evCount := 0
for relayUrl, eventsFromRelay := range groupedByRelay {
origEvents, oErr := f.fetchPost(ctx, eventsFromRelay, relayUrl, currentRelay)
Expand All @@ -218,7 +252,7 @@ func (f *fetcher) fetchLinkedEvents(ctx context.Context, currentRelay *nostr.Rel
evList = append(evList, ev)
}
if len(evList) > 0 {
evCount += f.fetchLinkedAndProcessEvents(ctx, evList, currentRelay)
evCount += f.fetchLinkedAndProcessEvents(ctx, evList, currentRelay, userKey, remapEventsToKey)
}
}
return evCount
Expand Down Expand Up @@ -249,22 +283,82 @@ func (f *fetcher) fetchPost(ctx context.Context, events []string, relayUrl strin
}()
r, err = connectToRelay(connCtx, relayUrl)
if err != nil {
return nil, errorx.Withf(err, "failed to connect to %v", relayUrl)
return nil, errors.Wrapf(err, "failed to connect to %v", relayUrl)
}
}
fetchedEvents, err := queryEvents(ctx, r, nostr.Filter{
IDs: events,
})
if err != nil {
return nil, errorx.Withf(err, "failed to fetch original posts for replies %#v from %v", events, relayUrl)
return nil, errors.Wrapf(err, "failed to fetch original posts for replies %#v from %v", events, relayUrl)
}
return fetchedEvents, err
}

func queryEvents(ctx context.Context, r *nostr.Relay, filter nostr.Filter) (chan *nostr.Event, error) {
fetchedEvents, err := r.QueryEvents(ctx, filter)
if err != nil {
return fetchedEvents, errorx.Withf(err, "failed to query events from %v", r.URL)
return fetchedEvents, errors.Wrapf(err, "failed to query events from %v", r.URL)
}
return fetchedEvents, nil
}

func (f *fetcher) populateProfile(ctx context.Context, e *model.Event, privKey string) (bool, error) {
//return false, nil
var parsedContent model.ProfileMetadataContent
if err := json.Unmarshal([]byte(e.Content), &parsedContent); err != nil {
return false, errors.Wrapf(model.ErrWrongEventParams, "nip-01,nip-24: wrong json fields for: %+v", e)
}
updated := false
if parsedContent.Name == "" {
parsedContent.Name = namegenerator.NewGenerator().Generate()
updated = true
}
if parsedContent.About == "" {
parsedContent.About = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
updated = true
}
if parsedContent.Picture == "" {
var err error
if parsedContent.Picture, err = f.generateProfilePhoto(ctx, parsedContent.Name); err != nil {
return false, errors.Wrapf(err, "failed to gen random photo")
}
}
if updated {
b, err := json.Marshal(parsedContent)
if err != nil {
return false, errors.Wrapf(err, "failed to populate profile with random data")
}
e.Content = string(b)
pubKey, _ := nostr.GetPublicKey(privKey)
fmt.Println("RESIGNED ", e.PubKey, "TO ", pubKey)
e.ID = ""
if err = e.Sign(privKey); err != nil {
return false, errors.Wrapf(err, "failed to re-sign profile with random data")
}
}
return updated, nil
}

func (f *fetcher) updateReaction(ctx context.Context, e *model.Event, profileEvent *model.Event, privKey *string) error {
*privKey = nostr.GeneratePrivateKey()
reaction := e.Content
e.Content = "+"
if err := e.Sign(*privKey); err != nil {
return errors.Wrapf(err, "failed to sign event with updated reaction")
}
if profileEvent == nil {
*profileEvent = model.Event{Event: nostr.Event{
CreatedAt: nostr.Now(),
Kind: nostr.KindProfileMetadata,
Content: "{}",
}}
if _, err := f.populateProfile(ctx, profileEvent, *privKey); err == nil {
if err = f.AcceptEvent(ctx, profileEvent); err != nil {
return errors.Wrapf(err, "failed to accept event with author of updated reaction")
}
fmt.Printf("UPDATED REACTION %v TO %v", reaction, e.Content)
}
}
return nil
}
Loading

0 comments on commit 79e5d60

Please sign in to comment.