diff --git a/config/config.yaml b/config/config.yaml
index 36977f77e..88d5f6622 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -235,6 +235,16 @@ checks:
     options:
       timeout: 5m
     type: pingpong
+  act:
+    options:
+      file-name: act
+      file-size: 1024
+      postage-depth: 20
+      postage-amount: 420000000
+      postage-label: act-label
+      seed: 0
+      timeout: 5m
+    type: act
   withdraw:
     options:
       target-address: 0xec44cb15b1b033e74d55ac5d0e24d861bde54532
diff --git a/config/local.yaml b/config/local.yaml
index fef28507b..d2a1d924e 100644
--- a/config/local.yaml
+++ b/config/local.yaml
@@ -214,6 +214,15 @@ bee-configs:
 
 # CI checks
 checks:
+  ci-act:
+    options:
+      file-size: 1024
+      postage-depth: 20
+      postage-amount: 420000000
+      postage-label: act-label
+      seed: 0
+      timeout: 5m
+    type: act
   ci-full-connectivity:
     timeout: 5m
     type: full-connectivity
diff --git a/pkg/bee/api/act.go b/pkg/bee/api/act.go
new file mode 100644
index 000000000..404f8591b
--- /dev/null
+++ b/pkg/bee/api/act.go
@@ -0,0 +1,63 @@
+package api
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"net/url"
+
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+type ActService service
+
+type ActUploadResponse struct {
+	Reference      swarm.Address `json:"reference"`
+	HistoryAddress swarm.Address
+}
+
+type ActGranteesResponse struct {
+	Reference      swarm.Address `json:"ref"`
+	HistoryAddress swarm.Address `json:"historyref"`
+}
+
+func (a *ActService) Download(ctx context.Context, addr swarm.Address, opts *DownloadOptions) (resp io.ReadCloser, err error) {
+	return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/bzz/"+addr.String()+"/", nil, opts)
+}
+
+func (a *ActService) Upload(ctx context.Context, name string, data io.Reader, o UploadOptions) (ActUploadResponse, error) {
+	var resp ActUploadResponse
+	h := http.Header{}
+	h.Add(postageStampBatchHeader, o.BatchID)
+	h.Add("swarm-deferred-upload", "true")
+	h.Add("content-type", "application/octet-stream")
+	h.Add("Swarm-Act", "true")
+	h.Add(swarmPinHeader, "true")
+	historyParser := func(h http.Header) {
+		resp.HistoryAddress, _ = swarm.ParseHexAddress(h.Get("Swarm-Act-History-Address"))
+	}
+	err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bzz?"+url.QueryEscape("name="+name), h, data, &resp, historyParser)
+	return resp, err
+}
+
+func (a *ActService) AddGrantees(ctx context.Context, data io.Reader, o UploadOptions) (ActGranteesResponse, error) {
+	var resp ActGranteesResponse
+	h := http.Header{}
+	h.Add(postageStampBatchHeader, o.BatchID)
+	h.Add(swarmActHistoryAddress, o.ActHistoryAddress.String())
+	err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/grantee", h, data, &resp)
+	return resp, err
+}
+
+func (a *ActService) GetGrantees(ctx context.Context, addr swarm.Address) (resp io.ReadCloser, err error) {
+	return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/grantee/"+addr.String(), nil, nil)
+}
+
+func (a *ActService) PatchGrantees(ctx context.Context, data io.Reader, addr swarm.Address, haddr swarm.Address, batchID string) (ActGranteesResponse, error) {
+	var resp ActGranteesResponse
+	h := http.Header{}
+	h.Add("swarm-postage-batch-id", batchID)
+	h.Add("swarm-act-history-address", haddr.String())
+	err := a.client.requestWithHeader(ctx, http.MethodPatch, "/"+apiVersion+"/grantee/"+addr.String(), h, data, &resp)
+	return resp, err
+}
diff --git a/pkg/bee/api/api.go b/pkg/bee/api/api.go
index 2e43c84d9..4a86d5e5e 100644
--- a/pkg/bee/api/api.go
+++ b/pkg/bee/api/api.go
@@ -11,14 +11,19 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/ethersphere/bee/pkg/swarm"
 	"github.com/ethersphere/beekeeper"
 )
 
 const (
 	apiVersion = "v1"
-	contentType              = "application/json; charset=utf-8"
+	contentType              = "application/json, text/plain, */*; charset=utf-8"
 	postageStampBatchHeader  = "Swarm-Postage-Batch-Id"
 	deferredUploadHeader     = "Swarm-Deferred-Upload"
+	swarmAct                 = "Swarm-Act"
+	swarmActHistoryAddress   = "Swarm-Act-History-Address"
+	swarmActPublisher        = "Swarm-Act-Publisher"
+	swarmActTimestamp        = "Swarm-Act-Timestamp"
 	swarmPinHeader           = "Swarm-Pin"
 	swarmTagHeader           = "Swarm-Tag"
 	swarmCacheDownloadHeader = "Swarm-Cache"
@@ -33,6 +38,7 @@ type Client struct {
 	service service // Reuse a single struct instead of allocating one for each service on the heap.
 
 	// Services that API provides.
+	Act    *ActService
 	Bytes  *BytesService
 	Chunks *ChunksService
 	Files  *FilesService
@@ -71,6 +77,7 @@ func NewClient(baseURL *url.URL, o *ClientOptions) (c *Client) {
 func newClient(httpClient *http.Client) (c *Client) {
 	c = &Client{httpClient: httpClient}
 	c.service.client = c
+	c.Act = (*ActService)(&c.service)
 	c.Bytes = (*BytesService)(&c.service)
 	c.Chunks = (*ChunksService)(&c.service)
 	c.Files = (*FilesService)(&c.service)
@@ -179,6 +186,21 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
 		req.Header.Set("Content-Type", contentType)
 	}
 	req.Header.Set("Accept", contentType)
+	// ACT
+	if opts != nil {
+		if opts.Act != nil {
+			req.Header.Set(swarmAct, strconv.FormatBool(*opts.Act))
+		}
+		if opts.ActHistoryAddress != nil {
+			req.Header.Set(swarmActHistoryAddress, (*opts.ActHistoryAddress).String())
+		}
+		if opts.ActPublicKey != nil {
+			req.Header.Set(swarmActPublisher, (*opts.ActPublicKey).String())
+		}
+		if opts.ActTimestamp != nil {
+			req.Header.Set(swarmActTimestamp, strconv.FormatUint(*opts.ActTimestamp, 10))
+		}
+	}
 
 	if opts != nil && opts.Cache != nil {
 		req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
@@ -186,7 +208,6 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
 	if opts != nil && opts.RedundancyFallbackMode != nil {
 		req.Header.Set(swarmRedundancyFallbackMode, strconv.FormatBool(*opts.RedundancyFallbackMode))
 	}
-
 	r, err := c.httpClient.Do(req)
 	if err != nil {
 		return nil, err
 	}
@@ -200,7 +221,7 @@
 }
 
 // requestWithHeader handles the HTTP request response cycle.
-func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}) (err error) {
+func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}, headerParser ...func(http.Header)) (err error) {
 	req, err := http.NewRequest(method, path, body)
 	if err != nil {
 		return err
@@ -215,12 +236,11 @@ func (c *Client) requestWithHeader(ctx context.Context, method, path string, hea
 		return err
 	}
 
-	if err = responseErrorHandler(r); err != nil {
-		return err
-	}
-
 	if v != nil && strings.Contains(r.Header.Get("Content-Type"), "application/json") {
 		_ = json.NewDecoder(r.Body).Decode(&v)
+		for _, parser := range headerParser {
+			parser(r.Header)
+		}
 		return err
 	}
 
@@ -292,13 +312,19 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
 }
 
 type UploadOptions struct {
-	Pin     bool
-	Tag     uint64
-	BatchID string
-	Direct  bool
+	Act               bool
+	Pin               bool
+	Tag               uint64
+	BatchID           string
+	Direct            bool
+	ActHistoryAddress swarm.Address
 }
 
 type DownloadOptions struct {
+	Act                    *bool
+	ActHistoryAddress      *swarm.Address
+	ActPublicKey           *swarm.Address
+	ActTimestamp           *uint64
 	Cache                  *bool
 	RedundancyFallbackMode *bool
 }
diff --git a/pkg/bee/client.go b/pkg/bee/client.go
index adce01c1d..1663ea160 100644
--- a/pkg/bee/client.go
+++ b/pkg/bee/client.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"crypto/tls"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -223,6 +224,21 @@ func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.Do
 	return size, h.Sum(nil), nil
 }
 
+func (c *Client) DownloadActFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
+	r, err := c.api.Act.Download(ctx, a, opts)
+	if err != nil {
+		return 0, nil, fmt.Errorf("download file %s: %w", a, err)
+	}
+	defer r.Close()
+	h := fileHasher()
+	size, err = io.Copy(h, r)
+	if err != nil {
+		return 0, nil, fmt.Errorf("download file %s, hashing copy: %w", a, err)
+	}
+
+	return size, h.Sum(nil), nil
+}
+
 // HasChunk returns true/false if node has a chunk
 func (c *Client) HasChunk(ctx context.Context, a swarm.Address) (bool, error) {
 	return c.api.Node.HasChunk(ctx, a)
@@ -750,6 +766,55 @@ func (c *Client) UploadFile(ctx context.Context, f *File, o api.UploadOptions) (
 	return
 }
 
+func (c *Client) UploadActFile(ctx context.Context, f *File, o api.UploadOptions) (err error) {
+	h := fileHasher()
+	r, err := c.api.Act.Upload(ctx, f.Name(), io.TeeReader(f.DataReader(), h), o)
+	if err != nil {
+		return fmt.Errorf("upload ACT file: %w", err)
+	}
+
+	f.SetAddress(r.Reference)
+	f.SetHistroryAddress(r.HistoryAddress)
+	f.SetHash(h.Sum(nil))
+
+	return nil
+}
+
+func (c *Client) AddActGrantees(ctx context.Context, f *File, o api.UploadOptions) (err error) {
+	h := fileHasher()
+	r, err := c.api.Act.AddGrantees(ctx, io.TeeReader(f.DataReader(), h), o)
+	if err != nil {
+		return fmt.Errorf("add ACT grantees: %w", err)
+	}
+
+	f.SetAddress(r.Reference)
+	f.SetHistroryAddress(r.HistoryAddress)
+	f.SetHash(h.Sum(nil))
+
+	return nil
+}
+
+func (c *Client) GetActGrantees(ctx context.Context, a swarm.Address) (addresses []string, err error) {
+	r, e := c.api.Act.GetGrantees(ctx, a)
+	if e != nil {
+		return nil, fmt.Errorf("get grantees: %s: %w", a, e)
+	}
+	defer r.Close()
+	err = json.NewDecoder(r).Decode(&addresses)
+	return addresses, err
+}
+
+func (c *Client) PatchActGrantees(ctx context.Context, pf *File, addr swarm.Address, haddr swarm.Address, batchID string) (err error) {
+	r, err := c.api.Act.PatchGrantees(ctx, pf.DataReader(), addr, haddr, batchID)
+	if err != nil {
+		return fmt.Errorf("add ACT grantees: %w", err)
+	}
+
+	pf.SetAddress(r.Reference)
+	pf.SetHistroryAddress(r.HistoryAddress)
+	return nil
+}
+
 // UploadCollection uploads TAR collection bytes to the node
 func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOptions) (err error) {
 	h := fileHasher()
diff --git a/pkg/bee/file.go b/pkg/bee/file.go
index 9478e42f3..29c2207d6 100644
--- a/pkg/bee/file.go
+++ b/pkg/bee/file.go
@@ -13,11 +13,12 @@ import (
 
 // File represents Bee file
 type File struct {
-	address    swarm.Address
-	name       string
-	hash       []byte
-	dataReader io.Reader
-	size       int64
+	address        swarm.Address
+	name           string
+	hash           []byte
+	dataReader     io.Reader
+	size           int64
+	historyAddress swarm.Address
 }
 
 // NewRandomFile returns new pseudorandom file
@@ -62,6 +63,10 @@ func (f *File) Address() swarm.Address {
 	return f.address
 }
 
+func (f *File) HistroryAddress() swarm.Address {
+	return f.historyAddress
+}
+
 // Name returns file's name
 func (f *File) Name() string {
 	return f.name
@@ -109,6 +114,10 @@ func (f *File) SetAddress(a swarm.Address) {
 	f.address = a
 }
 
+func (f *File) SetHistroryAddress(a swarm.Address) {
+	f.historyAddress = a
+}
+
 func (f *File) SetHash(h []byte) {
 	f.hash = h
 }
diff --git a/pkg/check/act/act.go b/pkg/check/act/act.go
new file mode 100644
index 000000000..ac782cb82
--- /dev/null
+++ b/pkg/check/act/act.go
@@ -0,0 +1,265 @@
+package act
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/swarm"
+	"github.com/ethersphere/beekeeper/pkg/bee"
+	"github.com/ethersphere/beekeeper/pkg/bee/api"
+	"github.com/ethersphere/beekeeper/pkg/beekeeper"
+	"github.com/ethersphere/beekeeper/pkg/logging"
+	"github.com/ethersphere/beekeeper/pkg/orchestration"
+	"github.com/ethersphere/beekeeper/pkg/random"
+)
+
+// Options represents check options
+type Options struct {
+	FileName      string
+	FileSize      int64
+	PostageAmount int64
+	PostageDepth  uint64
+	PostageLabel  string
+	Seed          int64
+}
+
+// NewDefaultOptions returns new default options
+func NewDefaultOptions() Options {
+	return Options{
+		FileName:      "act",
+		FileSize:      1 * 1024,
+		PostageAmount: 420000000,
+		PostageDepth:  20,
+		PostageLabel:  "act-label",
+		Seed:          0,
+	}
+}
+
+// compile check whether Check implements interface
+var _ beekeeper.Action = (*Check)(nil)
+
+// Check instance
+type Check struct {
+	logger logging.Logger
+}
+
+// NewCheck returns new check
+func NewCheck(logger logging.Logger) beekeeper.Action {
+	return &Check{
+		logger: logger,
+	}
+}
+
+// Run executes act check
+func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) (err error) {
+	o, ok := opts.(Options)
+	if !ok {
+		return fmt.Errorf("invalid options type")
+	}
+
+	clients, err := cluster.NodesClients(ctx)
+	if err != nil {
+		return err
+	}
+
+	fullNodes := cluster.FullNodeNames()
+	lightNodes := cluster.LightNodeNames()
+
+	upNodeName := lightNodes[0]
+	upClient := clients[upNodeName]
+	addr, _ := upClient.Addresses(ctx)
+	publisher, _ := swarm.ParseHexAddress(addr.PublicKey)
+
+	nodeName1 := fullNodes[0]
+	client1 := clients[nodeName1]
+	addr1, _ := client1.Addresses(ctx)
+	pubk1, _ := swarm.ParseHexAddress(addr1.PublicKey)
+
+	nodeName2 := fullNodes[1]
+	client2 := clients[nodeName2]
+	addr2, _ := client2.Addresses(ctx)
+	pubk2, _ := swarm.ParseHexAddress(addr2.PublicKey)
+
+	nodeName3 := fullNodes[2]
+	client3 := clients[nodeName3]
+	addr3, _ := client3.Addresses(ctx)
+	pubk3, _ := swarm.ParseHexAddress(addr3.PublicKey)
+
+	rnds := random.PseudoGenerators(o.Seed, 1)
+
+	fileName := fmt.Sprintf("%s-%s-%d", o.FileName, upNodeName, rnds[0].Int())
+	postagelabel := fmt.Sprintf("%s-%s-%d", o.PostageLabel, upNodeName, rnds[0].Int())
+
+	file := bee.NewRandomFile(rnds[0], fileName, o.FileSize)
+
+	batchID, err := upClient.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, postagelabel)
+	if err != nil {
+		return fmt.Errorf("created batched id %w", err)
+	}
+
+	// upload act file
+	// ----------------------------------------------
+	// Given batch is used for the upload
+	// When the file is uploaded to the node
+	// Then the file is uploaded successfully
+	uErr := upClient.UploadActFile(ctx, &file, api.UploadOptions{BatchID: batchID})
+	if uErr != nil {
+		return fmt.Errorf("node %s: %w", upNodeName, uErr)
+	}
+	c.logger.Info("ACT file uploaded")
+	time.Sleep(1 * time.Second)
+
+	// download act file
+	// ----------------------------------------------
+	// Given the file is uploaded to the node
+	// When the file is downloaded from the node
+	// Then the file is downloaded successfully
+	act := true
+	fileAddress := file.Address()
+	history := file.HistroryAddress()
+	size, hash, err := upClient.DownloadActFile(ctx, fileAddress, &api.DownloadOptions{Act: &act, ActPublicKey: &publisher, ActHistoryAddress: &history})
+	if err != nil {
+		return fmt.Errorf("node %s: %w", upNodeName, err)
+	}
+	if !bytes.Equal(file.Hash(), hash) {
+		c.logger.Infof("Node %s. ACT file hash not equal. Uploaded size: %d Downloaded size: %d File: %s", upNodeName, file.Size(), size, fileAddress.String())
+		return errors.New("ACT file retrieval - hash error")
+	}
+	c.logger.Info("ACT file downloaded")
+
+	// download act file with wrong public key
+	// ----------------------------------------------
+	// Given the file is uploaded to the node
+	// When the file is downloaded from the node with wrong public key
+	// Then the file download is denied
+	notPublisher := pubk1
+	_, _, notPErr := upClient.DownloadActFile(ctx, file.Address(), &api.DownloadOptions{Act: &act, ActPublicKey: &notPublisher, ActHistoryAddress: &history})
+	if notPErr == nil {
+		return fmt.Errorf("node %s: File downloaded with wrong public key successfully - this is an error", upNodeName)
+	}
+	c.logger.Info("ACT Access denied with incorrect public key")
+
+	// add grantees list
+	// ----------------------------------------------
+	// Given the file is uploaded to the node (fileHis)
+	// When the grantees are added to the file
+	// Then the grantees are added successfully
+	gFile := bee.NewBufferFile("grantees.json", bytes.NewBuffer([]byte(`{ "grantees": [
+		"`+pubk2.String()+`",
+		"`+pubk3.String()+`"
+		]
+	}`)))
+	fileHis := file.HistroryAddress()
+	err = upClient.AddActGrantees(ctx, &gFile, api.UploadOptions{BatchID: batchID, ActHistoryAddress: fileHis})
+	if err != nil {
+		return fmt.Errorf("node %s: add grantees error: %w", upNodeName, err)
+	}
+	c.logger.Info("ACT grantees added")
+	time.Sleep(10 * time.Second)
+
+	// list grantees
+	// ----------------------------------------------
+	// Given the file is uploaded to the node (gFile)
+	// When the grantees are listed
+	// Then the grantees are listed successfully
+	addresses, gErr := upClient.GetActGrantees(ctx, gFile.Address())
+	if gErr != nil {
+		return fmt.Errorf("node %s: GetActGrantees: %w", upNodeName, gErr)
+	}
+	if addresses == nil {
+		return fmt.Errorf("node %s: GetActGrantees: addresses is nil", upNodeName)
+	}
+	if len(addresses) != 2 {
+		return fmt.Errorf("node %s: GetActGrantees: addresses length is not 2", upNodeName)
+	}
+	c.logger.Info("ACT grantees listed")
+	time.Sleep(5 * time.Second)
+
+	// download act file with the publisher after create grantees
+	// ----------------------------------------------
+	// Given the grantee is added to the file
+	// When the file is downloaded from the node with the publisher
+	// Then the file is downloaded successfully
+	h := gFile.HistroryAddress()
+	size0, hash0, err0 := upClient.DownloadActFile(ctx, fileAddress, &api.DownloadOptions{Act: &act, ActPublicKey: &publisher, ActHistoryAddress: &h})
+	if err0 != nil {
+		return fmt.Errorf("node %s: %w", upNodeName, err0)
+	}
+	if !bytes.Equal(file.Hash(), hash0) {
+		c.logger.Infof("Node %s. ACT file hash not equal. Uploaded size: %d Downloaded size: %d File: %s", upNodeName, file.Size(), size0, fileAddress.String())
+		return errors.New("ACT file retrieval - hash error")
+	}
+	c.logger.Info("ACT file downloaded with the publisher")
+
+	// download act file with the grantee
+	// ----------------------------------------------
+	// Given the grantee is added to the file
+	// When the file is downloaded from the node with the grantee
+	// Then the file is downloaded successfully
+	his := gFile.HistroryAddress()
+	size1, hash1, err1 := client2.DownloadActFile(ctx, fileAddress, &api.DownloadOptions{Act: &act, ActPublicKey: &publisher, ActHistoryAddress: &his})
+	if err1 != nil {
+		return fmt.Errorf("node %s: %w", nodeName2, err1)
+	}
+	if !bytes.Equal(file.Hash(), hash1) {
+		c.logger.Infof("Node %s. ACT file hash not equal. Uploaded size: %d Downloaded size: %d File: %s", nodeName2, file.Size(), size1, fileAddress.String())
+		return errors.New("ACT file retrieval - hash error")
+	}
+	c.logger.Info("ACT file downloaded with the grantee")
+
+	// patch grantees
+	// ----------------------------------------------
+	// Given the grantee is added to the file (gFile)
+	// When the grantees are patched
+	// Then the grantees are patched successfully
+	pFile := bee.NewBufferFile("grantees-patch.json", bytes.NewBuffer([]byte(`{
+	"add": [
+		"`+pubk1.String()+`"
+	],
+	"revoke": [
+		"`+pubk2.String()+`",
+		"`+pubk3.String()+`"
+	]
+	}`)))
+
+	pErr := upClient.PatchActGrantees(ctx, &pFile, gFile.Address(), gFile.HistroryAddress(), batchID)
+	if pErr != nil {
+		return fmt.Errorf("node %s: PatchActGrantees: %w", upNodeName, pErr)
+	}
+	c.logger.Info("ACT grantees patched")
+	time.Sleep(5 * time.Second)
+
+	// list grantees after patch
+	// ----------------------------------------------
+	// Given the grantee is patched
+	// When the grantees are listed after patch
+	// Then the grantees are listed successfully
+	patchAddresses, patchErr := upClient.GetActGrantees(ctx, pFile.Address())
+	if patchErr != nil {
+		return fmt.Errorf("node %s: GetActGrantees after patch: %w", upNodeName, patchErr)
+	}
+	if patchAddresses == nil {
+		return fmt.Errorf("node %s: GetActGrantees after patch: addresses is nil", upNodeName)
+	}
+	if len(patchAddresses) != 1 {
+		return fmt.Errorf("node %s: GetActGrantees after patch: addresses length is not 1", upNodeName)
+	}
+	c.logger.Info("ACT grantees listed after patch")
+	time.Sleep(5 * time.Second)
+
+	// download act file with the not enabled grantee after patch
+	//----------------------------------------------
+	// Given the grantee is patched
+	// When the file is downloaded from the node with the not enabled grantee
+	// Then the file download is denied
+	hG := pFile.HistroryAddress()
+	_, _, notGErr := client2.DownloadActFile(ctx, fileAddress, &api.DownloadOptions{Act: &act, ActPublicKey: &publisher, ActHistoryAddress: &hG})
+	if notGErr == nil {
+		return fmt.Errorf("node %s: File downloaded with wrong public key successfully - this is an error", nodeName2)
+	}
+	c.logger.Info("ACT Access denied for not enabled grantee after patch")
+	return
+}
diff --git a/pkg/config/check.go b/pkg/config/check.go
index 6414623c4..4298defb7 100644
--- a/pkg/config/check.go
+++ b/pkg/config/check.go
@@ -6,6 +6,7 @@ import (
 	"reflect"
 	"time"
 
+	"github.com/ethersphere/beekeeper/pkg/check/act"
 	"github.com/ethersphere/beekeeper/pkg/check/networkavailability"
 	"github.com/ethersphere/beekeeper/pkg/check/stake"
@@ -56,6 +57,28 @@ type CheckGlobalConfig struct {
 
 // Checks represents all available check types
 var Checks = map[string]CheckType{
+	"act": {
+		NewAction: act.NewCheck,
+		NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
+			checkOpts := new(struct {
+				FileName      *string `yaml:"file-name"`
+				FileSize      *int64  `yaml:"file-size"`
+				PostageAmount *int64  `yaml:"postage-amount"`
+				PostageDepth  *int64  `yaml:"postage-depth"`
+				PostageLabel  *string `yaml:"postage-label"`
+				Seed          *int64  `yaml:"seed"`
+			})
+			if err := check.Options.Decode(checkOpts); err != nil {
+				return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
+			}
+			opts := act.NewDefaultOptions()
+
+			if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
+				return nil, fmt.Errorf("applying options: %w", err)
+			}
+			return opts, nil
+		},
+	},
 	"balances": {
 		NewAction: balances.NewCheck,
 		NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
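
For reference, a minimal sketch of how the ACT helpers added in pkg/bee/client.go are meant to be chained (upload with ACT enabled, then download as the publisher). This is illustrative only and not part of the diff: the package name actexample, the function actRoundTrip, and the client, publisher address and batch ID passed in are assumptions supplied by the caller; the method signatures follow the ones introduced above.

// act_example.go (illustrative sketch, not part of the change set)
package actexample

import (
	"bytes"
	"context"
	"errors"
	"math/rand"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/beekeeper/pkg/bee"
	"github.com/ethersphere/beekeeper/pkg/bee/api"
)

// actRoundTrip uploads a random file with ACT enabled and reads it back as the
// publisher. The client, publisher key and batch ID are assumed inputs.
func actRoundTrip(ctx context.Context, client *bee.Client, publisher swarm.Address, batchID string) error {
	file := bee.NewRandomFile(rand.New(rand.NewSource(0)), "act-example", 1024)

	// Upload with ACT; the node returns the content reference and the ACT history address.
	if err := client.UploadActFile(ctx, &file, api.UploadOptions{BatchID: batchID}); err != nil {
		return err
	}

	// Download as the publisher; DownloadOptions translate into the Swarm-Act,
	// Swarm-Act-Publisher and Swarm-Act-History-Address request headers.
	act := true
	history := file.HistroryAddress()
	_, hash, err := client.DownloadActFile(ctx, file.Address(), &api.DownloadOptions{
		Act:               &act,
		ActPublicKey:      &publisher,
		ActHistoryAddress: &history,
	})
	if err != nil {
		return err
	}
	if !bytes.Equal(file.Hash(), hash) {
		return errors.New("act example: downloaded content does not match upload")
	}
	return nil
}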
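
Similarly, a rough sketch of driving the grantee endpoints through the new client helpers. Again illustrative only: manageGrantees is a hypothetical function, the JSON payload shapes mirror the ones used in the check above, and the client, previously uploaded file, grantee key and batch ID are assumed inputs.

// grantees_example.go (illustrative sketch, not part of the change set)
package actexample

import (
	"bytes"
	"context"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/beekeeper/pkg/bee"
	"github.com/ethersphere/beekeeper/pkg/bee/api"
)

// manageGrantees grants access to one extra key and then revokes it again.
// "uploaded" is a file previously uploaded via UploadActFile.
func manageGrantees(ctx context.Context, client *bee.Client, uploaded *bee.File, grantee swarm.Address, batchID string) ([]string, error) {
	// POST /v1/grantee: create a grantee list bound to the file's ACT history.
	gFile := bee.NewBufferFile("grantees.json", bytes.NewBufferString(`{"grantees": ["`+grantee.String()+`"]}`))
	if err := client.AddActGrantees(ctx, &gFile, api.UploadOptions{
		BatchID:           batchID,
		ActHistoryAddress: uploaded.HistroryAddress(),
	}); err != nil {
		return nil, err
	}

	// GET /v1/grantee/{ref}: the list decodes as a plain JSON array of keys.
	addresses, err := client.GetActGrantees(ctx, gFile.Address())
	if err != nil {
		return nil, err
	}

	// PATCH /v1/grantee/{ref}: revoke the key again.
	pFile := bee.NewBufferFile("grantees-patch.json", bytes.NewBufferString(`{"add": [], "revoke": ["`+grantee.String()+`"]}`))
	if err := client.PatchActGrantees(ctx, &pFile, gFile.Address(), gFile.HistroryAddress(), batchID); err != nil {
		return nil, err
	}
	return addresses, nil
}

With the ci-act entry in config/local.yaml, the check itself would typically be run against a local cluster with something along the lines of `beekeeper check --cluster-name local --checks ci-act`, assuming the usual beekeeper CLI invocation.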