Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
use caching v2 (#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Ayman <[email protected]>

Signed-off-by: Ayman <[email protected]>
Co-authored-by: Ayman <[email protected]>
  • Loading branch information
khalifapro and enkhalifapro authored Jan 26, 2023
1 parent f10be4d commit e34611a
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 60 deletions.
174 changes: 127 additions & 47 deletions cmd/confluence/confluence.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -57,14 +59,16 @@ const (
var (
gMaxUpdatedAt time.Time
gMaxUpdatedAtMtx = &sync.Mutex{}
cachedSpaces = make(map[string]EntityCache)
spacesCacheFile = "spaces-cache.csv"
// ConfluenceDataSource - constant
//ConfluenceDataSource = &models.DataSource{Name: "Confluence", Slug: "confluence", Model: "documentation"}
//gConfluenceMetaData = &models.MetaData{BackendName: "confluence", BackendVersion: ConfluenceBackendVersion}
)

// Publisher - publish data to S3
type Publisher interface {
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) error
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error)
}

// DSConfluence - DS implementation for confluence - does nothing at all, just presents a skeleton code
Expand Down Expand Up @@ -344,8 +348,8 @@ func (j *DSConfluence) GetHistoricalContents(ctx *shared.Ctx, content map[string
headers,
nil,
nil,
map[[2]int]struct{}{{200, 200}: {}}, // JSON statuses: 200
nil, // Error statuses
map[[2]int]struct{}{{200, 200}: {}}, // JSON statuses: 200
nil, // Error statuses
map[[2]int]struct{}{{200, 200}: {}, {500, 500}: {}, {404, 404}: {}}, // OK statuses: 200
map[[2]int]struct{}{{200, 200}: {}}, // Cache statuses: 200
false, // retry
Expand Down Expand Up @@ -574,6 +578,7 @@ func (j *DSConfluence) Sync(ctx *shared.Ctx) (err error) {
if ctx.DateTo != nil {
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v", j.URL, ctx.DateTo)
}
j.getCachedContent()
// NOTE: Non-generic starts here
var (
sDateFrom string
Expand Down Expand Up @@ -1291,8 +1296,7 @@ func (j *DSConfluence) GetModelData(ctx *shared.Ctx, docs []interface{}) (data m
SourceTimestamp: updatedOn,
Children: kids,
}
cacheID := fmt.Sprintf("content-%s", confluenceContentID)
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, cacheID)
isCreated := isKeyCreated(confluenceContentID)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
return data, err
Expand Down Expand Up @@ -1338,30 +1342,26 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
contentsStr := "contents"
envStr := os.Getenv("STAGE")
// Push the event
d := make([]map[string]interface{}, 0)
for k, v := range data {
switch k {
case "created":
ev, _ := v[0].(insightsConf.ContentCreatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
cacheData, err := j.cachedCreatedContent(v)
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
err = j.cachedCreatedContent(v, path)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("cachedCreatedContent error: %+v", err)
return
}
d = append(d, cacheData...)
case "updated":
updates, cacheData, err := j.preventUpdateDuplication(v)
updates, err := j.preventUpdateDuplication(v)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("preventUpdateDuplication error: %+v", err)
return
}
if len(cacheData) > 0 {
d = append(d, cacheData...)
}

if len(updates) > 0 {
ev, _ := updates[0].(insightsConf.ContentUpdatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
}
default:
err = fmt.Errorf("unknown confluence event type '%s'", k)
Expand All @@ -1370,11 +1370,8 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
break
}
}
if len(d) > 0 {
err = j.cacheProvider.Create(j.endpoint, d)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
}
if err = j.createCacheFile([]EntityCache{}, ""); err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
}
} else {
jsonBytes, err = jsoniter.Marshal(data)
Expand Down Expand Up @@ -1567,11 +1564,9 @@ func (j *DSConfluence) AddCacheProvider() {
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "http://"), "/", "-")
}

func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]interface{}, error) {
cacheData := make([]map[string]interface{}, 0)
func (j *DSConfluence) cachedCreatedContent(v []interface{}, path string) error {
for _, val := range v {
content := val.(insightsConf.ContentCreatedEvent).Payload
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
c := insightsConf.Content{
ID: content.ID,
EndpointID: content.EndpointID,
Expand All @@ -1587,22 +1582,24 @@ func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]inter
}
b, err := json.Marshal(c)
if err != nil {
return cacheData, err
return err
}
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
cacheData = append(cacheData, map[string]interface{}{
"id": id,
"data": map[string]interface{}{
contentHashField: contentHash,
},
})
tStamp := content.SyncTimestamp.Unix()
cachedSpaces[content.ID] = EntityCache{
Timestamp: fmt.Sprintf("%v", tStamp),
EntityID: content.ID,
SourceEntityID: content.ContentID,
FileLocation: path,
Hash: contentHash,
Orphaned: false,
}
}
return cacheData, nil
return nil
}

func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, []map[string]interface{}, error) {
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, error) {
updatedVals := make([]interface{}, 0, len(v))
cacheData := make([]map[string]interface{}, 0)
for _, val := range v {
content := val.(insightsConf.ContentUpdatedEvent).Payload
c := insightsConf.Content{
Expand All @@ -1620,25 +1617,108 @@ func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{},
}
b, err := json.Marshal(c)
if err != nil {
return updatedVals, cacheData, nil
return updatedVals, nil
}
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
cacheID := fmt.Sprintf("content-%s", content.ID)
byt, err := j.cacheProvider.GetFileByKey(j.endpoint, cacheID)
if err != nil {
return updatedVals, cacheData, nil
cacheCon, ok := cachedSpaces[content.ID]
if !ok {
continue
}
cachedHash := make(map[string]interface{})
err = json.Unmarshal(byt, &cachedHash)
if contentHash != cachedHash["contentHash"] {
if contentHash != cacheCon.Hash {
updatedVals = append(updatedVals, val)
cacheData = append(cacheData, map[string]interface{}{
"id": cacheID,
"data": map[string]interface{}{
contentHashField: contentHash,
},
})
cacheCon.Hash = contentHash
cachedSpaces[content.ID] = cacheCon
}
}
return updatedVals, cacheData, nil
return updatedVals, nil
}

// getCachedContent loads the spaces cache CSV for this endpoint into the
// package-level cachedSpaces map. The first row is a header and is skipped;
// each data row is expected to hold
// (timestamp, entity_id, source_entity_id, file_location, hash, orphaned).
// Loading is best-effort: any read or parse failure leaves the cache empty
// and the sync proceeds as a first run.
func (j *DSConfluence) getCachedContent() {
	comB, err := j.cacheProvider.GetFileByKey(j.endpoint, spacesCacheFile)
	if err != nil {
		// No cache file yet (first run) or storage error: start with an empty cache.
		return
	}
	reader := csv.NewReader(bytes.NewBuffer(comB))
	records, err := reader.ReadAll()
	if err != nil {
		// Unparseable cache: treat as empty rather than failing the sync.
		return
	}
	for i, record := range records {
		if i == 0 {
			// header row
			continue
		}
		if len(record) < 6 {
			// Malformed row (e.g. truncated file). csv.Reader only enforces
			// consistency against the FIRST record, so a uniformly short file
			// would otherwise panic on the indexing below. Skip it.
			continue
		}
		orphaned, err := strconv.ParseBool(record[5])
		if err != nil {
			orphaned = false
		}

		cachedSpaces[record[1]] = EntityCache{
			Timestamp:      record[0],
			EntityID:       record[1],
			SourceEntityID: record[2],
			FileLocation:   record[3],
			Hash:           record[4],
			Orphaned:       orphaned,
		}
	}
}

// createCacheFile merges the given entries into the in-memory cachedSpaces
// map (stamping each with the provided file location), serializes the whole
// map as CSV, and uploads it to the cache provider under spacesCacheFile.
//
// The CSV is built in an in-memory buffer. The previous implementation wrote
// a temporary file to disk, re-read it and removed it — and leaked the file
// (never closed or removed) whenever the CSV write failed mid-way.
//
// NOTE(review): cachedSpaces is a map, so row order below the header is
// nondeterministic across runs; the reader (getCachedContent) does not
// depend on order.
func (j *DSConfluence) createCacheFile(cache []EntityCache, path string) error {
	for _, comm := range cache {
		comm.FileLocation = path
		cachedSpaces[comm.EntityID] = comm
	}

	// Header first, then one row per cached entity.
	records := [][]string{
		{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
	}
	for _, c := range cachedSpaces {
		records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned)})
	}

	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	// WriteAll flushes and reports any deferred flush error.
	if err := w.WriteAll(records); err != nil {
		return err
	}

	return j.cacheProvider.UpdateFileByKey(j.endpoint, spacesCacheFile, buf.Bytes())
}

// isKeyCreated reports whether the given entity ID is already present in the
// in-memory spaces cache, i.e. whether the content was seen before.
func isKeyCreated(id string) bool {
	// Idiomatic comma-ok lookup: the boolean is the answer directly.
	_, ok := cachedSpaces[id]
	return ok
}

// EntityCache is the schema for a single cached content entry; one entry is
// persisted per data row of the spaces cache CSV file (spacesCacheFile).
type EntityCache struct {
	Timestamp string `json:"timestamp"` // sync timestamp in unix seconds, stored as a string
	EntityID string `json:"entity_id"` // insights content ID; also the cachedSpaces map key
	SourceEntityID string `json:"source_entity_id"` // source-system (confluence) content ID
	FileLocation string `json:"file_location"` // path returned by Publisher.PushEvents for the event batch
	Hash string `json:"hash"` // sha256 of the marshaled content, used to detect updates
	Orphaned bool `json:"orphaned"` // currently always written as false — presumably reserved for cleanup; TODO confirm
}
24 changes: 19 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ go 1.17

require (
github.com/LF-Engineering/insights-datasource-shared v1.5.20
github.com/LF-Engineering/lfx-event-schema v0.1.29
github.com/LF-Engineering/lfx-event-schema v0.1.35
github.com/aws/aws-sdk-go v1.43.3
github.com/json-iterator/go v1.1.12
github.com/sirupsen/logrus v1.8.1
gopkg.in/DataDog/dd-trace-go.v1 v1.43.1
)

require (
github.com/DataDog/datadog-agent/pkg/obfuscate v0.0.0-20211129110424-6491aa3bf583 // indirect
github.com/DataDog/datadog-go v4.8.2+incompatible // indirect
github.com/DataDog/datadog-go/v5 v5.0.2 // indirect
github.com/DataDog/sketches-go v1.2.1 // indirect
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/aws/aws-sdk-go-v2 v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/config v1.6.0 // indirect
Expand All @@ -25,18 +30,27 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 // indirect
github.com/aws/smithy-go v1.11.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgraph-io/ristretto v0.1.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/tinylib/msgp v1.1.2 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
google.golang.org/protobuf v1.28.0 // indirect
inet.af/netaddr v0.0.0-20220617031823-097006376321 // indirect
)
Loading

0 comments on commit e34611a

Please sign in to comment.