Skip to content

Commit

Permalink
Add support for controlling eebus-go via JSON-RPC from another process (
Browse files Browse the repository at this point in the history
#128)

This PR provides an implementation of a [JSON-RPC 2](https://www.jsonrpc.org/specification)
server that proxies calls into the eebus-go stack via TCP for control by other processes.

The current state of this PR is a draft, all basic (and some advanced) functionality should work, but not all functions and edge cases are handled, and those that are handled may not be implemented optimally. Consider this a request for comments from the community on the current state of the PR and how it should proceed.

The RPC server provides access to eebus-go methods via reflection and static/dynamic method scoping. Static method scoping is used to proxy methods for the eebus-go/service.Service struct by prefixing the method with "service/", methods assigned to the RPC server itself are scoped under "remote/", the service.LocalDevice methods are available under "localdevice/", and all registered use cases are available via the registered usecaseId (e.g. "eg-lpc/"). Dynamic method proxying is available currently for DeviceRemoteInterface and EntityRemoteInterface types by prefixing the wanted method with "Call/" and passing an AddressDeviceType or EntityAddressType as the first parameter.

Use case specific notifications from the eebus-go stack are passed to the RPC client via a json-rpc notification where the method field is set to the api.EventType and the parameters is a JSON object containing the ski, the device address and the device entity.

Notifications that the eebus-go stack normally provides the ServiceHandlerInterface are proxied to the RPC client with the method set the name of the called method in the ServiceHandlerInterface prefixed with the string "remote/". The parameters for these notifications are notification specific.

There are still several open points that I have either not yet implemented or that I think should be discussed by the community before being addressed:

- not all types returned by functions in the eebus-go stack can be easily serialized/deserialized via JSON. I have added automatic   serialization/deserialization for the DeviceRemoteInterface and EntityRemoteInterface, but all other types that are Interfaces, contain Interfaces, or contain private members cannot currently be serialized or deserialized, this notably includes the DeviceLocalInterface, and the RemoteEntityScenarios struct. Options to increase the number of serializable/deserializable types include implementing the Marshaler/Unmarshaler interface for simple types (such as RemoteEntityScenarios) or adjusting method return types to make them serializable/deserializable.

- it's impossible to pass a callback via JSON-RPC so all functions that can take a resultCB must currently take nil (json null) in that position. It may be nice to detect functions with such a parameter and provide a means for callers to register interest in the result of such an operation such that the RPC server creates a resultCB type that forwards the ResultDataType as a notification to the controlling client. To accomplish this, it would be necessary to adjust the type of the resultCB function to also take the MsgCounter as a second argument as the RPC server cannot create a function which knows the MsgCounter prior to the actual function in question being called.


I've already received some feedback to a few points that I will collect here:

- currently all functions that return (value, error) will drop the error from the response when it is nil, and return a JSON-RPC error message otherwise. This handling does not occur for all functions with more than 2 return values. It may be more consistent to adjust this, or drop custom handling of errors entirely. Errors would then be returned as the final argument in their string representation.

- currently the EEBus-Service is automatically started as soon as the JSON-RPC listener starts, it might be better if the service is manually started via a JSON-RPC method call instead

- the RPC server currently does not limit the number of controlling connections, nor does it bind the lifetime of the EEBus service to the lifetime of the TCP connection. We may want to limit the number of simultaneous connections to 1, as well as terminate the EEBus service on connection loss to prevent e.g. limits from staying valid even though the controlling connection died
  • Loading branch information
sthelen-enqs authored Oct 28, 2024
1 parent 459291b commit a37178d
Show file tree
Hide file tree
Showing 8 changed files with 874 additions and 0 deletions.
60 changes: 60 additions & 0 deletions cmd/remote/framer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"bufio"
"context"
"fmt"
"io"

"golang.org/x/exp/jsonrpc2"
)

type NewlineFramer struct{}
type newlineReader struct{ in *bufio.Reader }
type newlineWriter struct{ out io.Writer }

func (NewlineFramer) Reader(rw io.Reader) jsonrpc2.Reader {
return &newlineReader{in: bufio.NewReader(rw)}
}

func (f NewlineFramer) Writer(rw io.Writer) jsonrpc2.Writer {
return &newlineWriter{out: rw}
}

func (r *newlineReader) Read(ctx context.Context) (jsonrpc2.Message, int64, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
default:
}
var total int64

// read a line
line, err := r.in.ReadBytes('\n')
total += int64(len(line))
if err != nil {
return nil, total, fmt.Errorf("failed reading line: %w", err)
}

msg, err := jsonrpc2.DecodeMessage(line[:total-1])
return msg, total, err
}

func (w *newlineWriter) Write(ctx context.Context, msg jsonrpc2.Message) (int64, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
data, err := jsonrpc2.EncodeMessage(msg)
if err != nil {
return 0, fmt.Errorf("marshaling message: %v", err)

Check failure on line 51 in cmd/remote/framer.go

View workflow job for this annotation

GitHub Actions / Build

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
n, err := w.out.Write(data)
total := int64(n)
if err == nil {
n, err = w.out.Write([]byte("\n"))
total += int64(n)
}
return total, err
}
155 changes: 155 additions & 0 deletions cmd/remote/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package main

import (
"context"
"crypto/tls"
"flag"
"log"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/enbility/eebus-go/api"
"github.com/enbility/eebus-go/usecases/cem/evcc"
"github.com/enbility/eebus-go/usecases/cem/evsecc"
eglpc "github.com/enbility/eebus-go/usecases/eg/lpc"
"github.com/enbility/eebus-go/usecases/ma/mpc"
shipapi "github.com/enbility/ship-go/api"
"github.com/enbility/ship-go/cert"
spineapi "github.com/enbility/spine-go/api"
"github.com/enbility/spine-go/model"
)

type eebusConfiguration struct {
vendorCode string
deviceBrand string
deviceModel string
serialNumber string
}

func loadCertificate(config eebusConfiguration, crtPath, keyPath string) tls.Certificate {
certificate, err := tls.LoadX509KeyPair(crtPath, keyPath)
if err != nil {
certificate, err = cert.CreateCertificate(config.vendorCode, config.deviceModel, "DE", config.serialNumber)
if err != nil {
log.Fatal(err)
}

if err = WriteKey(certificate, keyPath); err != nil {
log.Fatal(err)
}
if err = WriteCertificate(certificate, crtPath); err != nil {
log.Fatal(err)
}
}

return certificate
}

func main() {
config := eebusConfiguration{}

iface := flag.String("iface", "",
"Optional network interface the EEBUS connection should be limited to")
flag.StringVar(&config.vendorCode, "vendor", "", "EEBus vendor code")
flag.StringVar(&config.deviceBrand, "brand", "", "EEBus device brand")
flag.StringVar(&config.deviceModel, "model", "", "EEBus device model")
flag.StringVar(&config.serialNumber, "serial", "", "EEBus device serial")

flag.Parse()

if config.serialNumber == "" {
serialNumber, err := os.Hostname()
if err != nil {
log.Fatal(err)
}
config.serialNumber = serialNumber
}

if config.vendorCode == "" || config.deviceBrand == "" || config.deviceModel == "" {
flag.Usage()
return
}

certificate := loadCertificate(config, "cert.pem", "key.pem")

configuration, err := api.NewConfiguration(

Check failure on line 79 in cmd/remote/main.go

View workflow job for this annotation

GitHub Actions / Build

ineffectual assignment to err (ineffassign)
config.vendorCode, config.deviceBrand, config.deviceModel, config.serialNumber,
[]shipapi.DeviceCategoryType{
shipapi.DeviceCategoryTypeEnergyManagementSystem,
},
model.DeviceTypeTypeEnergyManagementSystem,
[]model.EntityTypeType{
model.EntityTypeTypeGridGuard,
model.EntityTypeTypeCEM,
},
23292, certificate, time.Second*4)
if *iface != "" {
configuration.SetInterfaces([]string{*iface})
log.Printf("waiting until %v is up", iface)
for {
ifi, err := net.InterfaceByName(*iface)
if err != nil {
log.Fatal(err)
}

// wait until interface is up and available for multicast
flags := net.FlagUp | net.FlagMulticast
if (ifi.Flags & flags) == flags {
break
}
time.Sleep(1 * time.Second)
}
log.Printf("interface online, continuing")
}

r, err := NewRemote(configuration)
if err != nil {
log.Fatal(err)
}

err = r.RegisterUseCase(model.EntityTypeTypeCEM, "EG-LPC", func(localEntity spineapi.EntityLocalInterface, eventCB api.EntityEventCallback) api.UseCaseInterface {
return eglpc.NewLPC(localEntity, eventCB)
})
if err != nil {
log.Fatal(err)
}

err = r.RegisterUseCase(model.EntityTypeTypeCEM, "MA-MPC", func(localEntity spineapi.EntityLocalInterface, eventCB api.EntityEventCallback) api.UseCaseInterface {
return mpc.NewMPC(localEntity, eventCB)
})
if err != nil {
log.Fatal(err)
}

err = r.RegisterUseCase(model.EntityTypeTypeCEM, "CEM-EVCC", func(localEntity spineapi.EntityLocalInterface, eventCB api.EntityEventCallback) api.UseCaseInterface {
return evcc.NewEVCC(r.service, localEntity, eventCB)
})
if err != nil {
log.Fatal(err)
}

err = r.RegisterUseCase(model.EntityTypeTypeCEM, "CEM-EVSECC", func(localEntity spineapi.EntityLocalInterface, eventCB api.EntityEventCallback) api.UseCaseInterface {
return evsecc.NewEVSECC(localEntity, eventCB)
})
if err != nil {
log.Fatal(err)
}

ctx, cancelCtx := context.WithCancel(context.Background())
if err = r.Listen(ctx, "tcp", net.JoinHostPort("::", strconv.Itoa(3393))); err != nil {
log.Fatal(err)
}
log.Print("Started")

// Clean exit to make sure mdns shutdown is invoked
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
<-sig
// User exit

cancelCtx()
}
Loading

0 comments on commit a37178d

Please sign in to comment.