Skip to content

Commit

Permalink
feat: concurrently poll twitch and restreamer
Browse files Browse the repository at this point in the history
  • Loading branch information
davidramiro committed Oct 30, 2024
1 parent c898fcd commit e175dec
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 50 deletions.
28 changes: 17 additions & 11 deletions internal/adapter/restreamer/restreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,45 @@ func fetchInfo(ctx context.Context, stream domain.StreamQuery, client *http.Clie
return restreamerInfo, nil
}

func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error) {
log.Info().Int("count", len(streams)).Msg("getting info for restreamer streams")
func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context,
streams []*domain.StreamQuery,
wg *sync.WaitGroup,
infos chan<- []domain.StreamInfo,
errorCh chan<- error) {
defer wg.Done()

wg := sync.WaitGroup{}
log.Info().Int("count", len(streams)).Msg("getting info for restreamer streams")

wg.Add(len(streams))
wg2 := new(sync.WaitGroup)
wg2.Add(len(streams))

infos := make([]domain.StreamInfo, 0)
streamInfos := make([]domain.StreamInfo, 0)
infoCh := make(chan domain.StreamInfo, len(streams))
errCh := make(chan error, len(streams))

client := &http.Client{}

for _, stream := range streams {
go fetch(ctx, stream, client, infoCh, errCh, &wg)
go fetch(ctx, stream, client, infoCh, errCh, wg2)
}

wg.Wait()
wg2.Wait()
close(infoCh)
close(errCh)

for err := range errCh {
if err != nil {
log.Error().Err(err).Msg("error getting stream info")
return nil, err
log.Error().Err(err).Msg("error getting restreamer stream info")
errorCh <- err
return
}
}

for info := range infoCh {
infos = append(infos, info)
streamInfos = append(streamInfos, info)
}

return infos, nil
infos <- streamInfos
}

func fetch(ctx context.Context,
Expand Down
73 changes: 51 additions & 22 deletions internal/adapter/twitch/twitch.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package twitch

import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"io"
"net/http"
"net/url"
"streamobserver/internal/core/domain"
"streamobserver/internal/core/port"
"strings"
"sync"
"time"
)

Expand All @@ -35,7 +38,7 @@ type twitchResponse struct {
GameName string `json:"game_name"`
Title string `json:"title"`
ThumbnailURL string `json:"thumbnail_url"`
} `json:"data"`
} `json:"data,omitempty"`
}

type authToken struct {
Expand All @@ -49,25 +52,33 @@ func formatTwitchPhotoUrl(url string) string {
return strings.Replace(url, widthPlaceholder, "1920", 1)
}

func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error) {
func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context,
streams []*domain.StreamQuery,
wg *sync.WaitGroup,
infos chan<- []domain.StreamInfo,
errCh chan<- error) {
defer wg.Done()

log.Info().Int("count", len(streams)).Msg("getting info for twitch streams")

err := s.authenticate(ctx)
if err != nil {
log.Err(err).Msg("error authenticating with twitch")
return nil, err
errCh <- err
return
}

bearer := "Bearer " + s.token.AccessToken

base, err := url.Parse(twitchStreamsURL)
if err != nil {
return nil, err
errCh <- err
return
}

// Query params
params := url.Values{}
params.Add("type", "live")
params.Add("type", "all")
params.Add("first", "100")
for _, s := range streams {
params.Add("user_login", s.UserID)
Expand All @@ -77,7 +88,8 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma
req, err := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil)
if err != nil {
log.Err(err).Msg("error building request for twitch")
return nil, err
errCh <- err
return
}

req.Header.Set("Authorization", bearer)
Expand All @@ -89,33 +101,50 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma
resp, err := client.Do(req)
if err != nil {
log.Error().Err(err).Msg("error making request to twitch")
return nil, err
errCh <- err
return
}
if resp.StatusCode != http.StatusOK {
log.Error().Int("StatusCode", resp.StatusCode).Interface("Response", resp).
Msg("No HTTP OK from Twitch Helix.")
return nil, err
errCh <- err
return
}

defer resp.Body.Close()

var response twitchResponse
responseBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Err(err).Msg("error getting bytes from twitch json response")
errCh <- err
return
}

buffer := new(bytes.Buffer)
err = json.Compact(buffer, responseBytes)
if err != nil {
log.Err(err).Msg("error compacting twitch json response")
errCh <- err
return
}

err = json.NewDecoder(resp.Body).Decode(&response)
var response twitchResponse
err = json.NewDecoder(buffer).Decode(&response)
if err != nil {
log.Err(err).Msg("error decoding response from twitch")
return nil, err
errCh <- err
return
}

infos := make([]domain.StreamInfo, len(streams))
streamInfos := make([]domain.StreamInfo, len(streams))

for i, s := range streams {
online := false
log.Debug().Str("id", s.UserID).Msg("checking if stream in response")
for _, data := range response.Data {
if data.Username == s.UserID {
if strings.EqualFold(data.Username, s.UserID) {
log.Debug().Msg("found, setting info")
infos[i] = domain.StreamInfo{
streamInfos[i] = domain.StreamInfo{
Query: s,
Username: data.Username,
Title: fmt.Sprintf("%s: %s", data.GameName, data.Title),
Expand All @@ -125,18 +154,18 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma
}
online = true
}
if !online {
log.Debug().Msg("not found, setting offline info")
infos[i] = domain.StreamInfo{
Query: s,
Username: s.UserID,
IsOnline: false,
}
}
if !online {
log.Debug().Msg("not found, setting offline info")
streamInfos[i] = domain.StreamInfo{
Query: s,
Username: s.UserID,
IsOnline: false,
}
}
}

return infos, nil
infos <- streamInfos
}

func (s *StreamInfoProvider) authenticate(ctx context.Context) error {
Expand Down
7 changes: 6 additions & 1 deletion internal/core/port/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package port
import (
"context"
"streamobserver/internal/core/domain"
"sync"
)

type StreamInfoProvider interface {
// GetStreamInfos takes an array of streams for a single stream service and returns metadata for those that are online.
GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error)
GetStreamInfos(ctx context.Context,
streams []*domain.StreamQuery,
wg *sync.WaitGroup,
stream chan<- []domain.StreamInfo,
err chan<- error)
// Kind returns the streaming service fetched by this provider
Kind() domain.StreamKind
}
Expand Down
34 changes: 24 additions & 10 deletions internal/core/service/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/viper"
"streamobserver/internal/core/domain"
"streamobserver/internal/core/port"
"sync"
)

type StreamService struct {
Expand Down Expand Up @@ -33,7 +34,6 @@ func (ss *StreamService) GetStreamInfos(ctx context.Context, streams []*domain.S
ctx, cancel := context.WithTimeout(ctx, viper.GetDuration("general.request_timeout"))
defer cancel()

infos := make([]domain.StreamInfo, 0)
twitchStreams := make([]*domain.StreamQuery, 0)
restreamerStreams := make([]*domain.StreamQuery, 0)

Expand All @@ -50,22 +50,36 @@ func (ss *StreamService) GetStreamInfos(ctx context.Context, streams []*domain.S
Int("restreamerStreamCount", len(restreamerStreams)).
Msg("getting stream infos")

wg := new(sync.WaitGroup)

infoCh := make(chan []domain.StreamInfo, 2)
errCh := make(chan error, 2)

if len(twitchStreams) > 0 {
twitchInfos, err := ss.twitchGetter.GetStreamInfos(ctx, twitchStreams)
if err != nil {
log.Err(err).Msg("unable to fetch twitch streams")
return nil, err
}
infos = append(infos, twitchInfos...)
wg.Add(1)
go ss.twitchGetter.GetStreamInfos(ctx, twitchStreams, wg, infoCh, errCh)
}

if len(restreamerStreams) > 0 {
restreamerInfos, err := ss.restreamerGetter.GetStreamInfos(ctx, restreamerStreams)
wg.Add(1)
go ss.restreamerGetter.GetStreamInfos(ctx, restreamerStreams, wg, infoCh, errCh)
}

wg.Wait()
close(errCh)
close(infoCh)

for err := range errCh {
if err != nil {
log.Err(err).Msg("unable to fetch restream streams")
log.Error().Err(err).Msg("error getting stream info")
return nil, err
}
infos = append(infos, restreamerInfos...)
}

infos := make([]domain.StreamInfo, 0)

for info := range infoCh {
infos = append(infos, info...)
}

return infos, nil
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ import (
func main() {
log.Info().Str("author", "davidramiro").Msg("starting streamobserver")

if viper.GetBool("general.debug") {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
} else {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

log.Info().Msg("initializing telegram bot")
viper.AddConfigPath(".")
err := viper.ReadInConfig()
if err != nil {
log.Panic().Err(err).Msg("failed to read config")
}

if viper.GetBool("general.debug") {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
} else {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

token := viper.GetString("telegram.apikey")

b, err := bot.New(token)
Expand Down

0 comments on commit e175dec

Please sign in to comment.