Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register call_requests in background at metatx #35

Merged
merged 9 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 157 additions & 16 deletions bugout.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
Expand All @@ -22,6 +23,8 @@ type BugoutAPIClient struct {
BroodBaseURL string
SpireBaseURL string
HTTPClient *http.Client

BugoutSpireClient spire.SpireClient
}

func InitBugoutAPIClient() (*BugoutAPIClient, error) {
Expand All @@ -41,10 +44,14 @@ func InitBugoutAPIClient() (*BugoutAPIClient, error) {
timeout := time.Duration(timeoutSeconds) * time.Second
httpClient := http.Client{Timeout: timeout}

spire.ClientFromEnv()

return &BugoutAPIClient{
BroodBaseURL: BROOD_API_URL,
SpireBaseURL: SPIRE_API_URL,
HTTPClient: &httpClient,

BugoutSpireClient: spire.SpireClient{SpireURL: SPIRE_API_URL, Routes: spire.RoutesFromURL(SPIRE_API_URL), HTTPClient: &httpClient},
}, nil
}

Expand Down Expand Up @@ -211,7 +218,7 @@ func (c *BugoutAPIClient) CheckAccessToResource(token, resourceId string) (Resou
var requestBodyBytes []byte
request, requestErr := http.NewRequest("GET", fmt.Sprintf("%s/resources/%s/holders", c.BroodBaseURL, resourceId), bytes.NewBuffer(requestBodyBytes))
if requestErr != nil {
return resourceHolders, 500, requestErr
return resourceHolders, 0, requestErr
}

request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
Expand All @@ -220,7 +227,7 @@ func (c *BugoutAPIClient) CheckAccessToResource(token, resourceId string) (Resou

response, responseErr := c.HTTPClient.Do(request)
if responseErr != nil {
return resourceHolders, 500, responseErr
return resourceHolders, 0, responseErr
}
defer response.Body.Close()

Expand All @@ -229,8 +236,8 @@ func (c *BugoutAPIClient) CheckAccessToResource(token, resourceId string) (Resou
return resourceHolders, response.StatusCode, fmt.Errorf("could not read response body: %s", responseBodyErr.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
return resourceHolders, response.StatusCode, fmt.Errorf("unexpected status code: %d -- could not read response body: %s", response.StatusCode, response.Status)
if response.StatusCode != 200 {
return resourceHolders, response.StatusCode, fmt.Errorf("unexpected status code: %d -- response status: %s", response.StatusCode, response.Status)
}

unmarshalErr := json.Unmarshal(responseBody, &resourceHolders)
Expand All @@ -252,15 +259,15 @@ func (c *BugoutAPIClient) FindUser(token, userId string) (User, int, error) {
var requestBodyBytes []byte
request, requestErr := http.NewRequest("GET", fmt.Sprintf("%s/user/find?user_id=%s", c.BroodBaseURL, userId), bytes.NewBuffer(requestBodyBytes))
if requestErr != nil {
return user, 500, requestErr
return user, 0, requestErr
}
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
request.Header.Add("Accept", "application/json")
request.Header.Add("Content-Type", "application/json")

response, responseErr := c.HTTPClient.Do(request)
if responseErr != nil {
return user, 500, requestErr
return user, 0, requestErr
}
defer response.Body.Close()

Expand All @@ -269,8 +276,8 @@ func (c *BugoutAPIClient) FindUser(token, userId string) (User, int, error) {
return user, response.StatusCode, fmt.Errorf("could not read response body: %s", responseBodyErr.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
return user, response.StatusCode, fmt.Errorf("unexpected status code: %d -- could not read response body: %s", response.StatusCode, response.Status)
if response.StatusCode != 200 {
return user, response.StatusCode, fmt.Errorf("unexpected status code: %d -- response status: %s", response.StatusCode, response.Status)
}

unmarshalErr := json.Unmarshal(responseBody, &user)
Expand All @@ -292,15 +299,15 @@ func (c *BugoutAPIClient) FindGroup(token, groupId string) (Group, int, error) {
var requestBodyBytes []byte
request, requestErr := http.NewRequest("GET", fmt.Sprintf("%s/group/find?group_id=%s", c.BroodBaseURL, groupId), bytes.NewBuffer(requestBodyBytes))
if requestErr != nil {
return group, 500, requestErr
return group, 0, requestErr

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we sending a status code 0 on error?

Copy link
Collaborator Author

@kompotkot kompotkot May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to pass reasonable status codes back to client, so we need to proxy it. When getting 0, it means failed on waggle side, not from other API

}
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
request.Header.Add("Accept", "application/json")
request.Header.Add("Content-Type", "application/json")

response, responseErr := c.HTTPClient.Do(request)
if responseErr != nil {
return group, 500, requestErr
return group, 0, requestErr

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above.

}
defer response.Body.Close()

Expand All @@ -309,8 +316,8 @@ func (c *BugoutAPIClient) FindGroup(token, groupId string) (Group, int, error) {
return group, response.StatusCode, fmt.Errorf("could not read response body: %s", responseBodyErr.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
return group, response.StatusCode, fmt.Errorf("unexpected status code: %d -- could not read response body: %s", response.StatusCode, response.Status)
if response.StatusCode != 200 {
return group, response.StatusCode, fmt.Errorf("unexpected status code: %d -- response status: %s", response.StatusCode, response.Status)
}

unmarshalErr := json.Unmarshal(responseBody, &group)
Expand All @@ -334,7 +341,7 @@ func (c *BugoutAPIClient) ModifyAccessToResource(token, resourceId, method strin

request, requestErr := http.NewRequest(method, fmt.Sprintf("%s/resources/%s/holders", c.BroodBaseURL, resourceId), reqBodyBytes)
if requestErr != nil {
return resourceHolders, 500, requestErr
return resourceHolders, 0, requestErr
}

request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
Expand All @@ -343,7 +350,7 @@ func (c *BugoutAPIClient) ModifyAccessToResource(token, resourceId, method strin

response, responseErr := c.HTTPClient.Do(request)
if responseErr != nil {
return resourceHolders, 500, responseErr
return resourceHolders, 0, responseErr
}
defer response.Body.Close()

Expand All @@ -352,8 +359,8 @@ func (c *BugoutAPIClient) ModifyAccessToResource(token, resourceId, method strin
return resourceHolders, response.StatusCode, fmt.Errorf("could not read response body: %s", responseBodyErr.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
return resourceHolders, response.StatusCode, fmt.Errorf("unexpected status code: %d -- could not read response body: %s", response.StatusCode, response.Status)
if response.StatusCode != 200 {
return resourceHolders, response.StatusCode, fmt.Errorf("unexpected status code: %d -- response status: %s", response.StatusCode, response.Status)
}

unmarshalErr := json.Unmarshal(responseBody, &resourceHolders)
Expand All @@ -363,3 +370,137 @@ func (c *BugoutAPIClient) ModifyAccessToResource(token, resourceId, method strin

return resourceHolders, response.StatusCode, nil
}

type JobEntryContent struct {
FailedCallRequests []string `json:"failed_call_requests"`
PushedCallRequestIds []string `json:"pushed_call_request_ids"`

ContentParseError bool `json:"content_parse_error,omitempty"`
ContentRaw string `json:"content_raw,omitempty"`
}

type RequestJobEntry struct {
Title string `json:"title"`
Content string `json:"content"`
Tags []string `json:"tags"`
}

type JobResponse struct {
JobEntryUrl string `json:"job_entry_url,omitempty"`
Title string `json:"title"`
Content JobEntryContent `json:"content"`
Tags []string `json:"tags"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}

type JobsResponse struct {
Signer string `json:"signer"`
TotalResults int `json:"total_results"`
Offset int `json:"offset,omitempty"`
NextOffset int `json:"next_offset,omitempty"`
Jobs []JobResponse `json:"jobs"`
}

func SearchJobsInJournal(client *spire.SpireClient, signer string, limit, offset int) (*JobsResponse, error) {
searchQuery := fmt.Sprintf("tag:signer:%s", signer)
parameters := map[string]string{
"content": "true",
}
entryResultsPage, searchErr := client.SearchEntries(BUGOUT_WAGGLE_ADMIN_ACCESS_TOKEN, BUGOUT_METATX_JOBS_JOURNAL_ID, searchQuery, limit, offset, parameters)
if searchErr != nil {
return nil, searchErr
}

jobsResponse := JobsResponse{
Signer: signer,
TotalResults: entryResultsPage.TotalResults,
Offset: entryResultsPage.Offset,
NextOffset: entryResultsPage.NextOffset,
}

for _, r := range entryResultsPage.Results {
var jobEntryContent JobEntryContent
unmarshalErr := json.Unmarshal([]byte(r.Content), &jobEntryContent)
if unmarshalErr != nil {
log.Printf("Unable to parse content for entry %s, error: %v", r.Url, unmarshalErr)
jobEntryContent = JobEntryContent{
ContentParseError: true,
ContentRaw: r.Content,
}
}

job := JobResponse{
JobEntryUrl: r.Url,
Title: r.Title,
Content: jobEntryContent,
Tags: r.Tags,
CreatedAt: r.CreatedAt,
UpdatedAt: r.UpdatedAt,
}
jobsResponse.Jobs = append(jobsResponse.Jobs, job)
}

return &jobsResponse, searchErr
}

func CreateJobInJournal(client *spire.SpireClient, signer string) (*spire.Entry, error) {
title := fmt.Sprintf("draft job - signer %s", signer)
entryContext := spire.EntryContext{
ContextType: "waggle",
}
tags := []string{
"type:job",
fmt.Sprintf("signer:%s", signer),
"complete:false",
}
content := string([]byte("{}"))
jobEntry, err := client.CreateEntry(BUGOUT_WAGGLE_ADMIN_ACCESS_TOKEN, BUGOUT_METATX_JOBS_JOURNAL_ID, title, content, tags, entryContext)

return &jobEntry, err
}

func (c *BugoutAPIClient) UpdateJobInJournal(entryId, signer string, pushedCallRequestIds, failedCallRequests []string) (int, error) {
tags := []string{
"type:job",
fmt.Sprintf("signer:%s", signer),
"complete:true",
}
if len(failedCallRequests) != 0 {
tags = append(tags, "failed:true")
}

jobEntryContent := &JobEntryContent{
FailedCallRequests: failedCallRequests,
PushedCallRequestIds: pushedCallRequestIds,
}
jobEntryContentStr, marshalErr := json.Marshal(jobEntryContent)
if marshalErr != nil {
return 0, marshalErr
}

requestBody := RequestJobEntry{
Title: fmt.Sprintf("job - signer %s", signer),
Content: string(jobEntryContentStr),
Tags: tags,
}
reqBodyBytes := new(bytes.Buffer)
json.NewEncoder(reqBodyBytes).Encode(requestBody)

request, requestErr := http.NewRequest("PUT", fmt.Sprintf("%s/journals/%s/entries/%s?tags_action=replace", c.SpireBaseURL, BUGOUT_METATX_JOBS_JOURNAL_ID, entryId), reqBodyBytes)
if requestErr != nil {
return 0, requestErr
}

request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", BUGOUT_WAGGLE_ADMIN_ACCESS_TOKEN))
request.Header.Add("Accept", "application/json")
request.Header.Add("Content-Type", "application/json")

response, responseErr := c.HTTPClient.Do(request)
if responseErr != nil {
return 0, responseErr
}
defer response.Body.Close()

return response.StatusCode, nil
}
51 changes: 45 additions & 6 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func CreateMoonstreamCommand() *cobra.Command {
}

var blockchain, address, contractType, contractId, contractAddress, infile string
var limit, offset, batchSize, retries int
var limit, offset, batchSize int
var showExpired bool

contractsSubcommand := &cobra.Command{
Expand Down Expand Up @@ -486,9 +486,12 @@ func CreateMoonstreamCommand() *cobra.Command {
return parseErr
}

callRequests := make([]CallRequestSpecification, len(messages))
batchSize := 100
callRequestsLen := len(messages)
var callRequestBatches [][]CallRequestSpecification
var currentBatch []CallRequestSpecification
for i, message := range messages {
callRequests[i] = CallRequestSpecification{
currentBatch = append(currentBatch, CallRequestSpecification{
Caller: message.Claimant,
Method: "claim",
RequestId: message.RequestID,
Expand All @@ -499,19 +502,55 @@ func CreateMoonstreamCommand() *cobra.Command {
Signer: message.Signer,
Signature: message.Signature,
},
})

if (i+1)%batchSize == 0 || i == callRequestsLen-1 {
callRequestBatches = append(callRequestBatches, currentBatch)
currentBatch = nil // Reset the batch
}
}

if contractId == "" && contractAddress == "" {
return fmt.Errorf("you must specify at least one of contractId or contractAddress when creating call requests")
}

for i, batchSpecs := range callRequestBatches {
requestBody := CreateCallRequestsRequest{
TTLDays: limit,
Specifications: batchSpecs,
}

if contractId != "" {
requestBody.ContractID = contractId
}

if contractAddress != "" {
requestBody.ContractAddress = contractAddress
}

requestBodyBytes, requestBodyBytesErr := json.Marshal(requestBody)
if requestBodyBytesErr != nil {
return requestBodyBytesErr
}

statusCode, responseBodyStr := client.sendCallRequests(MOONSTREAM_ACCESS_TOKEN, requestBodyBytes)
if statusCode == 200 {
fmt.Printf("Successfully pushed %d batch of %d total with %d call_requests to API\n", i+1, len(callRequestBatches), len(batchSpecs))
} else if statusCode == 409 {
fmt.Printf("During sending call requests an error ocurred: %v\n", responseBodyStr)
} else {
fmt.Printf("During sending call requests an error ocurred: %v\n", responseBodyStr)
}
}

err := client.CreateCallRequests(MOONSTREAM_ACCESS_TOKEN, contractId, contractAddress, limit, callRequests, batchSize, retries)
return err
return nil
},
}
createCallRequestsSubcommand.Flags().StringVar(&contractId, "contract-id", "", "Moonstream Engine ID of the registered contract")
createCallRequestsSubcommand.Flags().StringVar(&contractAddress, "contract-address", "", "Address of the contract (at least one of --contract-id or --contract-address must be specified)")
createCallRequestsSubcommand.Flags().IntVar(&limit, "ttl-days", 30, "Number of days for which request will remain active")
createCallRequestsSubcommand.Flags().StringVar(&infile, "infile", "", "Input file. If not specified, input will be expected from stdin.")
createCallRequestsSubcommand.Flags().IntVar(&batchSize, "batch-size", 100, "Number of rows per request to API")
createCallRequestsSubcommand.Flags().IntVar(&retries, "retries", 1, "Number of retries for failed requests")

moonstreamCommand.AddCommand(contractsSubcommand, callRequestsSubcommand, createCallRequestsSubcommand)

Expand Down
Loading
Loading