Routing job load across some agents [ req/rep - pull/push] #189

Open
toni-moreno opened this issue Mar 8, 2020 · 4 comments
Labels
enhancement New feature or request

Comments

@toni-moreno

toni-moreno commented Mar 8, 2020

Hello to everybody working on this great library!

I'm evaluating mangos as the base library to spread load from a "master" (which sends jobs) to a lot of "agents".

Right now all agents are in the same category, and any agent will run any message in the master queue.

master && agents Execution

without routing

  • Master
go run pipeline.go master tcp://127.0.0.1:40899 
  • Agent 1
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT1
  • Agent 2
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT2
  • Agent 3
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT3
  • Agent 4
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT4

I would like to add labels to agents so that messages can be routed by, for example, environment=production or environment=testing. The master would send each message only to the agents with the matching label (agents would only dequeue messages routed with their own label).

This new feature would apply to the req/rep and pull/push protocols.

Execution examples...

with routing

  • Master
go run pipeline.go master tcp://127.0.0.1:40899 
  • Agent 1
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT1 env=production
  • Agent 2
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT2 env=production
  • Agent 3
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT3  env=testing
  • Agent 4
go run pipeline.go agent tcp://127.0.0.1:40899  AGENT4 env=testing

PoC Code

Without Routing (as it currently works)

I did the PoC with this code.

package main

import (
	"fmt"
	"os"
	"time"

	"go.nanomsg.org/mangos/v3"

	"go.nanomsg.org/mangos/v3/protocol/rep"
	"go.nanomsg.org/mangos/v3/protocol/req"

	// register transports
	_ "go.nanomsg.org/mangos/v3/transport/all"
)

func die(format string, v ...interface{}) {
	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
	os.Exit(1)
}

func agent(url string, agentName string) {
	var sock mangos.Socket
	var err error
	var msg []byte
	if sock, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err.Error())
	}
	if err = sock.Dial(url); err != nil {
		die("can't dial on rep socket: %s", err.Error())
	}
	for {
		// Could also use sock.RecvMsg to get header
		if msg, err = sock.Recv(); err != nil {
			die("can't receive request: %s", err.Error())
		}
		fmt.Printf("AGENT [%s]: RECEIVED \"%s\"\n", agentName, msg)
		// Reply with the agent name so the master can see who handled the job.
		if err = sock.Send([]byte(agentName)); err != nil {
			die("can't send reply: %s", err.Error())
		}
	}
}

func master(url string) {
	var sock mangos.Socket
	var err error

	if sock, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}
	defer sock.Close()
	if err = sock.Listen(url); err != nil {
		die("can't listen on req socket: %s", err.Error())
	}
	for {
		var resp []byte
		time.Sleep(time.Second / 2)
		msg := "MSG: " + time.Now().String()

		if err = sock.Send([]byte(msg)); err != nil {
			die("can't send message on req socket: %s", err.Error())
		}
		if resp, err = sock.Recv(); err != nil {
			die("can't receive resp: %s", err.Error())
		}
		fmt.Printf("MASTER: SENT [%s] RECEIVED [%s] \n", msg, string(resp))
	}
}

func main() {
	// agent needs <URL> and <NAME>; master needs only <URL>
	if len(os.Args) > 3 && os.Args[1] == "agent" {
		agent(os.Args[2], os.Args[3])
		os.Exit(0)
	}
	if len(os.Args) > 2 && os.Args[1] == "master" {
		master(os.Args[2])
		os.Exit(0)
	}
	fmt.Fprintf(os.Stderr,
		"Usage: pipeline agent <URL> <NAME> | master <URL>\n")
	os.Exit(1)
}

With Routing in Agent (pseudocode)

func agent(url string, agentName string, environment string) {
	var sock mangos.Socket
	var err error
	var msg []byte
	if sock, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err.Error())
	}
	if err = sock.Dial(url); err != nil {
		die("can't dial on rep socket: %s", err.Error())
	}
	for {
		// Proposed API: Recv would take a label and dequeue only messages with this label.
		msg, err = sock.Recv(environment) // <------------- CHANGE HERE
		if err != nil {
			die("can't receive request: %s", err.Error())
		}
		fmt.Printf("AGENT [%s]: RECEIVED \"%s\"\n", agentName, msg)
		if err = sock.Send([]byte(agentName)); err != nil {
			die("can't send reply: %s", err.Error())
		}
	}
}

With Routing in Master (pseudocode)

var sock mangos.Socket

func processJobs(env string) {
	var err error
	for {
		var resp []byte
		time.Sleep(time.Second / 2)
		msg := "MSG: " + time.Now().String()

		// Proposed API: Send would take the routing label as an extra argument.
		if err = sock.Send(env, []byte(msg)); err != nil { // <----------------- CHANGED HERE
			die("can't send message on req socket: %s", err.Error())
		}
		if resp, err = sock.Recv(); err != nil {
			die("can't receive resp: %s", err.Error())
		}
		fmt.Printf("MASTER: SENT [%s] RECEIVED [%s] \n", msg, string(resp))
	}
}

func master(url string) {
	var err error

	if sock, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}
	defer sock.Close()
	if err = sock.Listen(url); err != nil {
		die("can't listen on req socket: %s", err.Error())
	}
	// one job loop per environment
	go processJobs("production")
	go processJobs("testing")
	select {} // block so the socket stays open while the job loops run
}
@gdamore
Contributor

gdamore commented Mar 11, 2020

The best way to achieve what you want is to use separate service addresses (URLs) for production, development, etc. Basically use a completely disjoint topology, I think.

The sockets otherwise have no idea how to route your traffic, and really there is nothing currently in the protocol that would allow them to do that.
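
One way to picture the "separate service addresses" suggestion is a small lookup from label to URL that the master consults before sending. The sketch below is only an illustration under stated assumptions: the Registry type, its methods, and the second port (40900) are hypothetical and not part of mangos, and the mangos calls themselves are left out. A real master would presumably keep one REQ socket per URL.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry maps a routing label (for example "production") to the service
// URL of the topology that serves it. Hypothetical helper, not a mangos type.
type Registry struct {
	mu   sync.RWMutex
	urls map[string]string
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{urls: make(map[string]string)}
}

// Set adds or replaces the URL for a label; safe to call at runtime.
func (r *Registry) Set(label, url string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.urls[label] = url
}

// Lookup returns the URL that a job carrying the given label should use.
func (r *Registry) Lookup(label string) (string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	url, ok := r.urls[label]
	if !ok {
		return "", fmt.Errorf("no service address for label %q", label)
	}
	return url, nil
}

func main() {
	reg := NewRegistry()
	reg.Set("production", "tcp://127.0.0.1:40899")
	reg.Set("testing", "tcp://127.0.0.1:40900") // assumed second address

	for _, label := range []string{"production", "testing", "staging"} {
		url, err := reg.Lookup(label)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		// A real master would send on the REQ socket bound to this URL.
		fmt.Printf("%s -> %s\n", label, url)
	}
}
```

Because Set can be called while the master is running, a new label can be introduced without changing the lookup code, although each new label still needs its own address.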

@toni-moreno
Author

Hello @gdamore, separate URLs imply restarting the "master" and reconfiguring the "agents" for each new "route", so I don't think this is a good solution for me.

The real use case is a website performance measuring system. Right now agents run only in AWS "eu-west-1", but perhaps tomorrow I will add a new agent in AWS "ca-central-1", and I would like to control measuring jobs from each AWS zone. Each job will have a location parameter, and each agent will be started with a location label. The master should be agnostic to the number of different locations; that is, jobs should be routed through the same "socket".

Is it much work to add this feature? Perhaps we can help add it if somebody can help us.
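
To make the "job with a location parameter" idea concrete, here is a minimal sketch of what such a job envelope could look like. Everything in it is an assumption for illustration: the Job type, its JSON field names, and the Accepts helper do not come from mangos or from the PoC above, and the target URL is a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job is a hypothetical envelope: Location is the routing label and
// Target is the site the agent should measure.
type Job struct {
	Location string `json:"location"`
	Target   string `json:"target"`
}

// EncodeJob serializes a job into the bytes that would be sent on the socket.
func EncodeJob(j Job) ([]byte, error) {
	return json.Marshal(j)
}

// DecodeJob parses the bytes received from the socket back into a Job.
func DecodeJob(b []byte) (Job, error) {
	var j Job
	err := json.Unmarshal(b, &j)
	return j, err
}

// Accepts reports whether an agent started with agentLocation should run j.
func Accepts(agentLocation string, j Job) bool {
	return j.Location == agentLocation
}

func main() {
	payload, err := EncodeJob(Job{Location: "eu-west-1", Target: "https://example.com"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	job, err := DecodeJob(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, agent := range []string{"eu-west-1", "ca-central-1"} {
		fmt.Printf("agent in %s accepts job: %v\n", agent, Accepts(agent, job))
	}
}
```

Note that an agent-side check like Accepts only tells an agent whether a job is meant for it; it does not by itself stop the socket from delivering the job to the wrong agent.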

@toni-moreno
Author

Hello @gdamore, I'm still interested in this feature, and we would like to help implement it if it is not very difficult and you could assist us.

@gdamore
Contributor

gdamore commented Mar 22, 2020

It's not trivial to do this, because as I said, there is nothing in the protocol itself to guide routing decisions.

You could probably build a router, starting from the code implementing nng_device(), but you'd need to have it make informed routing decisions (presumably by looking at your application-specific payload).

There might be another way if you're willing to have a separate service address locally, where you use different local routers, which could be uninformed, leaving the decision about where to send the message to the client.

@gdamore gdamore changed the title [Feature Request] Routing job load across some agents [ req/rep - pull/push] Routing job load across some agents [ req/rep - pull/push] Mar 29, 2020
@gdamore gdamore added the enhancement New feature or request label Mar 29, 2020