Skip to content

Commit

Permalink
Merge pull request #251 from CDCgov/config-retrieval
Browse files Browse the repository at this point in the history
Config retrieval
  • Loading branch information
jcrichlake authored Dec 13, 2024
2 parents c1cd0a1 + 6d3252f commit 74dee41
Show file tree
Hide file tree
Showing 19 changed files with 177 additions and 104 deletions.
1 change: 1 addition & 0 deletions codeCoverageFilter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ EXCLUDE_FILES=(
github.com/CDCgov/reportstream-sftp-ingestion/mocks/blob_handler.go
github.com/CDCgov/reportstream-sftp-ingestion/utils/constants.go
github.com/CDCgov/reportstream-sftp-ingestion/sftp/pkg_sftp.go
github.com/CDCgov/reportstream-sftp-ingestion/config/config_struct.go
)

for exclusion in "${EXCLUDE_FILES[@]}"
Expand Down
3 changes: 3 additions & 0 deletions config/ca-phl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"isActive": true
}
3 changes: 3 additions & 0 deletions config/flexion.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"isActive": true
}
8 changes: 0 additions & 8 deletions config/local.json

This file was deleted.

3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ services:
az storage container create -n config
az storage container create -n sftp
az storage blob upload --overwrite --account-name devstoreaccount1 --container-name sftp --name import/order_message.hl7 --file mock_data/order_message.hl7
az storage blob upload --overwrite --account-name devstoreaccount1 --container-name config --name config.json --file config/local.json
az storage blob upload --overwrite --account-name devstoreaccount1 --container-name config --name flexion.json --file config/flexion.json
az storage blob upload --overwrite --account-name devstoreaccount1 --container-name config --name ca-phl.json --file config/ca-phl.json
az storage queue create -n message-import-queue
az storage queue create -n message-import-dead-letter-queue
az storage queue create -n polling-trigger-queue
Expand Down
71 changes: 71 additions & 0 deletions src/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package config

import (
"encoding/json"
"errors"
"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"log/slog"
"slices"
)

type PartnerSettings struct {
DisplayName string `json:"displayName"` // full name if we need pretty names
IsActive bool `json:"isActive"`
IsExternalSftpConnection bool `json:"isExternalSftpConnection"`
HasZipPassword bool `json:"hasZipPassword"`
DefaultEncoding string `json:"defaultEncoding"`
}

/*
TODO list as of Dec 10:
Current PR:
- Add tests for NewConfig?
- Set up config files for ca-phl and flexion in all envs (need to change what's in local too - it currently only has one value)
- Do we want to add some kind of logging indicating what config got loaded? Maybe just temporarily for testing purposes?
- ADR for not-crashing if one config fails
- ADR for config in general
Future PR:
- Set up another function trigger/CRON for Flexion
- What happens if you try to retrieve a map index that doesn't exist? Need to check for errors or nils or something everywhere we get config
- In polling message handler, use queue message to:
- decide whether to do retrieval ('no' for flexion probs)
- build key names for retrieving secrets
- build file paths for saving files (both zips and hl7s)
- add config to tests
- In import message handler:
- parse file path to get partner ID
- use partner ID to build key names for retrieving secrets to call RS
- add config to tests
- See if we need to do add'l TF to set up Flexion?
- probably at least a cron expression and RS config. It would be nice to have an external Flexion SFTP site to hit for testing
- Do we want to start making TF dynamic at this point or wait for add'l partners? I think maybe wait for 1-2 more partners?
*/

func populatePartnerSettings(input []byte, partnerId string) (PartnerSettings, error) {

var partnerSettings PartnerSettings
err := json.Unmarshal(input, &partnerSettings)

if err != nil {
slog.Error("Unable unmarshall to partner settings", slog.Any(utils.ErrorKey, err))
return PartnerSettings{}, err
}

err = validateDefaultEncoding(partnerSettings.DefaultEncoding)
if err != nil {
slog.Error("Invalid encoding found", slog.Any(utils.ErrorKey, err), slog.String("Partner ID", partnerId), slog.String("Encoding", partnerSettings.DefaultEncoding))
return PartnerSettings{}, err
}

// TODO - any other validation?

return partnerSettings, nil
}

func validateDefaultEncoding(input string) error {
if slices.Contains(allowedEncodingList, input) {
return nil
}
return errors.New("Invalid encoding found: " + input)
}
65 changes: 65 additions & 0 deletions src/config/config_struct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package config

import (
"github.com/CDCgov/reportstream-sftp-ingestion/storage"
"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"log/slog"
"time"
)

type Config struct {
// PartnerId is a unique name to identify a partner. It's put in queue message from polling function and used in blob paths
PartnerId string
lastRetrieved time.Time
partnerSettings PartnerSettings
}

// TODO confirm if these should stay here in config or move to constants
var allowedEncodingList = []string{"ISO-8859-1", "UTF-8"}
var KnownPartnerIds = []string{utils.CA_PHL, utils.FLEXION}
var Configs = make(map[string]*Config)

func init() {
for _, partnerId := range KnownPartnerIds {
partnerConfig, err := NewConfig(partnerId)
if err != nil {
// TODO - add an ADR talking about this. We're not crashing if a single config doesn't load in case only one partner is impacted
slog.Error("Unable to load or parse config", slog.Any(utils.ErrorKey, err), slog.String("partner", partnerId))
}

Configs[partnerId] = partnerConfig
slog.Info("config found", slog.String("Id", partnerId))

}
}

func NewConfig(partnerId string) (*Config, error) {
// Create blob client
handler, err := storage.NewAzureBlobHandler()
if err != nil {
slog.Error("Failed to create Azure Blob handler for config retrieval", slog.Any(utils.ErrorKey, err), slog.String("partnerId", partnerId))
return nil, err
}

// Retrieve settings file from Azure
fileContents, err := handler.FetchFile("config", partnerId+".json")
if err != nil {
slog.Error("Failed to retrieve partner settings", slog.Any(utils.ErrorKey, err), slog.String("partnerId", partnerId))
return nil, err
}

// Parse file content by calling populate
partnerSettings, err := populatePartnerSettings(fileContents, partnerId)
if err != nil {
// We log any errors in the called function
return nil, err
}

// Set up config object
config := &Config{}
config.lastRetrieved = time.Now().UTC()
config.PartnerId = partnerId
config.partnerSettings = partnerSettings

return config, nil
}
25 changes: 12 additions & 13 deletions src/utils/config_test.go → src/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package utils
package config

import (
"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"github.com/stretchr/testify/assert"
"log/slog"
"testing"
)

const partnerId = "test"

func Test_populatePartnerSettings_populates(t *testing.T) {
jsonInput := []byte(`{
"displayName": "Test Name",
Expand All @@ -14,21 +17,19 @@ func Test_populatePartnerSettings_populates(t *testing.T) {
"hasZipPassword": true,
"defaultEncoding": "ISO-8859-1"
}`)
test := Config{}

_ = test.populatePartnerSettings(jsonInput)
partnerSettings, _ := populatePartnerSettings(jsonInput, partnerId)

assert.Contains(t, test.partnerSettings.DisplayName, "Test Name")
assert.Equal(t, test.partnerSettings.IsActive, true)
assert.Equal(t, test.partnerSettings.IsExternalSftpConnection, true)
assert.Equal(t, test.partnerSettings.HasZipPassword, true)
assert.Contains(t, partnerSettings.DisplayName, "Test Name")
assert.Equal(t, partnerSettings.IsActive, true)
assert.Equal(t, partnerSettings.IsExternalSftpConnection, true)
assert.Equal(t, partnerSettings.HasZipPassword, true)
}

func Test_populatePartnerSettings_errors_whenJsonInvalid(t *testing.T) {
jsonInput := []byte(`bad json`)
test := Config{}

err := test.populatePartnerSettings(jsonInput)
_, err := populatePartnerSettings(jsonInput, partnerId)

assert.Error(t, err)

Expand All @@ -43,12 +44,10 @@ func Test_populatePartnerSettings_errors_whenEncodingInvalid(t *testing.T) {
"defaultEncoding": "Something else"
}`)

buffer, defaultLogger := SetupLogger()
buffer, defaultLogger := utils.SetupLogger()
defer slog.SetDefault(defaultLogger)

test := Config{}

err := test.populatePartnerSettings(jsonInput)
_, err := populatePartnerSettings(jsonInput, partnerId)

assert.Error(t, err)
assert.Contains(t, buffer.String(), "Invalid encoding found")
Expand Down
2 changes: 1 addition & 1 deletion src/mocks/blob_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type MockBlobHandler struct {
mock.Mock
}

func (receiver *MockBlobHandler) FetchFile(sourceUrl string) ([]byte, error) {
func (receiver *MockBlobHandler) FetchFileByUrl(sourceUrl string) ([]byte, error) {
args := receiver.Called(sourceUrl)
return args.Get(0).([]byte), args.Error(1)
}
Expand Down
1 change: 1 addition & 0 deletions src/orchestration/polling_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type PollingMessageHandler struct {

func (receiver PollingMessageHandler) HandleMessageContents(message azqueue.DequeuedMessage) error {
slog.Info("Handling polling message", slog.String("message text", *message.MessageText))

// In future, we will use the message contents to figure out stuff about config and files
// SFTP handler currently has hard-coded details about where to retrieve files from
credentialGetter, err := secrets.GetCredentialGetter()
Expand Down
14 changes: 7 additions & 7 deletions src/storage/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type AzureBlobHandler struct {
blobClient *azblob.Client
}


func NewAzureBlobHandler() (AzureBlobHandler, error) {
connectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING")
blobClient, err := azblob.NewClientFromConnectionString(connectionString, nil)
Expand All @@ -24,24 +23,25 @@ func NewAzureBlobHandler() (AzureBlobHandler, error) {
return AzureBlobHandler{blobClient: blobClient}, nil
}

func (receiver AzureBlobHandler) FetchFile(sourceUrl string) ([]byte, error) {
func (receiver AzureBlobHandler) FetchFileByUrl(sourceUrl string) ([]byte, error) {
sourceUrlParts, err := azblob.ParseURL(sourceUrl)
if err != nil {
slog.Error("Unable to parse source URL", slog.String("sourceUrl", sourceUrl), slog.Any(utils.ErrorKey, err))
return nil, err
}
return receiver.FetchFile(sourceUrlParts.ContainerName, sourceUrlParts.BlobName)
}

streamResponse, err := receiver.blobClient.DownloadStream(context.Background(), sourceUrlParts.ContainerName, sourceUrlParts.BlobName, nil)
func (receiver AzureBlobHandler) FetchFile(containerName string, blobName string) ([]byte, error) {
streamResponse, err := receiver.blobClient.DownloadStream(context.Background(), containerName, blobName, nil)
if err != nil {
return nil, err
}

retryReader := streamResponse.NewRetryReader(context.Background(), nil)
defer retryReader.Close()

resp, err := io.ReadAll(retryReader)

return resp, err
return io.ReadAll(retryReader)
}

func (receiver AzureBlobHandler) UploadFile(fileBytes []byte, blobPath string) error {
Expand Down Expand Up @@ -69,7 +69,7 @@ func (receiver AzureBlobHandler) MoveFile(sourceUrl string, destinationUrl strin
return err
}

fileBytes, err := receiver.FetchFile(sourceUrl)
fileBytes, err := receiver.FetchFileByUrl(sourceUrl)
if err != nil {
slog.Error("Unable to fetch file", slog.String("sourceUrl", sourceUrl), slog.Any(utils.ErrorKey, err))
return err
Expand Down
2 changes: 1 addition & 1 deletion src/usecases/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package usecases
// The BlobHandler interface is about interacting with file data,
// e.g. in Azure Blob Storage or a local filesystem
type BlobHandler interface {
FetchFile(sourceUrl string) ([]byte, error)
FetchFileByUrl(sourceUrl string) ([]byte, error)
MoveFile(sourceUrl string, destinationUrl string) error
UploadFile(fileBytes []byte, blobPath string) error
}
2 changes: 1 addition & 1 deletion src/usecases/read_and_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewReadAndSendUsecase() (ReadAndSendUsecase, error) {
// `nil` so that we'll delete the queue message and not retry. On a transient error or an unknown error, we return
// an error, which will cause the queue message to retry later
func (receiver *ReadAndSendUsecase) ReadAndSend(sourceUrl string) error {
content, err := receiver.blobHandler.FetchFile(sourceUrl)
content, err := receiver.blobHandler.FetchFileByUrl(sourceUrl)
if err != nil {
slog.Error("Failed to read the file", slog.String("filepath", sourceUrl), slog.Any(utils.ErrorKey, err))
return err
Expand Down
8 changes: 4 additions & 4 deletions src/usecases/read_and_send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func Test_ReadAndSend_FailsToReadBlob_ReturnsError(t *testing.T) {
mockBlobHandler := &mocks.MockBlobHandler{}
mockBlobHandler.On("FetchFile", utils.SourceUrl).Return([]byte{}, errors.New("it blew up"))
mockBlobHandler.On("FetchFileByUrl", utils.SourceUrl).Return([]byte{}, errors.New("it blew up"))

usecase := ReadAndSendUsecase{blobHandler: mockBlobHandler}

Expand All @@ -25,7 +25,7 @@ func Test_ReadAndSend_FailsToReadBlob_ReturnsError(t *testing.T) {

func Test_ReadAndSend_NonTransientFailureFromReportStream_MovesFileToFailureFolder(t *testing.T) {
mockBlobHandler := &mocks.MockBlobHandler{}
mockBlobHandler.On("FetchFile", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("FetchFileByUrl", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("MoveFile", utils.SourceUrl, utils.FailureSourceUrl).Return(nil)

mockMessageSender := &MockMessageSender{}
Expand All @@ -41,7 +41,7 @@ func Test_ReadAndSend_NonTransientFailureFromReportStream_MovesFileToFailureFold

func Test_ReadAndSend_UnexpectedErrorFromReportStream_ReturnsErrorAndDoesNotMoveFile(t *testing.T) {
mockBlobHandler := &mocks.MockBlobHandler{}
mockBlobHandler.On("FetchFile", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("FetchFileByUrl", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("MoveFile", utils.SourceUrl, utils.FailureSourceUrl).Return(nil)

mockMessageSender := &MockMessageSender{}
Expand All @@ -57,7 +57,7 @@ func Test_ReadAndSend_UnexpectedErrorFromReportStream_ReturnsErrorAndDoesNotMove

func Test_ReadAndSend_successfulReadAndSend(t *testing.T) {
mockBlobHandler := &mocks.MockBlobHandler{}
mockBlobHandler.On("FetchFile", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("FetchFileByUrl", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("MoveFile", utils.SourceUrl, utils.SuccessSourceUrl).Return(nil)

mockMessageSender := &MockMessageSender{}
Expand Down
Loading

0 comments on commit 74dee41

Please sign in to comment.