From 438d3dacb994f30d12d5e252d898a1f044645090 Mon Sep 17 00:00:00 2001 From: Wayback Archiver <66856220+waybackarchiver@users.noreply.github.com> Date: Sun, 25 Aug 2024 08:48:59 +0000 Subject: [PATCH] Add support for publish to database --- README.md | 12 +- cmd/wayback/main.go | 3 + cmd/wayback/serve.go | 26 +++- config/config_test.go | 180 ++++++++++++++++++++++++++++ config/options.go | 45 +++++++ config/parser.go | 8 ++ docs/environment.md | 4 + docs/integrations/datastore.md | 13 ++ docs/integrations/datastore.zh.md | 13 ++ go.mod | 1 + go.sum | 2 + ingress/register/publish.go | 1 + metrics/metrics.go | 1 + mkdocs.yml | 1 + publish/datastore/datastore.go | 74 ++++++++++++ publish/datastore/datastore_test.go | 15 +++ publish/datastore/doc.go | 5 + publish/datastore/setup.go | 27 +++++ publish/publish.go | 3 + service/discord/discord_test.go | 6 +- service/telegram/telegram_test.go | 9 +- storage/database.go | 83 +++++++++++++ storage/migrations.go | 40 +++++++ storage/storage.go | 44 ++++--- storage/storage_test.go | 14 ++- storage/telegram_test.go | 18 ++- storage/wayback.go | 59 +++++++++ wayback.1 | 17 +++ wayback.conf | 4 + 29 files changed, 692 insertions(+), 36 deletions(-) create mode 100644 docs/integrations/datastore.md create mode 100644 docs/integrations/datastore.zh.md create mode 100644 publish/datastore/datastore.go create mode 100644 publish/datastore/datastore_test.go create mode 100644 publish/datastore/doc.go create mode 100644 publish/datastore/setup.go create mode 100644 storage/database.go create mode 100644 storage/migrations.go create mode 100644 storage/wayback.go diff --git a/README.md b/README.md index c0f5ae86..2656d7ea 100644 --- a/README.md +++ b/README.md @@ -117,17 +117,19 @@ Examples: Flags: --chatid string Telegram channel id -c, --config string Configuration file path, defaults: ./wayback.conf, ~/wayback.conf, /etc/wayback.conf - -d, --daemon strings Run as daemon service, supported services are telegram, web, mastodon, twitter, discord, slack, irc, xmpp + -d, --daemon strings Run as daemon service, supported services are telegram, web, mastodon, twitter, discord, slack, irc --debug Enable debug mode (default mode is false) + --ga Wayback webpages to Ghostarchive (default true) -h, --help help for wayback - --ia Wayback webpages to Internet Archive + --ia Wayback webpages to Internet Archive (default true) --info Show application information - --ip Wayback webpages to IPFS + --ip Wayback webpages to IPFS (default true) --ipfs-host string IPFS daemon host, do not require, unless enable ipfs (default "127.0.0.1") -m, --ipfs-mode string IPFS mode (default "pinner") -p, --ipfs-port uint IPFS daemon port (default 5001) - --is Wayback webpages to Archive Today - --ph Wayback webpages to Telegraph + --is Wayback webpages to Archive Today (default true) + --migrate Run SQL migrations + --ph Wayback webpages to Telegraph (default true) --print Show application configurations -t, --token string Telegram Bot API Token --tor Snapshot webpage via Tor anonymity network diff --git a/cmd/wayback/main.go b/cmd/wayback/main.go index 7ec8aa9f..8040468d 100644 --- a/cmd/wayback/main.go +++ b/cmd/wayback/main.go @@ -45,6 +45,8 @@ var ( configFile string + migrate bool + rootCmd = &cobra.Command{ Use: "wayback", Short: "A command-line tool and daemon service for archiving webpages.", @@ -88,6 +90,7 @@ func init() { rootCmd.Flags().BoolVarP(&debug, "debug", "", false, "Enable debug mode (default mode is false)") rootCmd.Flags().BoolVarP(&info, "info", "", false, "Show application information") rootCmd.Flags().BoolVarP(&print, "print", "", false, "Show application configurations") + rootCmd.Flags().BoolVarP(&migrate, "migrate", "", false, "Run SQL migrations") } func checkRequiredFlags(cmd *cobra.Command) error { diff --git a/cmd/wayback/serve.go b/cmd/wayback/serve.go index 40a49ba4..9b812cb8 100644 --- a/cmd/wayback/serve.go +++ b/cmd/wayback/serve.go @@ -26,12 +26,36 @@ import ( var signalChan chan (os.Signal) = make(chan os.Signal, 1) func serve(_ *cobra.Command, opts *config.Options, _ []string) { - store, err := storage.Open(opts, "") + db, err := storage.NewConnectionPool( + opts.DatabaseURL(), + opts.DatabaseMinConns(), + opts.DatabaseMaxConns(), + opts.DatabaseConnectionLifetime(), + ) + if err != nil { + logger.Fatal("unable to connect to database: %v", err) + } + defer db.Close() + + bolt, err := storage.Open(opts, "") if err != nil { logger.Fatal("open storage failed: %v", err) } + store := storage.NewStorage(db, bolt) defer store.Close() + if !opts.IsDefaultDatabaseURL() { + if err = store.Ping(); err != nil { + logger.Fatal("ping database failed: %v", err) + } + + if migrate { + if err = storage.Migrate(db); err != nil { + logger.Fatal("migrate database failed: %v", err) + } + } + } + cfg := []pooling.Option{ pooling.Capacity(opts.PoolingSize()), pooling.Timeout(opts.WaybackTimeout()), diff --git a/config/config_test.go b/config/config_test.go index a3dd2c13..782e428a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -243,6 +243,186 @@ func TestIPFSMode(t *testing.T) { } } +func TestDatabaseURL(t *testing.T) { + var tests = []struct { + url string + expected string + }{ + { + url: defDatabaseURL, + expected: defDatabaseURL, + }, + { + url: "foo bar", + expected: "foo bar", + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + os.Clearenv() + os.Setenv("WAYBACK_DATABASE_URL", test.url) + + parser := NewParser() + opts, err := parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing environment variables failed: %v`, err) + } + + expected := test.expected + got := opts.DatabaseURL() + + if got != expected { + t.Errorf(`Unexpected database URL, got %v instead of %s`, got, expected) + } + }) + } +} + +func TestIsDefaultDatabaseURL(t *testing.T) { + var tests = []struct { + url string + expected bool + }{ + { + url: defDatabaseURL, + expected: true, + }, + { + url: "foo bar", + expected: false, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + os.Clearenv() + os.Setenv("WAYBACK_DATABASE_URL", test.url) + + parser := NewParser() + opts, err := parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing environment variables failed: %v`, err) + } + + expected := test.expected + got := opts.IsDefaultDatabaseURL() + + if got != expected { + t.Errorf(`Unexpected default database URL, got %t instead of %t`, got, expected) + } + }) + } +} + +func TestDatabaseMaxConns(t *testing.T) { + var tests = []struct { + maxConns int + expected int + }{ + { + maxConns: defDatabaseMaxConns, + expected: defDatabaseMaxConns, + }, + { + maxConns: 100, + expected: 100, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + os.Clearenv() + os.Setenv("WAYBACK_DATABASE_MAX_CONNS", strconv.Itoa(test.maxConns)) + + parser := NewParser() + opts, err := parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing environment variables failed: %v`, err) + } + + expected := test.expected + got := opts.DatabaseMaxConns() + + if got != expected { + t.Errorf(`Unexpected maxConns, got %v instead of %d`, got, expected) + } + }) + } +} + +func TestDatabaseMinConns(t *testing.T) { + var tests = []struct { + minConns int + expected int + }{ + { + minConns: defDatabaseMinConns, + expected: defDatabaseMinConns, + }, + { + minConns: 100, + expected: 100, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + os.Clearenv() + os.Setenv("WAYBACK_DATABASE_MIN_CONNS", strconv.Itoa(test.minConns)) + + parser := NewParser() + opts, err := parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing environment variables failed: %v`, err) + } + + expected := test.expected + got := opts.DatabaseMinConns() + + if got != expected { + t.Errorf(`Unexpected minConns, got %v instead of %d`, got, expected) + } + }) + } +} + +func TestDatabaseConnectionLifetime(t *testing.T) { + var tests = []struct { + connectionLifetime int + expected time.Duration + }{ + { + connectionLifetime: defDatabaseConnectionLifetime, + expected: defDatabaseConnectionLifetime * time.Minute, + }, + { + connectionLifetime: 100, + expected: 100 * time.Minute, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + os.Clearenv() + os.Setenv("WAYBACK_DATABASE_CONNECTION_LIFETIME", strconv.Itoa(test.connectionLifetime)) + + parser := NewParser() + opts, err := parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing environment variables failed: %v`, err) + } + + expected := test.expected + got := opts.DatabaseConnectionLifetime() + + if got != expected { + t.Errorf(`Unexpected connection lifetime, got %v instead of %d`, got, expected) + } + }) + } +} + func TestIPFSTarget(t *testing.T) { var tests = []struct { token string // managed ipfs token diff --git a/config/options.go b/config/options.go index 22dd655e..b0fbed18 100644 --- a/config/options.go +++ b/config/options.go @@ -104,6 +104,12 @@ const ( defOmnivoreApikey = "" + defRunMigrations = false + defDatabaseURL = "user=postgres password=postgres dbname=wayback sslmode=disable" + defDatabaseMaxConns = 20 + defDatabaseMinConns = 1 + defDatabaseConnectionLifetime = 5 + maxAttachSizeTelegram = 50000000 // 50MB maxAttachSizeDiscord = 8000000 // 8MB maxAttachSizeSlack = 5000000000 // 5GB @@ -131,6 +137,7 @@ type Options struct { ipfs *ipfs slots map[string]bool + database *database telegram *telegram mastodon *mastodon discord *discord @@ -160,6 +167,13 @@ type Options struct { waybackFallback bool } +type database struct { + url string + maxConns int + minConns int + connectionLifetime int +} + type ipfs struct { host string port int @@ -282,6 +296,12 @@ func NewOptions() *Options { waybackMaxRetries: defWaybackMaxRetries, waybackUserAgent: defWaybackUserAgent, waybackFallback: defWaybackFallback, + database: &database{ + url: defDatabaseURL, + maxConns: defDatabaseMaxConns, + minConns: defDatabaseMinConns, + connectionLifetime: defDatabaseConnectionLifetime, + }, ipfs: &ipfs{ host: defIPFSHost, port: defIPFSPort, @@ -443,6 +463,31 @@ func (o *Options) EnabledMetrics() bool { return o.metrics } +// IsDefaultDatabaseURL returns true if the default database URL is used. +func (o *Options) IsDefaultDatabaseURL() bool { + return o.database.url == defDatabaseURL +} + +// DatabaseURL returns the database URL. +func (o *Options) DatabaseURL() string { + return o.database.url +} + +// DatabaseMaxConns returns the maximum number of database connections. +func (o *Options) DatabaseMaxConns() int { + return o.database.maxConns +} + +// DatabaseMinConns returns the minimum number of database connections. +func (o *Options) DatabaseMinConns() int { + return o.database.minConns +} + +// DatabaseConnectionLifetime returns the maximum amount of time a connection may be reused. +func (o *Options) DatabaseConnectionLifetime() time.Duration { + return time.Duration(o.database.connectionLifetime) * time.Minute +} + // IPFSHost returns the host of IPFS daemon service. func (o *Options) IPFSHost() string { return o.ipfs.host diff --git a/config/parser.go b/config/parser.go index d3ede970..f1545ee5 100644 --- a/config/parser.go +++ b/config/parser.go @@ -95,6 +95,14 @@ func (p *Parser) parseLines(lines []string) (err error) { p.opts.chromeRemoteAddr = parseString(val, defChromeRemoteAddr) case "WAYBACK_PROXY": p.opts.proxy = parseString(val, defProxy) + case "WAYBACK_DATABASE_URL": + p.opts.database.url = parseString(val, defDatabaseURL) + case "WAYBACK_DATABASE_MAX_CONNS": + p.opts.database.maxConns = parseInt(val, defDatabaseMaxConns) + case "WAYBACK_DATABASE_MIN_CONNS": + p.opts.database.minConns = parseInt(val, defDatabaseMinConns) + case "WAYBACK_DATABASE_CONNECTION_LIFETIME": + p.opts.database.connectionLifetime = parseInt(val, defDatabaseConnectionLifetime) case "WAYBACK_IPFS_HOST": p.opts.ipfs.host = parseString(val, defIPFSHost) case "WAYBACK_IPFS_PORT": diff --git a/docs/environment.md b/docs/environment.md index 6c0116ab..d495c9b3 100644 --- a/docs/environment.md +++ b/docs/environment.md @@ -41,6 +41,10 @@ Use the `-c` / `--config` option to specify the build definition file to use. | - | `WAYBACK_MEILI_INDEXING` | `capsules` | Meilisearch indexing name | | - | `WAYBACK_MEILI_APIKEY` | - | Meilisearch admin API key | | - | `WAYBACK_OMNIVORE_APIKEY` | - | Omnivore API key | +| - | `WAYBACK_DATABASE_URL` | - | The URL of the Postgres database | +| - | `WAYBACK_DATABASE_MAX_CONNS` | `20` | Maximum connections of the Postgres database | +| - | `WAYBACK_DATABASE_MIN_CONNS` | `1` | Minimum connections of the Postgres database | +| - | `WAYBACK_DATABASE_CONNECTION_LIFETIME` | `5` | Connection lifetime of the Postgres database | | `-d`, `--daemon` | - | - | Run as daemon service, e.g. `telegram`, `web`, `mastodon`, `twitter`, `discord` | | `--ia` | `WAYBACK_ENABLE_IA` | `true` | Wayback webpages to **Internet Archive** | | `--is` | `WAYBACK_ENABLE_IS` | `true` | Wayback webpages to **Archive Today** | diff --git a/docs/integrations/datastore.md b/docs/integrations/datastore.md new file mode 100644 index 00000000..a18154bf --- /dev/null +++ b/docs/integrations/datastore.md @@ -0,0 +1,13 @@ +--- +title: Publish to Database +--- + +Note: Only Postgres is supported. + +## Configuration + +- `WAYBACK_DATABASE_URL`: The URL of the Postgres database, e.g. `user=postgres password=postgres dbname=wayback sslmode=disable`. +- `WAYBACK_DATABASE_MAX_CONNS`: Maximum connections of the Postgres database (optional). +- `WAYBACK_DATABASE_MIN_CONNS`: Minimum connections of the Postgres database (optional). +- `WAYBACK_DATABASE_CONNECTION_LIFETIME`: Connection lifetime of the Postgres database (optional). + diff --git a/docs/integrations/datastore.zh.md b/docs/integrations/datastore.zh.md new file mode 100644 index 00000000..02c962b1 --- /dev/null +++ b/docs/integrations/datastore.zh.md @@ -0,0 +1,13 @@ +--- +title: 发布到数据库 +--- + +注意:仅支持 Postgres。 + +## 配置 + +- `WAYBACK_DATABASE_URL`: Postgres 数据库的 URL,例如 `user=postgres password=postgres dbname=wayback sslmode=disable`。 +- `WAYBACK_DATABASE_MAX_CONNS`: Postgres 数据库的最大连接数(可选)。 +- `WAYBACK_DATABASE_MIN_CONNS`: Postgres 数据库的最小连接数(可选)。 +- `WAYBACK_DATABASE_CONNECTION_LIFETIME`: Postgres 数据库的连接生命周期(可选)。 + diff --git a/go.mod b/go.mod index 942471c9..ab6c6c16 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/iawia002/lux v0.17.0 github.com/ipsn/go-libtor v1.0.380 github.com/jedib0t/go-pretty/v6 v6.4.0 + github.com/lib/pq v1.10.6 github.com/mattn/go-mastodon v0.0.5-0.20210515144304-86627ec7d635 github.com/nbd-wtf/go-nostr v0.17.1-0.20230426111250-32ca737acf77 github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d diff --git a/go.sum b/go.sum index 6bc19ae4..143c3696 100644 --- a/go.sum +++ b/go.sum @@ -286,6 +286,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= +github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= diff --git a/ingress/register/publish.go b/ingress/register/publish.go index c1ebe788..64364f66 100644 --- a/ingress/register/publish.go +++ b/ingress/register/publish.go @@ -5,6 +5,7 @@ package register // import "github.com/wabarc/wayback/ingress/register" import ( + _ "github.com/wabarc/wayback/publish/datastore" _ "github.com/wabarc/wayback/publish/discord" _ "github.com/wabarc/wayback/publish/github" _ "github.com/wabarc/wayback/publish/mastodon" diff --git a/metrics/metrics.go b/metrics/metrics.go index 3aee1ed4..d7d6875b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -38,6 +38,7 @@ const ( PublishNostr = "nostr" PublishMeili = "meili" PublishOmnivore = "omnivore" + PublishDatabase = "database" StatusRequest = "request" StatusSuccess = "success" diff --git a/mkdocs.yml b/mkdocs.yml index 9cd06172..9ea3fdd8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -60,6 +60,7 @@ nav: - Nostr: 'integrations/nostr.md' - Meilisearch: 'integrations/meilisearch.md' - Omnivore: 'integrations/omnivore.md' + - Datastore: 'integrations/datastore.md' - Playback: 'integrations/playback.md' - 'Internet Archive': 'integrations/internet-archive.md' - 'Archive.today': 'integrations/archive-today.md' diff --git a/publish/datastore/datastore.go b/publish/datastore/datastore.go new file mode 100644 index 00000000..1562a905 --- /dev/null +++ b/publish/datastore/datastore.go @@ -0,0 +1,74 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package datastore // import "github.com/wabarc/wayback/publish/datastore" + +import ( + "context" + + "github.com/wabarc/logger" + "github.com/wabarc/wayback" + "github.com/wabarc/wayback/config" + "github.com/wabarc/wayback/errors" + "github.com/wabarc/wayback/metrics" + "github.com/wabarc/wayback/publish" + "github.com/wabarc/wayback/reduxer" + "github.com/wabarc/wayback/storage" +) + +// Interface guard +var _ publish.Publisher = (*Datastore)(nil) + +type Datastore struct { + bot *storage.Storage + opts *config.Options +} + +// New returns a Datastore struct. +func New(store *storage.Storage, opts *config.Options) *Datastore { + if opts.IsDefaultDatabaseURL() { + logger.Debug("Datastore integration WAYBACK_DATABASE_URL is required") + return nil + } + + if store == nil { + db, err := storage.NewConnectionPool( + opts.DatabaseURL(), + opts.DatabaseMinConns(), + opts.DatabaseMaxConns(), + opts.DatabaseConnectionLifetime(), + ) + if err != nil { + logger.Fatal("unable to connect to database: %v", err) + } + + store = storage.NewStorage(db, nil) + } + + return &Datastore{bot: store, opts: opts} +} + +// Publish save url to the datastore of the given cols and args. +func (d *Datastore) Publish(ctx context.Context, _ reduxer.Reduxer, cols []wayback.Collect, args ...string) error { + metrics.IncrementPublish(metrics.PublishDatabase, metrics.StatusRequest) + + if len(cols) == 0 { + metrics.IncrementPublish(metrics.PublishDatabase, metrics.StatusFailure) + return errors.New("publish to datastore: collects empty") + } + + err := d.bot.CreateWayback(ctx, cols) + if err != nil { + metrics.IncrementPublish(metrics.PublishDatabase, metrics.StatusFailure) + return err + } + + metrics.IncrementPublish(metrics.PublishDatabase, metrics.StatusSuccess) + return nil +} + +// Shutdown shuts down the datastore publish service, it always return a nil error. +func (d *Datastore) Shutdown() error { + return d.bot.Close() +} diff --git a/publish/datastore/datastore_test.go b/publish/datastore/datastore_test.go new file mode 100644 index 00000000..6a8408f4 --- /dev/null +++ b/publish/datastore/datastore_test.go @@ -0,0 +1,15 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package datastore // import "github.com/wabarc/wayback/publish/datastore" + +import ( + "testing" +) + +func TestPublish(t *testing.T) { +} + +func TestShutdown(t *testing.T) { +} diff --git a/publish/datastore/doc.go b/publish/datastore/doc.go new file mode 100644 index 00000000..8a908e8d --- /dev/null +++ b/publish/datastore/doc.go @@ -0,0 +1,5 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package datastore // import "github.com/wabarc/wayback/publish/datastore" diff --git a/publish/datastore/setup.go b/publish/datastore/setup.go new file mode 100644 index 00000000..f105e541 --- /dev/null +++ b/publish/datastore/setup.go @@ -0,0 +1,27 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package datastore // import "github.com/wabarc/wayback/publish/datastore" + +import ( + "github.com/wabarc/wayback/config" + "github.com/wabarc/wayback/publish" +) + +func init() { + publish.Register(publish.FlagDatabase, setup) +} + +func setup(opts *config.Options) *publish.Module { + if !opts.IsDefaultDatabaseURL() { + publisher := New(nil, opts) + + return &publish.Module{ + Publisher: publisher, + Opts: opts, + } + } + + return nil +} diff --git a/publish/publish.go b/publish/publish.go index bd0e83e1..11b667d0 100644 --- a/publish/publish.go +++ b/publish/publish.go @@ -33,6 +33,7 @@ const ( FlagGitHub // FlagGitHub is a flag for github publish service FlagMeili // FlagMeili is a flag for meilisearch publish service FlagOmnivore // FlagOmnivore is a flag for Omnivore publish service + FlagDatabase // FlagDatabase is a flag for database store publish service ) // Publisher is the interface that wraps the basic Publish method. @@ -75,6 +76,8 @@ func (f Flag) String() string { return "meilisearch" case FlagOmnivore: return "omnivore" + case FlagDatabase: + return "database" default: return "unknown" } diff --git a/service/discord/discord_test.go b/service/discord/discord_test.go index a5545bd3..2d142c9b 100644 --- a/service/discord/discord_test.go +++ b/service/discord/discord_test.go @@ -192,10 +192,11 @@ func TestServe(t *testing.T) { go pool.Roll() dbpath := filepath.Join(t.TempDir(), "testing.db") - store, err := storage.Open(opts, dbpath) + db, err := storage.Open(opts, dbpath) if err != nil { t.Fatalf("open storage failed: %v", err) } + store := storage.NewStorage(nil, db) defer store.Close() pub := publish.New(ctx, opts) @@ -235,10 +236,11 @@ func TestProcess(t *testing.T) { opts.EnableServices(config.ServiceDiscord.String()) dbpath := filepath.Join(t.TempDir(), "testing.db") - store, err := storage.Open(opts, dbpath) + db, err := storage.Open(opts, dbpath) if err != nil { t.Fatalf("open storage failed: %v", err) } + store := storage.NewStorage(nil, db) defer store.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) diff --git a/service/telegram/telegram_test.go b/service/telegram/telegram_test.go index a155f8ed..ef6c73af 100644 --- a/service/telegram/telegram_test.go +++ b/service/telegram/telegram_test.go @@ -189,10 +189,13 @@ func newTelegram(client *http.Client, opts *config.Options, endpoint string) (tg return tg, nil, err } - store, e := storage.Open(opts, "") - if e != nil { - return tg, nil, e + db, err := storage.Open(opts, "") + if err != nil { + return tg, nil, err } + store := storage.NewStorage(nil, db) + defer store.Close() + cfg := []pooling.Option{ pooling.Capacity(opts.PoolingSize()), pooling.Timeout(opts.WaybackTimeout()), diff --git a/storage/database.go b/storage/database.go new file mode 100644 index 00000000..bfa491a3 --- /dev/null +++ b/storage/database.go @@ -0,0 +1,83 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package storage // import "github.com/wabarc/wayback/storage" + +import ( + "database/sql" + "fmt" + "time" + + "github.com/wabarc/wayback/config" + + _ "github.com/lib/pq" + bolt "go.etcd.io/bbolt" +) + +// Open open a bolt database on current directory in given path. +// It is the caller's responsibility to close it. +func Open(opts *config.Options, path string) (*bolt.DB, error) { + if path == "" { + path = opts.BoltPathname() + } + db, err := bolt.Open(path, 0600, nil) + if err != nil { + return nil, fmt.Errorf("open bolt database failed: %v", err) + } + return db, nil +} + +// NewConnectionPool configures the database connection pool. +func NewConnectionPool(dsn string, minConnections, maxConnections int, connectionLifetime time.Duration) (*sql.DB, error) { + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + + db.SetMaxOpenConns(maxConnections) + db.SetMaxIdleConns(minConnections) + db.SetConnMaxLifetime(connectionLifetime) + + return db, nil +} + +// Migrate executes database migrations. +// nolint: errcheck +func Migrate(db *sql.DB) error { + var currentVersion int + err := db.QueryRow(`SELECT version FROM schema_version`).Scan(¤tVersion) + if err != nil { + return fmt.Errorf("store: migrate failed: %v", err) + } + + for version := currentVersion; version < schemaVersion; version++ { + newVersion := version + 1 + + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("[Migration v%d] %v", newVersion, err) + } + + if err := migrations[version](tx); err != nil { + tx.Rollback() + return fmt.Errorf("[Migration v%d] %v", newVersion, err) + } + + if _, err := tx.Exec(`DELETE FROM schema_version`); err != nil { + tx.Rollback() + return fmt.Errorf("[Migration v%d] %v", newVersion, err) + } + + if _, err := tx.Exec(`INSERT INTO schema_version (version) VALUES ($1)`, newVersion); err != nil { + tx.Rollback() + return fmt.Errorf("[Migration v%d] %v", newVersion, err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("[Migration v%d] %v", newVersion, err) + } + } + + return nil +} diff --git a/storage/migrations.go b/storage/migrations.go new file mode 100644 index 00000000..36b1bbf6 --- /dev/null +++ b/storage/migrations.go @@ -0,0 +1,40 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package storage // import "github.com/wabarc/wayback/storage" + +import ( + "database/sql" +) + +var schemaVersion = len(migrations) + +// Order is important. Add new migrations at the end of the list. +var migrations = []func(tx *sql.Tx) error{ + func(tx *sql.Tx) (err error) { + sql := ` + CREATE TABLE schema_version ( + version text not null + ); + + CREATE TABLE wayback ( + id bigserial not null, + source text not null, + created_at timestamp with time zone not null default now(), + primary key (id) + ); + + CREATE TABLE archives ( + id bigserial not null, + wayback_id bigint not null, + slot varchar(255) not null default '', + dest text not null default '', + primary key (id), + foreign key (wayback_id) references wayback(id) on delete cascade + ); + ` + _, err = tx.Exec(sql) + return err + }, +} diff --git a/storage/storage.go b/storage/storage.go index cf028d10..edf0f5d9 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -5,11 +5,11 @@ package storage // import "github.com/wabarc/wayback/storage" import ( + "context" + "database/sql" "encoding/binary" - - "github.com/wabarc/logger" - "github.com/wabarc/wayback/config" - "github.com/wabarc/wayback/errors" + "errors" + "time" bolt "go.etcd.io/bbolt" ) @@ -19,28 +19,34 @@ var ErrDatabaseNotFound = errors.New("database not found") // Storage handles all operations related to the database. type Storage struct { db *bolt.DB + ds *sql.DB } -// Open a bolt database on current directory in given path. -// It is the caller's responsibility to close it. -func Open(opts *config.Options, path string) (*Storage, error) { - if path == "" { - path = opts.BoltPathname() - } - db, err := bolt.Open(path, 0600, nil) - if err != nil { - logger.Fatal("open bolt database failed: %v", err) - return nil, err - } - return &Storage{db: db}, nil +// NewStorage returns a new Storage. It is the caller's responsibility to close it. +func NewStorage(ds *sql.DB, db *bolt.DB) *Storage { + return &Storage{db: db, ds: ds} } // Close the bolt database -func (s *Storage) Close() error { +func (s *Storage) Close() (err error) { if s.db != nil { - return s.db.Close() + err = errors.Join(s.db.Close(), err) + } + if s.ds != nil { + err = errors.Join(s.ds.Close(), err) } - return ErrDatabaseNotFound + if err != nil { + return err + } + return nil +} + +// Ping checks if the database connection works. +func (s *Storage) Ping() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return s.ds.PingContext(ctx) } func itob(v int) []byte { diff --git a/storage/storage_test.go b/storage/storage_test.go index a86736e7..bb95513e 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -12,7 +12,7 @@ import ( "github.com/wabarc/wayback/config" ) -func TestOpen(t *testing.T) { +func TestNewStorage(t *testing.T) { tests := []struct { name string path string @@ -34,10 +34,14 @@ func TestOpen(t *testing.T) { } defer os.Remove(opts.BoltPathname()) - s, err := Open(opts, file) + db, err := Open(opts, file) if err != nil { t.Fatalf("failed to open database: %v", err) } + s := NewStorage(nil, db) + if err != nil { + t.Fatalf("failed to new storage: %v", err) + } defer s.db.Close() if s == nil { @@ -53,10 +57,14 @@ func TestOpen(t *testing.T) { func TestClose(t *testing.T) { file := path.Join(t.TempDir(), "bolt.db") opts := &config.Options{} - s, err := Open(opts, file) + db, err := Open(opts, file) if err != nil { t.Fatalf("failed to open database: %v", err) } + s := NewStorage(nil, db) + if err != nil { + t.Fatalf("failed to new storage: %v", err) + } err = s.Close() if err != nil { diff --git a/storage/telegram_test.go b/storage/telegram_test.go index c85f2c45..5dd0ba33 100644 --- a/storage/telegram_test.go +++ b/storage/telegram_test.go @@ -19,10 +19,14 @@ func TestCreatePlayback(t *testing.T) { t.Fatalf("Parse environment variables or flags failed, error: %v", err) } - s, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) + db, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) if err != nil { t.Fatalf("Unexpected open a bolt db: %v", err) } + s := NewStorage(nil, db) + if err != nil { + t.Fatalf("failed to new storage: %v", err) + } defer s.Close() pb := &entity.Playback{Source: ":wayback https://example.com"} @@ -39,10 +43,14 @@ func TestPlayback(t *testing.T) { t.Fatalf("Parse environment variables or flags failed, error: %v", err) } - s, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) + db, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) if err != nil { t.Fatalf("Unexpected open a bolt db: %v", err) } + s := NewStorage(nil, db) + if err != nil { + t.Fatalf("failed to new storage: %v", err) + } defer s.Close() dt := ":wayback https://example.com" @@ -71,10 +79,14 @@ func TestRemovePlayback(t *testing.T) { t.Fatalf("Parse environment variables or flags failed, error: %v", err) } - s, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) + db, err := Open(opts, path.Join(t.TempDir(), "wayback.db")) if err != nil { t.Fatalf("Unexpected open a bolt db: %v", err) } + s := NewStorage(nil, db) + if err != nil { + t.Fatalf("failed to new storage: %v", err) + } defer s.Close() if s.RemovePlayback(0) != nil { diff --git a/storage/wayback.go b/storage/wayback.go new file mode 100644 index 00000000..ae5af607 --- /dev/null +++ b/storage/wayback.go @@ -0,0 +1,59 @@ +// Copyright 2024 Wayback Archiver. All rights reserved. +// Use of this source code is governed by the GNU GPL v3 +// license that can be found in the LICENSE file. + +package storage // import "github.com/wabarc/wayback/storage" + +import ( + "context" + "database/sql" + "fmt" + + "github.com/wabarc/helper" + "github.com/wabarc/wayback" +) + +func (s *Storage) CreateWayback(ctx context.Context, cols []wayback.Collect) error { + if len(cols) == 0 { + return fmt.Errorf("store: cols missing") + } + + tx, err := s.ds.Begin() + if err != nil { + return fmt.Errorf("store: unable to begin transaction: %v", err) + } + + var id int64 + query := `INSERT INTO wayback (source) VALUES ($1) RETURNING id` + err = tx.QueryRowContext(ctx, query, cols[0].Src).Scan(&id) + if err != nil { + if err = tx.Rollback(); err != nil { + return fmt.Errorf("store: unable to rollback transaction: %v", err) + } + return fmt.Errorf("store: unable to create wayback: %v", err) + } + + for _, col := range cols { + if !helper.IsURL(col.Dst) { + continue + } + err = s.createArchives(ctx, tx, col, id) + if err != nil { + if err = tx.Rollback(); err != nil { + return fmt.Errorf("store: unable to rollback transaction: %v", err) + } + return fmt.Errorf("store: create archives failed: %v", err) + } + } + + return tx.Commit() +} + +func (s *Storage) createArchives(ctx context.Context, tx *sql.Tx, col wayback.Collect, wayback_id int64) error { + query := `INSERT INTO archives (wayback_id, slot, dest) VALUES ($1, $2, $3)` + _, err := tx.ExecContext(ctx, query, wayback_id, col.Arc, col.Dst) + if err != nil { + return fmt.Errorf("store: unable to create archive: %v", err) + } + return nil +} diff --git a/wayback.1 b/wayback.1 index 6c57f0e4..f0be4c1c 100644 --- a/wayback.1 +++ b/wayback.1 @@ -89,6 +89,11 @@ https://github.com/wabarc/ipfs-pinner\&. IPFS daemon port. default 5001\&. .RE .PP +.B \-\-migrate +.RS 4 +Run database migrations\&. +.RE +.PP .B \-t, \-\-token .RS 4 Telegram Bot API Token\&. @@ -192,6 +197,18 @@ Meilisearch admin API key.\&. .B WAYBACK_OMNIVORE_APIKEY Omnivore API key.\&. .TP +.B WAYBACK_DATABASE_URL +The URL of the Postgres database.\&. +.TP +.B WAYBACK_DATABASE_MAX_CONNS +Maximum connections of the Postgres database.\&. +.TP +.B WAYBACK_DATABASE_MIN_CONNS +Minimum connections of the Postgres database.\&. +.TP +.B WAYBACK_DATABASE_CONNECTION_LIFETIME +Connection lifetime of the Postgres database.\&. +.TP .B WAYBACK_BOLT_PATH File path of bolt database. default ./wayback.db\&. .TP diff --git a/wayback.conf b/wayback.conf index 3dc5f333..24aad414 100644 --- a/wayback.conf +++ b/wayback.conf @@ -6,6 +6,10 @@ WAYBACK_ENABLE_IA=true WAYBACK_ENABLE_IS=true WAYBACK_ENABLE_IP=false WAYBACK_ENABLE_PH=false +WAYBACK_DATABASE_URL=user=postgres password=postgres dbname=wayback sslmode=disable +WAYBACK_DATABASE_MAX_CONNS=20 +WAYBACK_DATABASE_MIN_CONNS=1 +WAYBACK_DATABASE_CONNECTION_LIFETIME=5 WAYBACK_IPFS_HOST=127.0.0.1 WAYBACK_IPFS_PORT=5001 WAYBACK_IPFS_MODE=pinner