diff --git a/go/distributed/cmd/gradeservice.go b/go/distributed/cmd/gradeservice.go index 5a348d0..859db53 100644 --- a/go/distributed/cmd/gradeservice.go +++ b/go/distributed/cmd/gradeservice.go @@ -5,18 +5,33 @@ import ( "distributed/grades" "distributed/registry" "distributed/service" - "log" + stlog "log" ) func GradeServiceMain() { r := registry.Registration{ - ServiceName: "Grade Service", - ServiceHost: "localhost", - ServicePort: "4000", + ServiceName: "Grade Service", + ServiceHost: "localhost", + ServicePort: "4000", + ServiceURL: "http://localhost:4000", + ServiceUpdateURL: "http://localhost:4000/services", + HeartbeatURL: "http://localhost:4000/heartbeat", + // RequiredServices: []string{"Log Service"}, } ctx, err := service.Start(context.Background(), r, grades.RegisterHandlers) if err != nil { - log.Fatal(err) + stlog.Fatal(err) } + // Need required service + // logProvider, err := registry.GetProvider("Log Service") + // if err != nil { + // stlog.Println("dependent log service not found") + // stlog.Fatal(err) + // } + + // log.SetClientLogger(logProvider, r.ServiceName) + <-ctx.Done() + service.Stop(r) + } diff --git a/go/distributed/cmd/logservice.go b/go/distributed/cmd/logservice.go index 80a6fd7..741fb72 100644 --- a/go/distributed/cmd/logservice.go +++ b/go/distributed/cmd/logservice.go @@ -11,13 +11,17 @@ import ( func LogServiceMain() { log.Run("output.log") r := registry.Registration{ - ServiceName: "Log Service", - ServiceHost: "localhost", - ServicePort: "3000", + ServiceName: "Log Service", + ServiceHost: "localhost", + ServicePort: "3000", + ServiceURL: "http://localhost:3000", + ServiceUpdateURL: "http://localhost:3000/services", + HeartbeatURL: "http://localhost:3000/heartbeat", } ctx, err := service.Start(context.Background(), r, log.RegisterHandlers) if err != nil { stlog.Fatal(err) } <-ctx.Done() + service.Stop(r) } diff --git a/go/distributed/cmd/portalservice.go b/go/distributed/cmd/portalservice.go new file mode 100644 index 0000000..8dc07b2 --- /dev/null +++ b/go/distributed/cmd/portalservice.go @@ -0,0 +1,28 @@ +package cmd + +import ( + "context" + "distributed/portal" + "distributed/registry" + "distributed/service" + "log" +) + +func PortalServiceMain() { + r := registry.Registration{ + ServiceName: "Portal Service", + ServiceHost: "localhost", + ServicePort: "6000", + ServiceURL: "http://localhost:6000", + RequiredServices: []string{"Log Service", "Grade Service"}, + ServiceUpdateURL: "http://localhost:6000/services", + HeartbeatURL: "http://localhost:6000/heartbeat", + } + + ctx, err := service.Start(context.Background(), r, portal.RegisterHandlers) + if err != nil { + log.Fatal(err) + } + <-ctx.Done() + service.Stop(r) +} diff --git a/go/distributed/cmd/registryservice.go b/go/distributed/cmd/registryservice.go index 085aedd..be72b65 100644 --- a/go/distributed/cmd/registryservice.go +++ b/go/distributed/cmd/registryservice.go @@ -8,11 +8,16 @@ import ( ) func RegistryServiceMain() { + + registry.SetupRegistryService() // Registry Service will register itself first r := registry.Registration{ - ServiceName: "Registry Service", - ServiceHost: "localhost", - ServicePort: "5000", + ServiceName: "Registry Service", + ServiceHost: "localhost", + ServicePort: "5000", + ServiceURL: "http://localhost:5000", + ServiceUpdateURL: "http://localhost:5000/services", + HeartbeatURL: "http://localhost:5000/heartbeat", } ctx, err := service.Start(context.Background(), r, registry.RegisterHandlers) if err != nil { @@ -20,4 +25,5 @@ func RegistryServiceMain() { } <-ctx.Done() + service.Stop(r) } diff --git a/go/distributed/log/client.go b/go/distributed/log/client.go new file mode 100644 index 0000000..6d5edd5 --- /dev/null +++ b/go/distributed/log/client.go @@ -0,0 +1,30 @@ +package log + +import ( + "bytes" + "fmt" + stlog "log" + "net/http" +) + +func SetClientLogger(serviceURL string, clientServiceName string) { + stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientServiceName)) + stlog.SetFlags(0) + stlog.SetOutput(&clientLogger{url: serviceURL}) +} + +type clientLogger struct { + url string +} + +func (cl clientLogger) Write(data []byte) (int, error) { + buf := bytes.NewBuffer(data) + resp, err := http.Post(cl.url, "text/plain", buf) + if err != nil { + return 0, err + } + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("failed to log message, get status code %v", resp.StatusCode) + } + return len(data), nil +} diff --git a/go/distributed/main.go b/go/distributed/main.go index 051c0f2..e69655f 100644 --- a/go/distributed/main.go +++ b/go/distributed/main.go @@ -18,6 +18,8 @@ func main() { cmd.GradeServiceMain() case "registry": cmd.RegistryServiceMain() + case "portal": + cmd.PortalServiceMain() default: log.Fatal("invalid cmd") } diff --git a/go/distributed/portal/server.go b/go/distributed/portal/server.go new file mode 100644 index 0000000..0fcfdc6 --- /dev/null +++ b/go/distributed/portal/server.go @@ -0,0 +1,50 @@ +package portal + +import ( + "distributed/grades" + "distributed/registry" + "encoding/json" + "fmt" + "io" + "log" + "net/http" +) + +func RegisterHandlers() { + http.Handle("/", http.RedirectHandler("/students", http.StatusPermanentRedirect)) + http.HandleFunc("/students", portalHandler) + +} + +func portalHandler(w http.ResponseWriter, r *http.Request) { + renderStudents(w, r) +} + +func renderStudents(w http.ResponseWriter, r *http.Request) { + var err error + defer func() { + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println("render students err: ", err) + } + }() + + serviceURL, err := registry.GetProvider("Grade Service") + if err != nil { + return + } + resp, err := http.Get(serviceURL + "/students") + if err != nil { + return + } + data, err := io.ReadAll(resp.Body) + if err != nil { + return + } + var students grades.Students + err = json.Unmarshal(data, &students) + if err != nil { + return + } + fmt.Printf("RECV: +v\n", students) +} diff --git a/go/distributed/registry/client.go b/go/distributed/registry/client.go index 8b28c78..88514b4 100644 --- a/go/distributed/registry/client.go +++ b/go/distributed/registry/client.go @@ -5,11 +5,33 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" + "net/url" ) +var prov = provider{ + services: make(map[string]string, 0), +} + func RegisterService(r Registration) error { + // Listen on Heartbeat url + heartbeatURL, err := url.Parse(r.HeartbeatURL) + if err != nil { + return err + } + http.HandleFunc(heartbeatURL.Path, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + // Listen on serviceUpdate url + serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) + if err != nil { + return err + } + http.HandleFunc(serviceUpdateURL.Path, serviceUpdateHandler) + buf := new(bytes.Buffer) data, err := json.Marshal(r) if err != nil { @@ -52,5 +74,46 @@ func UnregisterService(url string) error { return errors.New("fail to unregister") } return nil +} +func serviceUpdateHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + bytes, err := io.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + var p patch + err = json.Unmarshal(bytes, &p) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + prov.ApplyPatch(p) +} + +type provider struct { + services map[string]string // Map of serviceName to serviceURL +} + +func (p *provider) ApplyPatch(pat patch) { + for _, entry := range pat.Added { + p.services[entry.Name] = entry.URL + } + for _, entry := range pat.Removed { + delete(p.services, entry.Name) + } +} + +// Get the URL of provider service +func GetProvider(serviceName string) (string, error) { + url, ok := prov.services[serviceName] + if !ok { + return "", fmt.Errorf("No providers found for %v", serviceName) + } + return url, nil } diff --git a/go/distributed/registry/registration.go b/go/distributed/registry/registration.go index d64893d..3177690 100644 --- a/go/distributed/registry/registration.go +++ b/go/distributed/registry/registration.go @@ -1,18 +1,34 @@ package registry import ( + "bytes" + "encoding/json" "errors" "fmt" + "log" + "net/http" + "sync" + "time" ) type Registration struct { - ServiceName string - ServiceHost string - ServicePort string + ServiceName string + ServiceHost string + ServicePort string + ServiceURL string + RequiredServices []string + ServiceUpdateURL string + HeartbeatURL string } -func (r *Registration) GetURL() string { - return fmt.Sprintf("http://%s:%s", r.ServiceHost, r.ServicePort) +type patchEntry struct { + Name string + URL string +} + +type patch struct { + Added []patchEntry + Removed []patchEntry } type registry struct { @@ -23,15 +39,143 @@ var registryInst = registry{ registrations: make([]Registration, 0), } -func (r *registry) Add(reg Registration) { +func (r *registry) Add(reg Registration) error { r.registrations = append(r.registrations, reg) + err := r.sendRequiredServices(reg) + p := patch{ + Added: []patchEntry{{ + Name: reg.ServiceName, + URL: reg.ServiceURL, + }}, + } + r.notify(p) + return err } func (r *registry) Remove(url string) error { + fmt.Println(r.GetRegisteredServiceName()) for i := range r.registrations { - if r.registrations[i].GetURL() == url { + if r.registrations[i].ServiceURL == url { + p := patch{ + Removed: []patchEntry{{ + Name: r.registrations[i].ServiceName, + URL: url, + }}, + } r.registrations = append(r.registrations[:i], r.registrations[i+1:]...) + r.notify(p) return nil } } return errors.New("serviceURL not found") } +func (r *registry) sendRequiredServices(reg Registration) error { + var p patch + // For all regisitred service + for _, registration := range r.registrations { + // For all reg's required services + for _, requiredSerivce := range reg.RequiredServices { + // If the reg's required service is already registered + if requiredSerivce == registration.ServiceName { + p.Added = append(p.Added, patchEntry{ + Name: registration.ServiceName, + URL: registration.ServiceURL, + }) + } + } + } + err := r.sendPatch(p, reg.ServiceUpdateURL) + if err != nil { + return err + } + return nil + +} + +func (r *registry) sendPatch(p patch, url string) error { + data, err := json.Marshal(p) + if err != nil { + return err + } + resp, err := http.Post(url, "application/json", bytes.NewBuffer(data)) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("fail to send patch, get code %v", resp.StatusCode) + } + return nil +} + +// When service at registry is updated, notify other services, which may +// depend on these updated services. +func (r *registry) notify(fullpatch patch) { + for _, reg := range r.registrations { + // Only send relevant services in patches + go func(reg Registration) { + var sentPatch patch + for _, requiredService := range reg.RequiredServices { + for _, addedService := range fullpatch.Added { + if requiredService == addedService.Name { + sentPatch.Added = append(sentPatch.Added, addedService) + } + } + for _, removedServce := range fullpatch.Removed { + if requiredService == removedServce.Name { + sentPatch.Removed = append(sentPatch.Removed, removedServce) + } + } + } + if len(sentPatch.Added) != 0 || len(sentPatch.Removed) != 0 { + err := r.sendPatch(sentPatch, reg.ServiceUpdateURL) + if err != nil { + log.Println(err) + } + } + + }(reg) + } + +} + +func (r *registry) GetRegisteredServiceName() []string { + services := []string{} + for _, reg := range r.registrations { + services = append(services, reg.ServiceName) + } + return services +} + +func (r *registry) heartbeat(freq time.Duration) { + + for { + var wg sync.WaitGroup + for _, reg := range r.registrations { + wg.Add(1) + go func(reg Registration) { + defer wg.Done() + for attempt := 0; attempt < 3; attempt++ { + resp, err := http.Get(reg.HeartbeatURL) + if err == nil && resp.StatusCode == http.StatusOK { + log.Printf("heartbeat acked for %v\n", reg.ServiceName) + return + } + time.Sleep(1 * time.Second) + } + // heartbeat fail case + r.Remove(reg.ServiceURL) + log.Printf("heartbeat lost for %v\n", reg.ServiceName) + }(reg) + } + wg.Wait() + time.Sleep(freq) + } + +} + +var once sync.Once + +func SetupRegistryService() { + once.Do(func() { + go registryInst.heartbeat(3 * time.Second) + }) +} diff --git a/go/distributed/registry/server.go b/go/distributed/registry/server.go index 8224c67..680a0bf 100644 --- a/go/distributed/registry/server.go +++ b/go/distributed/registry/server.go @@ -30,7 +30,7 @@ func registryHandler(w http.ResponseWriter, r *http.Request) { return } registryInst.Add(reg) - log.Printf("Adding service %v with URL %s\n", reg.ServiceName, reg.GetURL()) + log.Printf("Adding service %v with URL %s\n", reg.ServiceName, reg.ServiceURL) case http.MethodDelete: msg, err := io.ReadAll(r.Body) defer r.Body.Close() diff --git a/go/distributed/service/service.go b/go/distributed/service/service.go index 7903c80..dd965aa 100644 --- a/go/distributed/service/service.go +++ b/go/distributed/service/service.go @@ -4,7 +4,6 @@ import ( "context" "distributed/registry" "fmt" - "log" "net/http" "time" ) @@ -42,10 +41,6 @@ func startService(ctx context.Context, r registry.Registration) context.Context for s != "exit" { fmt.Scanln(&s) } - // Unregister from registry service - if err := registry.UnregisterService(r.GetURL()); err != nil { - log.Println(err) - } srv.Shutdown(ctx) cancel() }() @@ -54,12 +49,12 @@ func startService(ctx context.Context, r registry.Registration) context.Context // Stop by server error err := srv.ListenAndServe() fmt.Println(err) - // Unregister from registry service - if err := registry.UnregisterService(r.GetURL()); err != nil { - log.Println(err) - } cancel() }() return ctx // Return context for propagation } + +func Stop(r registry.Registration) error { + return registry.UnregisterService(r.ServiceURL) +}