Skip to content

Commit

Permalink
Merge pull request #50 from dnstapir/cleanup-mqtt+config+bootstrap
Browse files Browse the repository at this point in the history
Cleanup mqtt+config+bootstrap
  • Loading branch information
johanix authored Jun 17, 2024
2 parents 68619c8 + 368da15 commit 47bd5ed
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 154 deletions.
135 changes: 75 additions & 60 deletions apihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ func APIbootstrap(conf *Config) func(w http.ResponseWriter, r *http.Request) {
log.Printf("API: received /bootstrap request (cmd: %s) from %s.\n", bp.Command, r.RemoteAddr)

switch bp.Command {
case "greylist-status":
me := conf.TemData.MqttEngine
stats := me.Stats()
resp.MsgCounters = stats.MsgCounters
resp.MsgTimeStamps = stats.MsgTimeStamps
log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])

case "export-greylist":
td := conf.TemData
td.mu.RLock()
Expand Down Expand Up @@ -501,13 +508,13 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {
walkRoutes(router, viper.GetString("apiserver.address"))
log.Println("")

address := viper.GetString("apiserver.address")
tlsaddress := viper.GetString("apiserver.tlsaddress")
addresses := viper.GetStringSlice("apiserver.addresses")
tlsaddresses := viper.GetStringSlice("apiserver.tlsaddresses")
certfile := viper.GetString("certs.tem.cert")
keyfile := viper.GetString("certs.tem.key")

bootstrapaddress := viper.GetString("bootstrapserver.address")
bootstraptlsaddress := viper.GetString("bootstrapserver.tlsaddress")
bootstrapaddresses := viper.GetStringSlice("bootstrapserver.addresses")
bootstraptlsaddresses := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstraprouter := SetupBootstrapRouter(conf)

tlspossible := true
Expand All @@ -529,79 +536,87 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {
TEMExiter("Error creating API server tls config: %v\n", err)
}

tlsServer := &http.Server{
Addr: tlsaddress,
Handler: router,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
bootstrapTlsServer := &http.Server{
Addr: bootstraptlsaddress,
Handler: bootstraprouter,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

var wg sync.WaitGroup

log.Println("*** API: Starting API dispatcher #1. Listening on", address)

if address != "" {
wg.Add(1)
go func(wg *sync.WaitGroup) {
apiServer := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Println("*** API: Starting API dispatcher #1. Listening on", address)
wg.Done()
TEMExiter(apiServer.ListenAndServe())
}(&wg)
}
// log.Println("*** API: Starting API dispatcher #1. Listening on", address)

if tlsaddress != "" {
if tlspossible {
if len(addresses) > 0 {
for idx, address := range addresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
log.Println("*** API: Starting TLS API dispatcher #1. Listening on", tlsaddress)
apiServer := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Printf("*** API: Starting API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(tlsServer.ListenAndServeTLS(certfile, keyfile))
TEMExiter(apiServer.ListenAndServe())
}(&wg)
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide TLS service without cert and key files.\n")
}
}

if bootstrapaddress != "" {
wg.Add(1)
go func(wg *sync.WaitGroup) {
apiServer := &http.Server{
Addr: bootstrapaddress,
Handler: bootstraprouter,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
if len(tlsaddresses) > 0 {
if tlspossible {
for idx, tlsaddress := range tlsaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
tlsServer := &http.Server{
Addr: tlsaddress,
Handler: router,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("*** API: Starting TLS API dispatcher #%d. Listening on %s", idx+1, tlsaddress)
wg.Done()
TEMExiter(tlsServer.ListenAndServeTLS(certfile, keyfile))
}(&wg)
}
log.Println("*** API: Starting Bootstrap API dispatcher #1. Listening on", bootstrapaddress)
wg.Done()
TEMExiter(apiServer.ListenAndServe())
}(&wg)
} else {
log.Println("*** API: No bootstrap address specified")
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide TLS service without cert and key files.\n")
}
}

if bootstraptlsaddress != "" {
if tlspossible {
if len(bootstrapaddresses) > 0 {
for idx, address := range bootstrapaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
log.Println("*** API: Starting Bootstrap TLS API dispatcher #1. Listening on", bootstraptlsaddress)
apiServer := &http.Server{
Addr: address,
Handler: bootstraprouter,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("*** API: Starting Bootstrap API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(bootstrapTlsServer.ListenAndServeTLS(certfile, keyfile))
TEMExiter(apiServer.ListenAndServe())
}(&wg)
}
} else {
log.Println("*** API: No bootstrap address specified")
}

if len(bootstraptlsaddresses) > 0 {
if tlspossible {
for idx, address := range bootstraptlsaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
bootstrapTlsServer := &http.Server{
Addr: address,
Handler: bootstraprouter,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Printf("*** API: Starting Bootstrap TLS API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(bootstrapTlsServer.ListenAndServeTLS(certfile, keyfile))
}(&wg)
}
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide Bootstrap TLS service without cert and key files.\n")
}
Expand Down
51 changes: 47 additions & 4 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,20 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
return nil, fmt.Errorf("error setting up TLS for the API client: %v", err)
}

bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses")
tlsbootstrapaddrs := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstrapaddrs = append(bootstrapaddrs, tlsbootstrapaddrs...)

// Iterate over the bootstrap servers
for _, server := range src.Bootstrap {
// Is this myself?
for _, bs := range bootstrapaddrs {
if bs == server {
td.Logger.Printf("MQTT bootstrap server %s is myself, skipping", server)
continue
}
}

api.BaseUrl = fmt.Sprintf(src.BootstrapUrl, server)

// Send an API ping command
Expand All @@ -57,6 +69,37 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)

status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "greylist-status",
ListName: src.Name,
Encoding: "json", // XXX: This is our default, but we'll test other encodings later
}, true)

if err != nil {
fmt.Printf("Error from RequestNG: %v\n", err)
continue
}

if status != http.StatusOK {
td.Logger.Printf("HTTP Error: %s\n", buf)
continue
}

var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s", server, br.Msg)
}

td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last msg arrived at %s), ", server, uptime, br.MsgCounters["greylist"], src.Name, br.MsgTimeStamps["greylist"].Format(time.RFC3339))

status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "export-greylist",
ListName: src.Name,
Encoding: "gob", // XXX: This is our default, but we'll test other encodings later
Expand All @@ -67,7 +110,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

if status != http.StatusOK {
fmt.Printf("HTTP Error: %s\n", buf)
td.Logger.Printf("HTTP Error: %s\n", buf)
continue
}

Expand All @@ -94,13 +137,13 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

if td.Debug {
fmt.Printf("%v\n", greylist)
fmt.Printf("Names present in greylist %s:\n", src.Name)
td.Logger.Printf("%v", greylist)
td.Logger.Printf("Names present in greylist %s:", src.Name)
out := []string{"Name|Time added|TTL|Tags"}
for _, n := range greylist.Names {
out = append(out, fmt.Sprintf("%s|%v|%v|%v", n.Name, n.TimeAdded.Format(tapir.TimeLayout), n.TTL, n.TagMask))
}
fmt.Printf("%s\n", columnize.SimpleFormat(out))
td.Logger.Printf("%s", columnize.SimpleFormat(out))
}

// Successfully received and decoded bootstrap data
Expand Down
82 changes: 53 additions & 29 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
)

type Config struct {
Service ServiceConf
Server ServerConf
Apiserver ApiserverConf
Dnsengine DnsengineConf
Sources map[string]SourceConf
Policy PolicyConf
Log struct {
Services ServicesConf
ApiServer ApiserverConf
DnsEngine DnsengineConf
BootstrapServer BootstrapServerConf
Sources map[string]SourceConf
Policy PolicyConf
Log struct {
File string `validate:"required"`
Verbose *bool `validate:"required"`
Debug *bool `validate:"required"`
Expand All @@ -36,12 +36,40 @@ type Config struct {
BootTime time.Time
}

type ServiceConf struct {
Name string `validate:"required"`
// Filter string `validate:"required"`
Reset_Soa_Serial *bool `validate:"required"`
Debug *bool
Verbose *bool
type ServicesConf struct {
Rpz struct {
ZoneName string `validate:"required"`
Primary string `validate:"required"` // XXX: must be an address that DnsEngine listens to
SerialCache string `validate:"required"`
}

Reaper struct {
Interval int `validate:"required"`
}
}

type ApiserverConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Key string `validate:"required"`
Addresses []string `validate:"required"`
TlsAddresses []string `validate:"required"`
}

type DnsengineConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Addresses []string `validate:"required"`
Logfile string `validate:"required"`
// Logger *log.Logger
}

type BootstrapServerConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Addresses []string `validate:"required"`
TlsAddresses []string `validate:"required"`
Logfile string
}

type ServerConf struct {
Expand Down Expand Up @@ -96,16 +124,6 @@ type GreylistConf struct {
}
}

type ApiserverConf struct {
Address string `validate:"required"`
Key string `validate:"required"`
}
type DnsengineConf struct {
Address string `validate:"required"`
Logfile string `validate:"required"`
// Logger *log.Logger
}

type InternalConf struct {
// RefreshZoneCh chan RpzRefresher
// RpzCmdCh chan RpzCmdData
Expand All @@ -128,9 +146,11 @@ func ValidateConfig(v *viper.Viper, cfgfile string) error {
var configsections = make(map[string]interface{}, 5)

configsections["log"] = config.Log
configsections["service"] = config.Service
configsections["server"] = config.Server
configsections["apiserver"] = config.Apiserver
configsections["services"] = config.Services
// configsections["server"] = config.Server
configsections["apiserver"] = config.ApiServer
configsections["dnsengine"] = config.DnsEngine
configsections["bootstrapserver"] = config.BootstrapServer
configsections["policy"] = config.Policy

// Cannot validate a map[string]foobar, must validate the individual foobars:
Expand All @@ -149,11 +169,15 @@ func ValidateBySection(config *Config, configsections map[string]interface{}, cf
validate := validator.New()

for k, data := range configsections {
log.Printf("%s: Validating config for %s section\n", config.Service.Name, k)
switch data := data.(type) {
case *SourceConf:
log.Printf("%s: Validating config for source %s", data.Name, k)
case *DnsengineConf, *ApiserverConf, *BootstrapServerConf:
// log.Printf("%s: Validating config for service %s", data.Name, k)
}
if err := validate.Struct(data); err != nil {
log.Printf("ValidateBySection: data that caused validation to fail:\n%v\n", data)
TEMExiter("ValidateBySection: Config %s, section %s: missing required attributes:\n%v\n",
cfgfile, k, err)
TEMExiter("ValidateBySection: Config %s, section %s: missing required attributes:\n%v\n", cfgfile, k, err)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func SetupLogging(conf *Config) {
conf.Loggers.Dnsengine = log.Default()
}

logfile = viper.GetString("mqtt.logfile")
logfile = viper.GetString("tapir.mqtt.logfile")
if logfile != "" {
logfile = filepath.Clean(logfile)
f, err := os.OpenFile(logfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) // #nosec G302
Expand Down
Loading

0 comments on commit 47bd5ed

Please sign in to comment.