Skip to content

Commit

Permalink
add: distributed skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
hthuz committed Aug 3, 2024
1 parent cb0cbcd commit 414d4dd
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 28 deletions.
25 changes: 20 additions & 5 deletions go/distributed/cmd/gradeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,33 @@ import (
"distributed/grades"
"distributed/registry"
"distributed/service"
"log"
stlog "log"
)

func GradeServiceMain() {
r := registry.Registration{
ServiceName: "Grade Service",
ServiceHost: "localhost",
ServicePort: "4000",
ServiceName: "Grade Service",
ServiceHost: "localhost",
ServicePort: "4000",
ServiceURL: "http://localhost:4000",
ServiceUpdateURL: "http://localhost:4000/services",
HeartbeatURL: "http://localhost:4000/heartbeat",
// RequiredServices: []string{"Log Service"},
}
ctx, err := service.Start(context.Background(), r, grades.RegisterHandlers)
if err != nil {
log.Fatal(err)
stlog.Fatal(err)
}
// Need required service
// logProvider, err := registry.GetProvider("Log Service")
// if err != nil {
// stlog.Println("dependent log service not found")
// stlog.Fatal(err)
// }

// log.SetClientLogger(logProvider, r.ServiceName)

<-ctx.Done()
service.Stop(r)

}
10 changes: 7 additions & 3 deletions go/distributed/cmd/logservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
func LogServiceMain() {
log.Run("output.log")
r := registry.Registration{
ServiceName: "Log Service",
ServiceHost: "localhost",
ServicePort: "3000",
ServiceName: "Log Service",
ServiceHost: "localhost",
ServicePort: "3000",
ServiceURL: "http://localhost:3000",
ServiceUpdateURL: "http://localhost:3000/services",
HeartbeatURL: "http://localhost:3000/heartbeat",
}
ctx, err := service.Start(context.Background(), r, log.RegisterHandlers)
if err != nil {
stlog.Fatal(err)
}
<-ctx.Done()
service.Stop(r)
}
28 changes: 28 additions & 0 deletions go/distributed/cmd/portalservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cmd

import (
"context"
"distributed/portal"
"distributed/registry"
"distributed/service"
"log"
)

func PortalServiceMain() {
r := registry.Registration{
ServiceName: "Portal Service",
ServiceHost: "localhost",
ServicePort: "6000",
ServiceURL: "http://localhost:6000",
RequiredServices: []string{"Log Service", "Grade Service"},
ServiceUpdateURL: "http://localhost:6000/services",
HeartbeatURL: "http://localhost:6000/heartbeat",
}

ctx, err := service.Start(context.Background(), r, portal.RegisterHandlers)
if err != nil {
log.Fatal(err)
}
<-ctx.Done()
service.Stop(r)
}
12 changes: 9 additions & 3 deletions go/distributed/cmd/registryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ import (
)

func RegistryServiceMain() {

registry.SetupRegistryService()
// Registry Service will register itself first
r := registry.Registration{
ServiceName: "Registry Service",
ServiceHost: "localhost",
ServicePort: "5000",
ServiceName: "Registry Service",
ServiceHost: "localhost",
ServicePort: "5000",
ServiceURL: "http://localhost:5000",
ServiceUpdateURL: "http://localhost:5000/services",
HeartbeatURL: "http://localhost:5000/heartbeat",
}
ctx, err := service.Start(context.Background(), r, registry.RegisterHandlers)
if err != nil {
log.Fatal(err)
}

<-ctx.Done()
service.Stop(r)
}
30 changes: 30 additions & 0 deletions go/distributed/log/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package log

import (
"bytes"
"fmt"
stlog "log"
"net/http"
)

func SetClientLogger(serviceURL string, clientServiceName string) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientServiceName))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}

type clientLogger struct {
url string
}

func (cl clientLogger) Write(data []byte) (int, error) {
buf := bytes.NewBuffer(data)
resp, err := http.Post(cl.url, "text/plain", buf)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("failed to log message, get status code %v", resp.StatusCode)
}
return len(data), nil
}
2 changes: 2 additions & 0 deletions go/distributed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func main() {
cmd.GradeServiceMain()
case "registry":
cmd.RegistryServiceMain()
case "portal":
cmd.PortalServiceMain()
default:
log.Fatal("invalid cmd")
}
Expand Down
50 changes: 50 additions & 0 deletions go/distributed/portal/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package portal

import (
"distributed/grades"
"distributed/registry"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)

func RegisterHandlers() {
http.Handle("/", http.RedirectHandler("/students", http.StatusPermanentRedirect))
http.HandleFunc("/students", portalHandler)

}

func portalHandler(w http.ResponseWriter, r *http.Request) {
renderStudents(w, r)
}

func renderStudents(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Println("render students err: ", err)
}
}()

serviceURL, err := registry.GetProvider("Grade Service")
if err != nil {
return
}
resp, err := http.Get(serviceURL + "/students")
if err != nil {
return
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return
}
var students grades.Students
err = json.Unmarshal(data, &students)
if err != nil {
return
}
fmt.Printf("RECV: +v\n", students)
}
63 changes: 63 additions & 0 deletions go/distributed/registry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,33 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
)

var prov = provider{
services: make(map[string]string, 0),
}

func RegisterService(r Registration) error {

// Listen on Heartbeat url
heartbeatURL, err := url.Parse(r.HeartbeatURL)
if err != nil {
return err
}
http.HandleFunc(heartbeatURL.Path, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

// Listen on serviceUpdate url
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.HandleFunc(serviceUpdateURL.Path, serviceUpdateHandler)

buf := new(bytes.Buffer)
data, err := json.Marshal(r)
if err != nil {
Expand Down Expand Up @@ -52,5 +74,46 @@ func UnregisterService(url string) error {
return errors.New("fail to unregister")
}
return nil
}

func serviceUpdateHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
bytes, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var p patch
err = json.Unmarshal(bytes, &p)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
prov.ApplyPatch(p)
}

type provider struct {
services map[string]string // Map of serviceName to serviceURL
}

func (p *provider) ApplyPatch(pat patch) {
for _, entry := range pat.Added {
p.services[entry.Name] = entry.URL
}
for _, entry := range pat.Removed {
delete(p.services, entry.Name)
}
}

// Get the URL of provider service
func GetProvider(serviceName string) (string, error) {
url, ok := prov.services[serviceName]
if !ok {
return "", fmt.Errorf("No providers found for %v", serviceName)
}
return url, nil
}
Loading

0 comments on commit 414d4dd

Please sign in to comment.