Skip to content

Commit

Permalink
refactored 'share.tail' for new subordinate boot protocol (#789)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Nov 13, 2024
1 parent f4fa04e commit 85a2295
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 32 deletions.
4 changes: 3 additions & 1 deletion agent/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ func (a *access) tail(data []byte) {
}
}
a.booted = true
} else {
a.bootErr = errors.New(line)
}
} else {
a.bootErr = errors.New(line)
}
} else {
logrus.Warn(line)
a.bootErr = errors.New(line)
}
} else {
a.bootErr = errors.New(line)
Expand Down
77 changes: 46 additions & 31 deletions agent/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,43 +54,58 @@ func (s *share) tail(data []byte) {
if line, err := s.readBuffer.ReadString('\n'); err == nil {
line = strings.Trim(line, "\n")
if !s.booted {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["token"]; found {
if str, ok := v.(string); ok {
s.token = str
}
}
if v, found := in["backend_mode"]; found {
if str, ok := v.(string); ok {
s.backendMode = sdk.BackendMode(str)
}
}
if v, found := in["share_mode"]; found {
if str, ok := v.(string); ok {
s.shareMode = sdk.ShareMode(str)
}
}
if v, found := in["frontend_endpoints"]; found {
if vArr, ok := v.([]interface{}); ok {
for _, v := range vArr {
if str, ok := v.(string); ok {
s.frontendEndpoints = append(s.frontendEndpoints, str)
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["message"]; found {
if str, ok := v.(string); ok {
if str == "boot" {
if v, found := in["token"]; found {
if str, ok := v.(string); ok {
s.token = str
}
}
if v, found := in["backend_mode"]; found {
if str, ok := v.(string); ok {
s.backendMode = sdk.BackendMode(str)
}
}
if v, found := in["share_mode"]; found {
if str, ok := v.(string); ok {
s.shareMode = sdk.ShareMode(str)
}
}
if v, found := in["frontend_endpoints"]; found {
if vArr, ok := v.([]interface{}); ok {
for _, v := range vArr {
if str, ok := v.(string); ok {
s.frontendEndpoints = append(s.frontendEndpoints, str)
}
}
}
}
if v, found := in["target"]; found {
if str, ok := v.(string); ok {
s.target = str
}
}
s.booted = true
} else {
s.bootErr = errors.New(line)
}
} else {
s.bootErr = errors.New(line)
}
} else {
s.bootErr = errors.New(line)
}
} else {
s.bootErr = errors.New(line)
}
if v, found := in["target"]; found {
if str, ok := v.(string); ok {
s.target = str
}
}
s.booted = true
close(s.bootComplete)
} else {
s.bootErr = errors.New(line)
logrus.Warn(line)
}
close(s.bootComplete)

} else {
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
Expand Down

0 comments on commit 85a2295

Please sign in to comment.