diff --git a/api/api.go b/api/api.go index 0797700be3..b88c9b802b 100644 --- a/api/api.go +++ b/api/api.go @@ -424,9 +424,6 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage } mimeType = entry.ContentType log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType) - if len(entry.Publisher) > 0 { - ctx = context.WithValue(ctx, "publisher", entry.Publisher) - } reader, _ = a.fileStore.Retrieve(ctx, contentAddr) } else { // no entry found @@ -632,12 +629,10 @@ func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHas apiModifyFail.Inc(1) return nil, err } - publisher, _ := ctx.Value("publisher").(string) if contentHash != "" { entry := newManifestTrieEntry(&ManifestEntry{ Path: path, ContentType: contentType, - Publisher: publisher, }, nil) entry.Hash = contentHash trie.addEntry(entry, quitC) @@ -672,14 +667,12 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content [] path = path[1:] } - publisher, _ := ctx.Value("publisher").(string) entry := &ManifestEntry{ Path: filepath.Join(path, fname), ContentType: mime.TypeByExtension(filepath.Ext(fname)), Mode: 0700, Size: int64(len(content)), ModTime: time.Now(), - Publisher: publisher, } mw, err := a.NewManifestWriter(ctx, mkey, nil) @@ -704,7 +697,7 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content [] return fkey, newMkey.String(), nil } -func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter, publisher string) (storage.Address, error) { +func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) { apiUploadTarCount.Inc(1) var contentKey storage.Address tr := tar.NewReader(bodyReader) @@ -737,7 +730,6 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP Mode: hdr.Mode, Size: hdr.Size, ModTime: hdr.ModTime, - Publisher: publisher, } contentKey, err = mw.AddEntry(ctx, tr, entry) if err != nil { @@ -756,7 +748,6 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP Mode: hdr.Mode, Size: hdr.Size, ModTime: hdr.ModTime, - Publisher: publisher, } contentKey, err = mw.AddEntry(ctx, nil, entry) if err != nil { @@ -871,14 +862,12 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin return nil, "", err } - publisher, _ := ctx.Value("publisher").(string) entry := &ManifestEntry{ Path: filepath.Join(path, fname), ContentType: mime.TypeByExtension(filepath.Ext(fname)), Mode: 0700, Size: totalSize, ModTime: time.Now(), - Publisher: publisher, } fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry) diff --git a/api/client/client.go b/api/client/client.go index 653df2f67e..8845c9fadc 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -163,11 +163,11 @@ func Open(path string) (*File, error) { // (if the manifest argument is non-empty) or creates a new manifest containing // the file, returning the resulting manifest hash (the file will then be // available at bzz://) -func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { +func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool) (string, error) { if file.Size <= 0 { return "", errors.New("file size must be greater than zero") } - return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous, publisher) + return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous) } // Download downloads a file with the given path from the swarm manifest with @@ -197,7 +197,7 @@ func (c *Client) Download(hash, path string) (*File, error) { // directory will then be available at bzz://path/to/file), with // the file specified in defaultPath being uploaded to the root of the manifest // (i.e. bzz://) -func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { +func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool) (string, error) { stat, err := os.Stat(dir) if err != nil { return "", err @@ -212,12 +212,12 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, t return "", fmt.Errorf("default path: %v", err) } } - return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous, publisher) + return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous) } // DownloadDirectory downloads the files contained in a swarm manifest under // the given path into a local directory (existing files will be overwritten) -func (c *Client) DownloadDirectory(hash, path, destDir, credentials string) error { +func (c *Client) DownloadDirectory(hash, path, destDir, credentials, publisher string) error { stat, err := os.Stat(destDir) if err != nil { return err @@ -227,6 +227,9 @@ func (c *Client) DownloadDirectory(hash, path, destDir, credentials string) erro uri := c.Gateway + "/bzz:/" + hash + "/" + path req, err := http.NewRequest("GET", uri, nil) + values := req.URL.Query() + values.Add("publisher", publisher) + req.URL.RawQuery = values.Encode() if err != nil { return err } @@ -284,7 +287,7 @@ func (c *Client) DownloadDirectory(hash, path, destDir, credentials string) erro // DownloadFile downloads a single file into the destination directory // if the manifest entry does not specify a file name - it will fallback // to the hash of the file as a filename -func (c *Client) DownloadFile(hash, path, dest, credentials string) error { +func (c *Client) DownloadFile(hash, path, dest, credentials, publisher string) error { hasDestinationFilename := false if stat, err := os.Stat(dest); err == nil { hasDestinationFilename = !stat.IsDir() @@ -297,7 +300,7 @@ func (c *Client) DownloadFile(hash, path, dest, credentials string) error { } } - manifestList, err := c.List(hash, path, credentials) + manifestList, err := c.List(hash, path, credentials, publisher) if err != nil { return err } @@ -313,6 +316,9 @@ func (c *Client) DownloadFile(hash, path, dest, credentials string) error { uri := c.Gateway + "/bzz:/" + hash + "/" + path req, err := http.NewRequest("GET", uri, nil) + values := req.URL.Query() + values.Add("publisher", publisher) + req.URL.RawQuery = values.Encode() if err != nil { return err } @@ -410,8 +416,12 @@ func (c *Client) DownloadManifest(hash string) (*api.Manifest, bool, error) { // - a prefix of "dir1/" would return [dir1/dir2/, dir1/file3.txt] // // where entries ending with "/" are common prefixes. -func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, error) { - req, err := http.NewRequest(http.MethodGet, c.Gateway+"/bzz-list:/"+hash+"/"+prefix, nil) +func (c *Client) List(hash, prefix, credentials, publisher string) (*api.ManifestList, error) { + uri := c.Gateway + "/bzz-list:/" + hash + "/" + prefix + req, err := http.NewRequest(http.MethodGet, uri, nil) + values := req.URL.Query() + values.Add("publisher", publisher) + req.URL.RawQuery = values.Encode() if err != nil { return nil, err } @@ -511,7 +521,7 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash -func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { +func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool) (string, error) { ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") defer sp.Finish() @@ -539,7 +549,6 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t transport := http.DefaultTransport req.Header.Set("Content-Type", "application/x-tar") - req.Header.Set("publisher", publisher) if defaultPath != "" { q := req.URL.Query() q.Set("defaultpath", defaultPath) diff --git a/api/client/client_test.go b/api/client/client_test.go index b3f5dca423..b8fef60219 100644 --- a/api/client/client_test.go +++ b/api/client/client_test.go @@ -125,7 +125,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string { Size: int64(len(data)), }, } - hash, err := client.Upload(file, manifest, toEncrypt, toPin, true, "") + hash, err := client.Upload(file, manifest, toEncrypt, toPin, true) if err != nil { t.Fatal(err) } @@ -221,7 +221,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) { // upload the directory client := NewClient(srv.URL) defaultPath := testDirFiles[0] - hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true, "") + hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true) if err != nil { t.Fatalf("error uploading directory: %s", err) } @@ -258,7 +258,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(tmp) - if err := client.DownloadDirectory(hash, "", tmp, ""); err != nil { + if err := client.DownloadDirectory(hash, "", tmp, "", ""); err != nil { t.Fatal(err) } for _, file := range testDirFiles { @@ -289,13 +289,13 @@ func testClientFileList(toEncrypt bool, t *testing.T) { defer os.RemoveAll(dir) client := NewClient(srv.URL) - hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true, "") + hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true) if err != nil { t.Fatalf("error uploading directory: %s", err) } ls := func(prefix string) []string { - list, err := client.List(hash, prefix, "") + list, err := client.List(hash, prefix, "", "") if err != nil { t.Fatal(err) } @@ -475,7 +475,7 @@ func TestClientBzzWithFeed(t *testing.T) { } // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true, "") + manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true) if err != nil { t.Fatalf("Error creating manifest: %s", err) } diff --git a/api/http/server.go b/api/http/server.go index 43b2884723..a988522bd0 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -21,6 +21,7 @@ package http import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -80,8 +81,9 @@ const ( AnonymousHeaderName = "x-swarm-anonymous" // Presence of this in header indicates only pull sync should be used for upload PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required - encryptAddr = "encrypt" - tarContentType = "application/x-tar" + encryptAddr = "encrypt" + tarContentType = "application/x-tar" + StatusEnhanceYourCalm = 420 ) type methodHandler map[string]http.Handler @@ -239,6 +241,8 @@ type Server struct { func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { log.Debug("handleBzzGet", "ruid", GetRUID(r.Context()), "uri", r.RequestURI) + publisher := r.URL.Query().Get("publisher") + r = r.WithContext(context.WithValue(r.Context(), "publisher", publisher)) if r.Header.Get("Accept") == tarContentType { uri := GetURI(r.Context()) _, credentials, _ := r.BasicAuth() @@ -249,6 +253,11 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { respondError(w, r, err.Error(), http.StatusUnauthorized) return } + if isRecoveryAttemptError(err) { + w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", uri.Address().String())) + respondError(w, r, err.Error(), StatusEnhanceYourCalm) + return + } respondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError) return } @@ -468,12 +477,8 @@ func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (stora log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") - var publisher string - if len(r.Header["Publisher"]) > 0 { - publisher = r.Header["Publisher"][0] - } - key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw, publisher) + key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw) if err != nil { return nil, err } @@ -839,6 +844,8 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) { // a list of all files contained in under grouped into // common prefixes using "/" as a delimiter func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { + publisher := r.URL.Query().Get("publisher") + r = r.WithContext(context.WithValue(r.Context(), "publisher", publisher)) ruid := GetRUID(r.Context()) uri := GetURI(r.Context()) _, credentials, _ := r.BasicAuth() @@ -867,6 +874,11 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { respondError(w, r, err.Error(), http.StatusUnauthorized) return } + if isRecoveryAttemptError(err) { + w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", addr.String())) + respondError(w, r, err.Error(), StatusEnhanceYourCalm) + return + } respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -942,6 +954,11 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { respondError(w, r, err.Error(), http.StatusUnauthorized) return } + if isRecoveryAttemptError(err) { + w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr)) + respondError(w, r, err.Error(), StatusEnhanceYourCalm) + return + } switch status { case http.StatusNotFound: @@ -1176,3 +1193,7 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) { func isDecryptError(err error) bool { return strings.Contains(err.Error(), api.ErrDecrypt.Error()) } + +func isRecoveryAttemptError(err error) bool { + return strings.Contains(err.Error(), storage.ErrRecoveryAttempt.Error()) +} diff --git a/api/manifest.go b/api/manifest.go index 10e24468e9..b07d78c430 100644 --- a/api/manifest.go +++ b/api/manifest.go @@ -57,7 +57,6 @@ type ManifestEntry struct { Status int `json:"status,omitempty"` Access *AccessEntry `json:"access,omitempty"` Feed *feed.Feed `json:"feed,omitempty"` - Publisher string `json:"publisher,omitempty"` } // ManifestList represents the result of listing files in a manifest @@ -243,7 +242,6 @@ func readManifest(mr storage.LazySectionReader, addr storage.Address, fileStore if err != nil { // size == 0 // can't determine size means we don't have the root chunk log.Trace("manifest not found", "addr", addr) - err = fmt.Errorf("Manifest not Found") return } if size > manifestSizeLimit { diff --git a/cmd/swarm-smoke/util.go b/cmd/swarm-smoke/util.go index e61490b271..f90fba1f4d 100644 --- a/cmd/swarm-smoke/util.go +++ b/cmd/swarm-smoke/util.go @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) { Tag: tag, } - return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true, "") + return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true) } func digest(r io.Reader) ([]byte, error) { diff --git a/cmd/swarm/download.go b/cmd/swarm/download.go index b3c9894539..6b43b69602 100644 --- a/cmd/swarm/download.go +++ b/cmd/swarm/download.go @@ -31,7 +31,7 @@ import ( var downloadCommand = cli.Command{ Action: download, Name: "down", - Flags: []cli.Flag{SwarmRecursiveFlag, SwarmAccessPasswordFlag}, + Flags: []cli.Flag{SwarmRecursiveFlag, SwarmAccessPasswordFlag, SwarmPublisher}, Usage: "downloads a swarm manifest or a file inside a manifest", ArgsUsage: " []", Description: `Downloads a swarm bzz uri to the given dir. When no dir is provided, working directory is assumed. --recursive flag is expected when downloading a manifest with multiple entries.`, @@ -60,6 +60,7 @@ func download(ctx *cli.Context) { bzzapi = strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/") isRecursive = ctx.Bool(SwarmRecursiveFlag.Name) client = swarm.NewClient(bzzapi) + publisher = ctx.String(SwarmPublisher.Name) ) if fi, err := os.Stat(dest); err == nil { @@ -80,7 +81,7 @@ func download(ctx *cli.Context) { dl := func(credentials string) error { // assume behaviour according to --recursive switch if isRecursive { - if err := client.DownloadDirectory(uri.Addr, uri.Path, dest, credentials); err != nil { + if err := client.DownloadDirectory(uri.Addr, uri.Path, dest, credentials, publisher); err != nil { if err == swarm.ErrUnauthorized { return err } @@ -90,7 +91,7 @@ func download(ctx *cli.Context) { // we are downloading a file log.Debug("downloading file/path from a manifest", "uri.Addr", uri.Addr, "uri.Path", uri.Path) - err := client.DownloadFile(uri.Addr, uri.Path, dest, credentials) + err := client.DownloadFile(uri.Addr, uri.Path, dest, credentials, publisher) if err != nil { if err == swarm.ErrUnauthorized { return err diff --git a/cmd/swarm/flags.go b/cmd/swarm/flags.go index 5108804f42..6978cee56e 100644 --- a/cmd/swarm/flags.go +++ b/cmd/swarm/flags.go @@ -158,10 +158,6 @@ var ( Name: "stdin", Usage: "reads data to be uploaded from stdin", } - SwarmUploadPublisher = cli.StringFlag{ - Name: "publisher", - Usage: "Manually specify Publisher", - } SwarmUploadMimeType = cli.StringFlag{ Name: "mime", Usage: "Manually specify MIME type", @@ -269,4 +265,8 @@ var ( Name: "block-profile", Usage: "Enable pprof block profile", } + SwarmPublisher = cli.StringFlag{ + Name: "publisher", + Usage: "Manually specify content recovery publisher", + } ) diff --git a/cmd/swarm/list.go b/cmd/swarm/list.go index 1defdee8ea..311a9ef265 100644 --- a/cmd/swarm/list.go +++ b/cmd/swarm/list.go @@ -53,7 +53,7 @@ func list(ctx *cli.Context) { bzzapi := strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/") client := swarm.NewClient(bzzapi) - list, err := client.List(manifest, prefix, "") + list, err := client.List(manifest, prefix, "", "") if err != nil { utils.Fatalf("Failed to generate file and directory list: %s", err) } diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go index 91fcf78bc4..b7aeb4cc31 100644 --- a/cmd/swarm/upload.go +++ b/cmd/swarm/upload.go @@ -48,7 +48,7 @@ var ( Name: "up", Usage: "uploads a file or directory to swarm using the HTTP API", ArgsUsage: "", - Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag, SwarmUploadPublisher}, + Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag}, Description: "uploads a file or directory to swarm using the HTTP API and prints the root hash", } @@ -73,7 +73,6 @@ func upload(ctx *cli.Context) { defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name) fromStdin = ctx.GlobalBool(SwarmUpFromStdinFlag.Name) mimeType = ctx.GlobalString(SwarmUploadMimeType.Name) - publisher = ctx.String(SwarmUploadPublisher.Name) verbose = ctx.Bool(SwarmVerboseFlag.Name) client = swarm.NewClient(bzzapi) toEncrypt = ctx.Bool(SwarmEncryptedFlag.Name) @@ -161,7 +160,7 @@ func upload(ctx *cli.Context) { defaultPath = strings.TrimPrefix(absDefaultPath, absFile) } } - return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon, publisher) + return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon) } } else { doUpload = func() (string, error) { @@ -173,7 +172,7 @@ func upload(ctx *cli.Context) { if mimeType != "" { f.ContentType = mimeType } - return client.Upload(f, "", toEncrypt, toPin, anon, publisher) + return client.Upload(f, "", toEncrypt, toPin, anon) } } start := time.Now() diff --git a/prod/prod.go b/prod/prod.go index b469f7ab8b..57cfca5d90 100644 --- a/prod/prod.go +++ b/prod/prod.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "errors" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/swarm/chunk" @@ -132,8 +131,6 @@ func getFeedContent(ctx context.Context, handler feed.GenericHandler, topic feed User: user, } query := feed.NewQueryLatest(&fd, lookup.NoClue) - ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) - defer cancel() _, err := handler.Lookup(ctx, query) // feed should still be queried even if there are no updates diff --git a/storage/netstore.go b/storage/netstore.go index c66b4b508c..37eca5357a 100644 --- a/storage/netstore.go +++ b/storage/netstore.go @@ -44,7 +44,8 @@ const ( ) var ( - ErrNoSuitablePeer = errors.New("no suitable peer") + ErrNoSuitablePeer = errors.New("no suitable peer") + ErrRecoveryAttempt = errors.New("recovery was initiated") ) // Fetcher is a struct which maintains state of remote requests. @@ -176,6 +177,10 @@ func (n *NetStore) WithRecoveryCallback(f func(ctx context.Context, chunkAddress // If it is not found in the LocalStore then it uses RemoteGet to fetch from the network. func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (ch Chunk, err error) { metrics.GetOrRegisterCounter("netstore/get", nil).Inc(1) + publisher, ok := ctx.Value("publisher").(string) + if ok { + log.Debug("netstore get publisher received", "publisher", publisher) + } start := time.Now() @@ -201,10 +206,10 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (c if ok { ch, err = n.RemoteFetch(ctx, req, fi) if err != nil { - if n.recoveryCallback != nil { - n.recoveryCallback(ctx, ref) - time.Sleep(500 * time.Millisecond) // TODO: view what the ideal timeout is - return n.RemoteFetch(ctx, req, fi) + if n.recoveryCallback != nil && publisher != "" { + log.Debug("content recovery callback triggered", "ref", ref.String()) + go n.recoveryCallback(ctx, ref) + return nil, ErrRecoveryAttempt } return nil, err }