Skip to content

Commit

Permalink
Publish ads via http and announce via http, using ipnisync publisher (#…
Browse files Browse the repository at this point in the history
…17)

* Publish ads via http and announce via http, using ipnisync publisher

- Add options to configure optional http publisher
- Add options to send direct http announcements
- Add option to disable libp2p announcements
- Update dependencies
- Flags to set publish options
- Update README
  • Loading branch information
gammazero authored May 6, 2024
1 parent 4290b52 commit 649fe87
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 1,542 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/depute
27 changes: 19 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,31 @@ $ go install github.com/ipni/depute/cmd/depute@latest
```shell
$ depute -h
Usage of depute:
Usage of ./depute:
-directAnnounceURL value
Indexer URL to send direct http announcement to. Multiple OK
-grpcListenAddr string
The gRPC server listen address. (default "0.0.0.0:40080")
The gRPC server listen address. (default "0.0.0.0:40080")
-grpcTlsCertPath string
The path to gRPC server TLS Certificate.
Path to gRPC server TLS Certificate.
-grpcTlsKeyPath string
The path to gRPC server TLS Key.
Path to gRPC server TLS Key.
-httpListenAddr string
Address to listen on for publishing advertisements over HTTP.
-libp2pIdentityPath string
The path to the marshalled libp2p host identity. If unspecified a random identity is generated.
Path to the marshalled libp2p host identity. If unspecified a random identity is generated.
-libp2pListenAddrs string
The comma separated libp2p host listen addrs. If unspecified the default listen addrs are used at ephemeral port.
Comma separated libp2p host listen addrs. If unspecified the default listen addrs are used at ephemeral port.
-logLevel string
The logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset. (default "info")
Logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset. (default "info")
-noPubsub
Disable pubsub announcements of new advertisements.
-pubAddr value
Address to tell indexer where to retrieve advertisements. Multiple OK
-retrievalAddrs string
The comma separated retrieval multiaddrs to advertise. If unspecified, libp2p host listen addrs are used.
Comma separated retrieval multiaddrs to advertise. If unspecified, libp2p host listen addrs are used.
-topic string
Sets the topic that pubsub messages are send on. (default "/indexer/ingest/mainnet")
```
### Run Server Locally
Expand All @@ -47,4 +58,4 @@ To shutdown the server, interrupt the terminal by pressing `Ctrl + C`
## License
[SPDX-License-Identifier: Apache-2.0 OR MIT](LICENSE.md)
[SPDX-License-Identifier: Apache-2.0 OR MIT](LICENSE.md)
42 changes: 36 additions & 6 deletions cmd/depute/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,33 @@ const (
libp2pUserAgent = "ipni/depute"
)

type arrayFlags []string

func (a *arrayFlags) String() string {
return strings.Join(*a, ", ")
}

func (a *arrayFlags) Set(value string) error {
*a = append(*a, value)
return nil
}

func main() {
libp2pIdentityPath := flag.String("libp2pIdentityPath", "", "The path to the marshalled libp2p host identity. If unspecified a random identity is generated.")
libp2pListenAddrs := flag.String("libp2pListenAddrs", "", "The comma separated libp2p host listen addrs. If unspecified the default listen addrs are used at ephemeral port.")
retrievalAddrs := flag.String("retrievalAddrs", "", "The comma separated retrieval multiaddrs to advertise. If unspecified, libp2p host listen addrs are used.")
httpListenAddr := flag.String("httpListenAddr", "", "Address to listen on for publishing advertisements over HTTP.")
var directAnnounceURLs arrayFlags
flag.Var(&directAnnounceURLs, "directAnnounceURL", "Indexer URL to send direct http announcement to. Multiple OK")
var pubAddrs arrayFlags
flag.Var(&pubAddrs, "pubAddr", "Address to tell indexer where to retrieve advertisements. Multiple OK")

noPubsub := flag.Bool("noPubsub", false, "Disable pubsub announcements of new advertisements.")
libp2pIdentityPath := flag.String("libp2pIdentityPath", "", "Path to the marshalled libp2p host identity. If unspecified a random identity is generated.")
libp2pListenAddrs := flag.String("libp2pListenAddrs", "", "Comma separated libp2p host listen addrs. If unspecified the default listen addrs are used at ephemeral port.")
retrievalAddrs := flag.String("retrievalAddrs", "", "Comma separated retrieval multiaddrs to advertise. If unspecified, libp2p host listen addrs are used.")
grpcListenAddr := flag.String("grpcListenAddr", "0.0.0.0:40080", "The gRPC server listen address.")
grpcTlsCertPath := flag.String("grpcTlsCertPath", "", "The path to gRPC server TLS Certificate.")
grpcTlsKeyPath := flag.String("grpcTlsKeyPath", "", "The path to gRPC server TLS Key.")
logLevel := flag.String("logLevel", "info", "The logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset.")
grpcTlsCertPath := flag.String("grpcTlsCertPath", "", "Path to gRPC server TLS Certificate.")
grpcTlsKeyPath := flag.String("grpcTlsKeyPath", "", "Path to gRPC server TLS Key.")
logLevel := flag.String("logLevel", "info", "Logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset.")
topic := flag.String("topic", depute.DefaultTopic, "Sets the topic that pubsub messages are send on.")
flag.Parse()

if _, set := os.LookupEnv("GOLOG_LOG_LEVEL"); !set {
Expand Down Expand Up @@ -69,6 +88,17 @@ func main() {
rAddrs := strings.Split(*libp2pListenAddrs, ",")
deputeOpts = append(deputeOpts, depute.WithRetrievalAddrs(rAddrs...))
}
deputeOpts = append(deputeOpts, depute.WithHttpListenAddr(*httpListenAddr))
if *noPubsub {
deputeOpts = append(deputeOpts, depute.WithNoPubsubAnnounce())
}
deputeOpts = append(deputeOpts, depute.WithPublishTopic(*topic))
if len(directAnnounceURLs) != 0 {
deputeOpts = append(deputeOpts, depute.WithDirectAnnounceURLs(directAnnounceURLs))
}
if len(pubAddrs) != 0 {
deputeOpts = append(deputeOpts, depute.WithPublishAddrs(pubAddrs))
}

var gsOpts []grpc.ServerOption
// TODO: expose more flags for gRPC server options.
Expand Down
32 changes: 31 additions & 1 deletion depute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package depute

import (
"context"
"fmt"
"net"

"github.com/gogo/status"
Expand All @@ -12,8 +13,11 @@ import (
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
depute "github.com/ipni/depute/api/v0"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/httpsender"
"github.com/ipni/go-libipni/announce/p2psender"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/index-provider/engine/chunker"
"github.com/ipni/storetheindex/api/v0/ingest/schema"
"github.com/multiformats/go-multicodec"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -39,6 +43,7 @@ var (
type Depute struct {
*options
chunker *chunker.ChainChunker
senders []announce.Sender
server *grpc.Server
}

Expand All @@ -47,8 +52,31 @@ func New(o ...Option) (*Depute, error) {
if err != nil {
return nil, err
}

var senders []announce.Sender
if opts.h != nil && len(opts.publishAddrs) != 0 {
if !opts.noPubsubAnnounce {
// Create an announce sender to send over gossip pubsub.
p2pSender, err := p2psender.New(opts.h, opts.pubTopicName)
if err != nil {
return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err)
}
senders = append(senders, p2pSender)
logger.Info("Pubsub announcements enabled")
}
if len(opts.directAnnounceURLs) != 0 {
httpSender, err := httpsender.New(opts.directAnnounceURLs, opts.h.ID())
if err != nil {
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
}
senders = append(senders, httpSender)
logger.Info("Http announcements enabled")
}
}

return &Depute{
options: opts,
senders: senders,
server: grpc.NewServer(opts.grpcServerOpts...),
}, nil
}
Expand Down Expand Up @@ -112,6 +140,8 @@ func (d *Depute) Publish(ctx context.Context, req *depute.Publish_Request) (*dep
logger.Errorw("Failed to set latest ad link", "link", link.String(), "err", err)
return nil, status.Errorf(codes.Internal, "failed to set latest ad link: %v", err)
}
adCid := link.(cidlink.Link).Cid
announce.Send(ctx, adCid, d.publishAddrs, d.senders...)
logger.Infow("Published advertisement", "link", link.String())
var l depute.Link
if err := l.Marshal(link); err != nil {
Expand Down
170 changes: 68 additions & 102 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,151 +1,117 @@
module github.com/ipni/depute

go 1.19
go 1.21

require (
github.com/gogo/status v1.1.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/ipni/index-provider v0.10.1
github.com/ipni/storetheindex v0.5.7
github.com/libp2p/go-libp2p v0.23.4
github.com/multiformats/go-multiaddr v0.7.0
github.com/multiformats/go-multicodec v0.7.0
github.com/multiformats/go-multihash v0.2.1
google.golang.org/grpc v1.56.3
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/ipni/go-libipni v0.6.5
github.com/ipni/index-provider v0.15.1
github.com/libp2p/go-libp2p v0.33.2
github.com/multiformats/go-multiaddr v0.12.3
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/filecoin-project/go-cbor-util v0.0.1 // indirect
github.com/filecoin-project/go-data-transfer v1.15.2 // indirect
github.com/filecoin-project/go-ds-versioning v0.1.2 // indirect
github.com/filecoin-project/go-statemachine v1.0.2 // indirect
github.com/filecoin-project/go-statestore v0.2.0 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gammazero/channelqueue v0.2.1 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/googleapis v1.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/ipfs/go-block-format v0.0.3 // indirect
github.com/ipfs/go-graphsync v0.13.2 // indirect
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-format v0.4.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-peertaskqueue v0.8.0 // indirect
github.com/ipld/go-car/v2 v2.4.1 // indirect
github.com/ipld/go-codec-dagpb v1.5.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipld/go-car/v2 v2.13.1 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/klauspost/cpuid/v2 v2.1.2 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.1 // indirect
github.com/libp2p/go-libp2p-gostream v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.1 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-pubsub v0.10.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/lucas-clemente/quic-go v0.29.1 // indirect
github.com/marten-seemann/qpack v0.3.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.3 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.24.0 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.42.0 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/urfave/cli/v2 v2.16.3 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20220514204315-f29c37e9c44c // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20240109153615-66e95c3e8a87 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20221106115401-f9659909a136 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
golang.org/x/tools v0.18.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
Loading

0 comments on commit 649fe87

Please sign in to comment.