Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates #8

Merged
merged 19 commits into from
Jul 5, 2024
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ go.work.sum
bin

cover.out
cover.html
cover.html

.output

.DS_Store
*.log
18 changes: 10 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ RUN apt-get update && \
&& rm -rf /var/lib/apt/lists/*

ARG APP_VERSION=nightly
ARG APP_NAME=p2pmq
ARG BUILD_TARGET=p2pmq
ARG APP_NAME=pmq
ARG BUILD_TARGET=pmq

WORKDIR /p2pmq
WORKDIR /pmq

COPY go.mod go.sum ./
RUN go mod download
Expand All @@ -23,12 +23,14 @@ RUN GOOS=linux CGO_ENABLED=0 go build -tags netgo -a -v -o ./bin/${BUILD_TARGET}

FROM alpine:latest as runner

ARG BUILD_TARGET=p2pmq
ARG BUILD_TARGET=pmq

RUN apk --no-cache --upgrade add ca-certificates bash

WORKDIR /p2pmq
WORKDIR /pmq

COPY --from=builder /p2pmq/.env* ./
COPY --from=builder /p2pmq/resources/config/*.p2pmq.yaml ./
COPY --from=builder /p2pmq/bin/${BUILD_TARGET} ./app
COPY --from=builder /pmq/.env* ./
COPY --from=builder /pmq/resources/config/*.pmq.yaml ./
COPY --from=builder /pmq/bin/${BUILD_TARGET} ./app

CMD ["./app"]
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
APP_NAME?=p2pmq
APP_NAME?=pmq
BUILD_TARGET?=${APP_NAME}
BUILD_IMG?=${APP_NAME}
APP_VERSION?=$(git describe --tags $(git rev-list --tags --max-count=1) 2> /dev/null || echo "nightly")
CFG_PATH?=./resources/config/default.p2pmq.yaml
TEST_PKG?=./core/...
TEST_TIMEOUT?=2m
GOSSIP_OUT_DIR=../.output

protoc:
./scripts/proto-gen.sh

lint:
@docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.54 golangci-lint run -v --timeout=5m ./...
Expand Down Expand Up @@ -46,3 +50,9 @@ docker-run-default:

docker-run-boot:
@docker run -d --restart unless-stopped --name "${APP_NAME}" -p "${TCP_PORT}":"${TCP_PORT}" -p "${GRPC_PORT}":"${GRPC_PORT}" -e "GRPC_PORT=${GRPC_PORT}" -it "${BUILD_IMG}" /p2pmq/app -config=./bootstrapper.p2pmq.yaml

gossip-sim:
@mkdir -p "${GOSSIP_OUT_DIR}" \
&& export GOSSIP_SIMULATION=full \
&& export GOSSIP_OUT_DIR="${GOSSIP_OUT_DIR}" \
&& go test -v -timeout 10m ./core -run TestGossipSimulation
28 changes: 18 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
# Decentralized Message Engine
# PMQ

Decentralized messaging engine that facilitates the secure exchange of verifiable messages across networks, enabling the formation of a global, collaborative network.

<br />

**NOTE: This is an experimental work in progress. DO NOT USE**
**WARNING: This is an experimental work in progress, DO NOT USE in production**

<br />

[![API Reference](
https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/68747470733a2f2f676f646f632e6f72672f6769746875622e636f6d2f676f6c616e672f6764646f3f7374617475732e737667
)](https://pkg.go.dev/github.com/amirylm/p2pmq?tab=doc)
![Go version](https://img.shields.io/badge/go-1.21-blue.svg)
![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/lint.yml/badge.svg?branch=main)
![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/test.yml/badge.svg?branch=main)

## Documentation

## Overview
You can find documentation in [./resources/docs](./resources/docs).

**DME** is a distributed, permissionless messaging engine for cross oracle communication.
## Usage

A network of agents is capable of the following:
- Broadcast messages over topics with optimal latency
- Pluggable and decoupled message validation using gRPC
- Scoring for protection from bad actors
- Syncing peers with the latest messages to recover from
restarts, network partition, etc.
Usage examples are available in the [examples](./examples) folder.
16 changes: 1 addition & 15 deletions cmd/p2pmq/main.go → cmd/pmq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func main() {
app := &cli.App{
Name: "p2pmq",
Name: "pmq",
Flags: []cli.Flag{
cli.IntFlag{
Name: "grpc-port",
Expand Down Expand Up @@ -92,20 +92,6 @@ func main() {
ctrl.Start(ctx)
defer ctrl.Close()

// <-time.After(time.Second * 10)

// if cfg.Pubsub != nil {
// if err := ctrl.Subscribe(ctx, "test-1"); err != nil {
// lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err)
// }
// for i := 0; i < 10; i++ {
// <-time.After(time.Second * 5)
// if err := ctrl.Publish(ctx, "test-1", []byte(fmt.Sprintf("test-data-%d-%s", i, ctrl.ID()))); err != nil {
// lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err)
// }
// }
// }

return grpcapi.ListenGrpc(srv, c.Int("grpc-port"))

},
Expand Down
5 changes: 0 additions & 5 deletions cmd/pqclient/main.go

This file was deleted.

8 changes: 7 additions & 1 deletion commons/config_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type PubsubConfig struct {
Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"`
MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"`
MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"`
Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"`
Trace *PubsubTraceConfig `json:"trace,omitempty" yaml:"trace,omitempty"`
}

func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) {
Expand All @@ -30,6 +30,12 @@ func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) {
return TopicConfig{}, false
}

type PubsubTraceConfig struct {
Skiplist []string `json:"skiplist,omitempty" yaml:"skiplist,omitempty"`
JsonFile string `json:"jsonFile,omitempty" yaml:"jsonFile,omitempty"`
Debug bool `json:"debug,omitempty" yaml:"debug,omitempty"`
}

type MsgIDFnConfig struct {
Type string `json:"type,omitempty" yaml:"type,omitempty"`
Size int `json:"size,omitempty" yaml:"size,omitempty"`
Expand Down
54 changes: 40 additions & 14 deletions core/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"sync/atomic"

"github.com/amirylm/p2pmq/commons"
"github.com/amirylm/p2pmq/commons/utils"
Expand All @@ -12,6 +13,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
Expand All @@ -33,9 +36,11 @@ type Controller struct {
mdnsSvc mdns.Service
pubsub *pubsub.PubSub

topicManager *topicManager
denylist pubsub.Blacklist
subFilter pubsub.SubscriptionFilter
topicManager *topicManager
denylist pubsub.Blacklist
subFilter pubsub.SubscriptionFilter
psTracer *psTracer
pubsubRpcCounter *atomic.Uint64

valRouter MsgRouter[pubsub.ValidationResult]
msgRouter MsgRouter[error]
Expand All @@ -46,15 +51,16 @@ func NewController(
cfg commons.Config,
msgRouter MsgRouter[error],
valRouter MsgRouter[pubsub.ValidationResult],
lggrNS string,
name string,
) (*Controller, error) {
d := &Controller{
threadControl: utils.NewThreadControl(),
lggr: lggr.Named(lggrNS).Named("controller"),
cfg: cfg,
valRouter: valRouter,
msgRouter: msgRouter,
topicManager: newTopicManager(),
threadControl: utils.NewThreadControl(),
lggr: lggr.Named(name).Named("ctrl"),
cfg: cfg,
valRouter: valRouter,
msgRouter: msgRouter,
topicManager: newTopicManager(),
pubsubRpcCounter: new(atomic.Uint64),
}
err := d.setup(ctx, cfg)

Expand All @@ -65,6 +71,21 @@ func (c *Controller) ID() string {
return c.host.ID().String()
}

func (c *Controller) Connect(ctx context.Context, dest *Controller) error {
ai := peer.AddrInfo{
ID: dest.host.ID(),
Addrs: dest.host.Addrs(),
}
switch c.host.Network().Connectedness(ai.ID) {
case libp2pnetwork.Connected:
return nil
case libp2pnetwork.CannotConnect:
return fmt.Errorf("cannot connect to %s", ai.ID)
default:
}
return c.host.Connect(ctx, ai)
}

func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHandler func(*MsgWrapper[pubsub.ValidationResult])) {
if c.valRouter != nil {
c.valRouter.RefreshHandler(valHandler)
Expand All @@ -78,7 +99,7 @@ func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHand

func (c *Controller) Start(ctx context.Context) {
c.StartOnce(func() {
// d.lggr.Debugf("starting controller with host %s", d.host.ID())
c.lggr.Debugf("starting ctrl")

if c.msgRouter != nil {
c.threadControl.Go(c.msgRouter.Start)
Expand All @@ -98,7 +119,7 @@ func (c *Controller) Start(ctx context.Context) {
c.connect(b)
}
if err := c.dht.Bootstrap(ctx); err != nil {
c.lggr.Panicf("failed to start discovery: %w", err)
c.lggr.Panicf("failed to start dht: %w", err)
}
}
if c.mdnsSvc != nil {
Expand All @@ -111,7 +132,8 @@ func (c *Controller) Start(ctx context.Context) {

func (c *Controller) Close() {
c.StopOnce(func() {
c.lggr.Debugf("closing controller with host %s", c.host.ID())
h := c.host.ID()
c.lggr.Debugf("closing controller with host %s", h)
c.threadControl.Close()
if c.dht != nil {
if err := c.dht.Close(); err != nil {
Expand All @@ -126,6 +148,7 @@ func (c *Controller) Close() {
if err := c.host.Close(); err != nil {
c.lggr.Errorf("failed to close host: %w", err)
}
c.lggr.Debugf("closed controller with host %s", h)
})
}

Expand Down Expand Up @@ -194,7 +217,8 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error)
return err
}
c.host = h
c.lggr.Infow("created libp2p host", "peerID", h.ID(), "addrs", h.Addrs())
c.lggr = c.lggr.With("peerID", h.ID())
c.lggr.Debugw("created libp2p host", "addrs", h.Addrs())

if len(cfg.MdnsTag) > 0 {
c.setupMdnsDiscovery(ctx, h, cfg.MdnsTag)
Expand All @@ -207,5 +231,7 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error)
}
}

c.lggr.Infow("ctrl setup done", "addrs", h.Addrs())

return nil
}
3 changes: 1 addition & 2 deletions core/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (

"github.com/amirylm/p2pmq/commons"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dhtopts.Option) func(host.Host) (routing.PeerRouting, error) {
func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dht.Option) func(host.Host) (routing.PeerRouting, error) {
return func(h host.Host) (routing.PeerRouting, error) {
dhtInst, err := dht.New(ctx, h, opts...)
if err != nil {
Expand Down
Loading
Loading