Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an HTTPSubscriber that allows subscribing to an append-only denylist #22

Merged
merged 15 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ content blocking to the go-ipfs stack and particularly to Kubo.
## Content-blocking in Kubo

1. Grab a plugin release from the [releases](https://github.com/ipfs-shipyard/nopfs/releases) section matching your Kubo version and install the plugin file in `~/.ipfs/plugins`.
2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download the [BadBits denylist](https://badbits.dwebops.pub/badbits.deny) and place them in `~/.config/ipfs/denylists/`.
2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download one of the supported denylists from [Denyli.st](https://denyli.st) and place them in `~/.config/ipfs/denylists/` (ensure `.deny` extension).
3. Start Kubo (`ipfs daemon`). The plugin should be loaded automatically and existing denylists tracked for updates from that point (no restarts required).

## Denylist syntax
Expand Down Expand Up @@ -82,6 +82,22 @@ hints:
//QmbK7LDv5NNBvYQzNfm2eED17SNLt1yNMapcUhSuNLgkqz
```

You can create double-hashes by hand with the following command:

```
printf "QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768/my/path" \
| ipfs add --raw-leaves --only-hash --quiet \
| ipfs cid format -f '%M' -b base58btc
```

where:
- `QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768` must always be a
CidV0. If you have a CIDv1 you need to convert it to CIDv0 first. i.e
`ipfs cid format -v0 bafybeihrw75yfhdx5qsqgesdnxejtjybscwuclpusvxkuttep6h7pkgmze`
- `/my/path` is optional depending on whether you want to block a specific path. No wildcards supported here!
- The command above should give `QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8`. Use it as `//QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8` on the denylist.


## Kubo plugin

NOpfs Kubo plugin pre-built binary releases are available in the
Expand Down
34 changes: 34 additions & 0 deletions cmd/httpsubs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"github.com/ipfs-shipyard/nopfs"
)

func main() {
if len(os.Args) != 3 {
fmt.Fprintln(os.Stderr, "Usage: program <local_denylist> <source_URL>")
os.Exit(1)
}

local := os.Args[1]
remote := os.Args[2]

fmt.Printf("%s: subscribed to %s. CTRL-C to stop\n", local, remote)

subscriber, err := nopfs.NewHTTPSubscriber(remote, local, 1*time.Minute)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("Stopping")
subscriber.Stop()
}
126 changes: 58 additions & 68 deletions denylist.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,44 +194,19 @@ func (dl *Denylist) parseAndFollow(follow bool) error {
return err
}

// we will update N as we go after every line.
// Fixme: this is going to play weird as the buffered reader will
// read-ahead and consume N.
// we will update N as we go after every line. Fixme: this is
// going to play weird as the buffered reader will read-ahead
// and consume N.
limRdr := &io.LimitedReader{
R: dl.f,
N: maxLineSize,
}
r := bufio.NewReader(limRdr)

lineNumber := dl.Header.headerLines
for {
line, err := r.ReadString('\n')
// limit reader exhausted
if err == io.EOF && len(line) >= maxLineSize {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return err
}
// keep waiting for data
if err == io.EOF {
break
}
if err != nil {
logger.Error(err)
return err
}

lineNumber++
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
}
limRdr.N = maxLineSize // reset

}
// we finished reading the file as it EOF'ed.
if !follow {
return nil
return dl.followLines(r, limRdr, lineNumber, nil)
}
// We now wait for new lines.

Expand Down Expand Up @@ -261,55 +236,70 @@ func (dl *Denylist) parseAndFollow(follow bool) error {
}
}

// Is this the right way of tailing a file? Pretty sure there are a
// bunch of gotchas. It seems to work when saving on top of a file
// though. Also, important that the limitedReader is there to avoid
// parsing a huge lines. Also, this could be done by just having
// watchers on the folder, but requires a small refactoring.
go func() {
line := ""
limRdr.N = maxLineSize // reset
go dl.followLines(r, limRdr, lineNumber, waitForWrite)
return nil
}

for {
partialLine, err := r.ReadString('\n')
line += partialLine

// limit reader exhausted
if err == io.EOF && limRdr.N == 0 {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return
}
// keep waiting for data
if err == io.EOF {
err := waitForWrite()
// followLines reads lines from a buffered reader on top of a limited reader,
// that we reset on every line. This enforces line-length limits.
// If we pass a waitWrite() function, then it waits when finding EOF.
//
// Is this the right way of tailing a file? Pretty sure there are a
// bunch of gotchas. It seems to work when saving on top of a file
// though. Also, important that the limitedReader is there to avoid
// parsing a huge lines. Also, this could be done by just having
// watchers on the folder, but requires a small refactoring.
func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) error {
line := ""
limRdr.N = maxLineSize // reset

for {
partialLine, err := r.ReadString('\n')

// limit reader exhausted
if err == io.EOF && limRdr.N == 0 {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return err
}

// Record how much of a line we have
line += partialLine

if err == io.EOF {
if waitWrite != nil { // keep waiting
err := waitWrite()
if err != nil {
logger.Error(err)
dl.Close()
return
return err
}
continue
} else { // Finished
return nil
}
if err != nil {
logger.Error(err)
dl.Close()
return
}
}
if err != nil {
logger.Error(err)
dl.Close()
return err
}

lineNumber++
// we have read up to \n
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
// log error and continue with next line
// if we are here, no EOF, no error and ReadString()
// found an \n so we have a full line.

lineNumber++
// we have read up to \n
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
// log error and continue with next line

}
// reset for next line
line = ""
limRdr.N = 2 << 20 // reset
}
}()
return nil
// reset for next line
line = ""
limRdr.N = maxLineSize // reset
}
}

// parseLine processes every full-line read and puts it into the BlocksDB etc.
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=
github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3 h1:sgrhALL6mBoZsNvJ2zUcITcN6IW3y14ej6w7gv5RcOI=
github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
Expand Down
124 changes: 124 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package nopfs

import (
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"time"
)

// HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file.
type HTTPSubscriber struct {
remoteURL string
localFile string
interval time.Duration
stopChannel chan struct{}
}

// NewHTTPSubscriber creates a new Subscriber instance with the given parameters.
func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) {
logger.Infof("Subscribing to remote denylist: %s", remoteURL)

sub := HTTPSubscriber{
remoteURL: remoteURL,
localFile: localFile,
interval: interval,
stopChannel: make(chan struct{}, 1),
}

_, err := os.Stat(localFile)
// if not found, we perform a first sync before returning.
// this is necessary as otherwise the Blocker does not find much
// of the file
if err != nil && errors.Is(err, fs.ErrNotExist) {
logger.Infof("Performing first sync on: %s", localFile)
err := sub.downloadAndAppend()
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}

go sub.subscribe()

return &sub, nil
}

// subscribe starts the subscription process.
func (s *HTTPSubscriber) subscribe() {
timer := time.NewTimer(0)

for {
select {
case <-s.stopChannel:
logger.Infof("Stopping subscription on: %s", s.localFile)
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
err := s.downloadAndAppend()
if err != nil {
logger.Error(err)
}
timer.Reset(s.interval)
}
}
}

// Stop stops the subscription process.
func (s *HTTPSubscriber) Stop() {
close(s.stopChannel)
}

func (s *HTTPSubscriber) downloadAndAppend() error {
localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer localFile.Close()

// Get the file size of the local file
localFileInfo, err := localFile.Stat()
if err != nil {
return err
}

localFileSize := localFileInfo.Size()

// Create a HTTP GET request with the Range header to download only the missing bytes
req, err := http.NewRequest("GET", s.remoteURL, nil)
if err != nil {
return err
}

rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize)
req.Header.Set("Range", rangeHeader)

logger.Debug("%s: requesting bytes from %d: %s", s.localFile, localFileSize, req.URL)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

switch {
case resp.StatusCode == http.StatusPartialContent:
_, err = io.Copy(localFile, resp.Body)
if err != nil {
return err
}
logger.Infof("%s: appended %d bytes", s.localFile, resp.ContentLength)
case (resp.StatusCode >= http.StatusBadRequest &&
resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) ||
resp.StatusCode >= http.StatusInternalServerError:
return fmt.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode)
// error is ignored, we continued subscribed
}
return nil
}
Loading