Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

market: Real-world fixes #319

Open
wants to merge 96 commits into
base: feat/market
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
878d472
market: Improve indexing logging
magik6k Nov 6, 2024
cadbad6
Add missing ipni peerid constraint
magik6k Nov 6, 2024
8b05e61
cuhttp: allow external tls termination
magik6k Nov 7, 2024
f3bf486
better errors in indexing
magik6k Nov 7, 2024
8ba33aa
don't error on nil errors
magik6k Nov 7, 2024
6090d04
indexing: pass correct errors
magik6k Nov 7, 2024
7b8412e
indexing: More aggresive retry
magik6k Nov 7, 2024
c1b087d
indexing: More conservative batch size
magik6k Nov 7, 2024
94b2881
ipni: log index rate
magik6k Nov 7, 2024
cc2a139
indexing: Schedule ipni first
magik6k Nov 7, 2024
4b5ce67
indexing: Bump cql params
magik6k Nov 7, 2024
e3f7a1a
indexing: ab test sorted inserts
magik6k Nov 7, 2024
a241542
nope
magik6k Nov 7, 2024
323c315
indexing: Less consistency
magik6k Nov 8, 2024
3f0d183
ipni: Fix prev parse in serve
magik6k Nov 8, 2024
3c2afd8
ipni: Set ad addresses when serving
magik6k Nov 8, 2024
883c238
ipni: Bump some log levels
magik6k Nov 8, 2024
267787d
ipni: debugging chunk reader
magik6k Nov 8, 2024
bea66fa
Export dagstore metrics
magik6k Nov 8, 2024
cbfcb6f
remotebstore: Fix metrics
magik6k Nov 8, 2024
6382414
export metrics properly this time
magik6k Nov 8, 2024
40472f2
tune min read size
magik6k Nov 8, 2024
274e663
indexing: Try a completely different schema
magik6k Nov 10, 2024
8079187
use bufio for faster indexing readers
magik6k Nov 10, 2024
c2184be
debug get piecehashrange
magik6k Nov 10, 2024
3f8601b
more ipni debug
magik6k Nov 10, 2024
c7fca6b
ipni from-car also needs next node
magik6k Nov 10, 2024
acd6851
check numblocks in indexstore
magik6k Nov 10, 2024
b05f3bc
flag for ipni chunk debug
magik6k Nov 10, 2024
96a901c
indexing: Fix offset checks
magik6k Nov 11, 2024
1480b09
fastparamfetch: Improve aria opts
magik6k Nov 11, 2024
74dd728
http is bad
magik6k Nov 12, 2024
4c4264e
ipni: Announce correct address
magik6k Nov 12, 2024
129a9a1
make gen
magik6k Nov 12, 2024
de7e5b7
ipni/dealfilter fixes
magik6k Nov 14, 2024
e88c15b
more dealfilter fixes
magik6k Nov 14, 2024
586ef42
fix http header marshal
magik6k Nov 16, 2024
d473622
fix max concurrent deal size backpressure
magik6k Nov 16, 2024
db679ff
fix null error on deal status check
magik6k Nov 17, 2024
6e36bb6
fix dealstatus/sector insert
magik6k Nov 17, 2024
71909c7
fix panic in sector alloc
magik6k Nov 17, 2024
c7e9385
make gen
magik6k Nov 17, 2024
0f99dbd
fix seal start duration
magik6k Nov 17, 2024
4628a7c
deal ui fixes
magik6k Nov 17, 2024
35f7c3e
improve piece state webui
magik6k Nov 17, 2024
38be43b
fix how indexing tasks are created
magik6k Nov 17, 2024
8c8d216
webui: move task page under pages
magik6k Nov 17, 2024
e6e0025
webui: task run page
magik6k Nov 17, 2024
60717bf
webui: make sector page load fast
magik6k Nov 17, 2024
baca744
webui: Better snap page
magik6k Nov 17, 2024
692d4f5
webui: Add snap data to the sector page
magik6k Nov 17, 2024
7c8e063
webui: make piece page readable
magik6k Nov 17, 2024
ed2fb9b
webui: show multiple deals on the piece page
magik6k Nov 17, 2024
ab418de
webui: Show related pipelines in task page
magik6k Nov 18, 2024
407f1c7
webui: nicer deal list
magik6k Nov 18, 2024
ed078a4
market: Fix sector to deal assignment
magik6k Nov 18, 2024
4ba98d1
webui: Fix deal page pefore publish
magik6k Nov 18, 2024
9af311d
webui: Improve deal status display
magik6k Nov 18, 2024
04c74a9
market: Fix sector offset finding
magik6k Nov 19, 2024
3a22bde
webui: show piecepark on the piece page
magik6k Nov 19, 2024
77cd326
urlpiecereader: Improve errors
magik6k Nov 19, 2024
e41c50e
webui: Advanced task display on piece page
magik6k Nov 19, 2024
7f0a28e
webui: Small deal page improvements
magik6k Nov 19, 2024
d093fee
webui: Fix deal page on sealed deals
magik6k Nov 19, 2024
865aaa7
market: Correctly mark deals as sealed
magik6k Nov 19, 2024
ebbd3ec
indexing: Fix non-imported deal indexing
magik6k Nov 19, 2024
cf124df
ipni: Limit max from-car entry size
magik6k Nov 19, 2024
e989f11
ipni: Small entry cache to support retry after read timeout
magik6k Nov 19, 2024
76eaac5
ipni: Bump write timeout
magik6k Nov 19, 2024
1afbcde
ipni: Speculatively precompute next entry
magik6k Nov 19, 2024
0624532
ipni: More informative logs
magik6k Nov 20, 2024
7796976
market: Set deal sector regprooftype in ingest
magik6k Nov 20, 2024
e30ecf8
make gen
magik6k Nov 20, 2024
be604f4
fix snap deal ingest
magik6k Nov 20, 2024
8e5659e
webui: Show failed snap info on the sector page
magik6k Nov 20, 2024
a2f06ba
park piece: better retry backoff
magik6k Nov 20, 2024
918f143
webui: Improve the Storage Deals page
magik6k Nov 21, 2024
f4b3201
Reindexing
magik6k Nov 21, 2024
949632d
fix ipni retrievals
magik6k Nov 22, 2024
2dd6ae5
cleanup indexing tasks more correctly
magik6k Nov 22, 2024
55f66cb
ipni debug
magik6k Nov 22, 2024
65492b3
ipni: Detailed ad output, entry scan
magik6k Nov 22, 2024
93f6417
webui: better scanned entry select
magik6k Nov 22, 2024
68561f1
More ipni debugging
magik6k Nov 24, 2024
d108b48
libp2p: Handle boost transports proto
magik6k Nov 24, 2024
171b945
make gen
magik6k Nov 24, 2024
2d9f2ca
http: Add missing info endpoint
magik6k Nov 24, 2024
d05a830
ipni: Make db reads really work this time
magik6k Nov 24, 2024
77dc948
storage: Show reservations in expvar
magik6k Nov 25, 2024
5b97a53
storage debug; proposal cid in deals
magik6k Nov 25, 2024
fa7689d
webui: Show proposal CID
magik6k Nov 25, 2024
60c8054
webui: Nicer time format on tasks pages
magik6k Nov 26, 2024
ac84664
ipni: wip always index from db
magik6k Nov 26, 2024
d8cc0d5
ipni: Auto-repair missing Ads
magik6k Nov 26, 2024
34cc69f
webui: Fix IPNI status
magik6k Nov 26, 2024
5ce7137
webui: typo in ipni
magik6k Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions cmd/curio/debug-ipni.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"bufio"
"errors"
"fmt"
"io"
"os"

"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/market/ipni/chunker"
)

var testDebugIpniChunks = &cli.Command{
Name: "ipni-piece-chunks",
Usage: "generate ipni chunks from a file",
Action: func(c *cli.Context) error {
ck := chunker.NewInitialChunker()

f, err := os.Open(c.Args().First())
if err != nil {
return xerrors.Errorf("opening file: %w", err)
}
defer f.Close()

opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
blockReader, err := carv2.NewBlockReader(bufio.NewReaderSize(f, 4<<20), opts...)
if err != nil {
return fmt.Errorf("getting block reader over piece: %w", err)
}

blockMetadata, err := blockReader.SkipNext()
for err == nil {
if err := ck.Accept(blockMetadata.Cid.Hash(), int64(blockMetadata.Offset), blockMetadata.Size+40); err != nil {
return xerrors.Errorf("accepting block: %w", err)
}

blockMetadata, err = blockReader.SkipNext()
}
if !errors.Is(err, io.EOF) {
return xerrors.Errorf("reading block: %w", err)
}

_, err = ck.Finish(c.Context, nil, cid.Undef)
if err != nil {
return xerrors.Errorf("chunking CAR multihash iterator: %w", err)
}

return nil
},
}
2 changes: 1 addition & 1 deletion cmd/curio/seal.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ var sealStartCmd = &cli.Command{
}

for _, n := range num {
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs) values ($1, $2, $3, $4)", mid, n, spt, userDuration)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
cfg.Subsystems.EnableBatchSeal ||
cfg.Subsystems.EnableUpdateEncode ||
cfg.Subsystems.EnableUpdateProve ||
cfg.Subsystems.EnableUpdateSubmit
cfg.Subsystems.EnableUpdateSubmit ||
cfg.Subsystems.EnableCommP

if hasAnySealingTask {
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine)
Expand Down Expand Up @@ -237,7 +238,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
// PSD and Deal find task do not require many resources. They can run on all machines
psdTask := storage_market.NewPSDTask(dm, db, sender, as, &cfg.Market.StorageMarketConfig.MK12, full)
dealFindTask := storage_market.NewFindDealTask(dm, db, full, &cfg.Market.StorageMarketConfig.MK12)
activeTasks = append(activeTasks, psdTask, dealFindTask)

checkIndexesTask := indexing.NewCheckIndexesTask(db, iStore)

activeTasks = append(activeTasks, psdTask, dealFindTask, checkIndexesTask)

// Start libp2p hosts and handle streams
err = libp2p.NewDealProvider(ctx, db, cfg, dm.MK12Handler, full, sender, miners, machine)
Expand All @@ -250,7 +254,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
activeTasks = append(activeTasks, indexingTask, ipniTask)
activeTasks = append(activeTasks, ipniTask, indexingTask)

if cfg.HTTP.Enable {
err = cuhttp.StartHTTPServer(ctx, dependencies)
Expand All @@ -277,6 +281,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}
go machineDetails(dependencies, activeTasks, ht.ResourcesAvailable().MachineID, dependencies.Name)

*dependencies.MachineID = int64(ht.ResourcesAvailable().MachineID)

if hasAnySealingTask {
watcher, err := message.NewMessageWatcher(db, ht, chainSched, full)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/curio/test-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var testCmd = &cli.Command{
Subcommands: []*cli.Command{
//provingInfoCmd,
wdPostCmd,
testDebugCmd,
},
Before: func(cctx *cli.Context) error {
return nil
Expand Down
74 changes: 74 additions & 0 deletions cmd/curio/test-debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"encoding/json"
"fmt"
"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/lib/reqcontext"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)

var testDebugCmd = &cli.Command{
Name: "debug",
Usage: "Collection of debugging utilities",
Subcommands: []*cli.Command{
testDebugIpniChunks,
testDebugMigPcid,
},
}

var testDebugMigPcid = &cli.Command{
Name: "mig-pcid",
Action: func(cctx *cli.Context) error {
ctx := reqcontext.ReqContext(cctx)
dep, err := deps.GetDepsCLI(ctx, cctx)
if err != nil {
return err
}

_, err = dep.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var bdeals []struct {
Prop json.RawMessage `db:"proposal"`
UUID string `db:"uuid"`
}

err = tx.Select(&bdeals, `SELECT
uuid,
proposal
FROM
market_mk12_deals`)
if err != nil {
return false, xerrors.Errorf("getting deals from db: %w", err)
}

for _, d := range bdeals {
var prop market.DealProposal
err = json.Unmarshal(d.Prop, &prop)
if err != nil {
return false, xerrors.Errorf("unmarshal proposal: %w", err)
}

pcid, err := prop.Cid()
if err != nil {
return false, xerrors.Errorf("get cid: %w", err)
}

fmt.Println(d.UUID, pcid)

n, err := tx.Exec(`UPDATE market_mk12_deals SET proposal_cid = $1 WHERE uuid = $2`, pcid, d.UUID)
if err != nil {
return false, xerrors.Errorf("update deals: %w", err)
}
if n == 0 {
return false, xerrors.Errorf("update deals: deal not found")
}
}
return true, nil
})

return err
},
}
38 changes: 23 additions & 15 deletions cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/CAFxX/httpcompression"
Expand Down Expand Up @@ -73,7 +74,7 @@ func compressionMiddleware(config *config.CompressionConfig) (func(http.Handler)
func libp2pConnMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request path is "/"
if r.URL.Path == "/" {
if r.URL.Path == "/" || r.URL.Path == "" {
// Check if the request is a WebSocket upgrade request
if isWebSocketUpgrade(r) {
// Rewrite the path to "/libp2p"
Expand All @@ -92,26 +93,18 @@ func isWebSocketUpgrade(r *http.Request) bool {
if r.Method != http.MethodGet {
return false
}
if r.Header.Get("Upgrade") != "websocket" {
if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
return false
}
if r.Header.Get("Connection") != "Upgrade" {
if strings.ToLower(r.Header.Get("Connection")) != "upgrade" {
return false
}
return true
}

func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
ch := cache{db: d.DB}
cfg := d.Cfg.HTTP

// Set up the autocert manager for Let's Encrypt
certManager := autocert.Manager{
Cache: ch,
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
HostPolicy: autocert.HostWhitelist(cfg.DomainName),
}

// Setup the Chi router for more complex routing (if needed in the future)
chiRouter := chi.NewRouter()

Expand Down Expand Up @@ -160,20 +153,35 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
Addr: cfg.ListenAddress,
Handler: libp2pConnMiddleware(loggingMiddleware(compressionMw(chiRouter))), // Attach middlewares
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
WriteTimeout: time.Hour * 2,
IdleTimeout: cfg.IdleTimeout,
ReadHeaderTimeout: cfg.ReadHeaderTimeout,
TLSConfig: &tls.Config{
}

if !cfg.DelegateTLS {
// Set up the autocert manager for Let's Encrypt
certManager := autocert.Manager{
Cache: cache{db: d.DB},
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
HostPolicy: autocert.HostWhitelist(cfg.DomainName),
}

server.TLSConfig = &tls.Config{
GetCertificate: certManager.GetCertificate,
},
}
}

// We don't need to run an HTTP server. Any HTTP request should simply be handled as HTTPS.

// Start the server with TLS
go func() {
log.Infof("Starting HTTPS server for https://%s on %s", cfg.DomainName, cfg.ListenAddress)
serr := server.ListenAndServeTLS("", "")
var serr error
if !cfg.DelegateTLS {
serr = server.ListenAndServeTLS("", "")
} else {
serr = server.ListenAndServe()
}
if serr != nil {
log.Errorf("Failed to start HTTPS server: %s", serr)
panic(serr)
Expand Down
23 changes: 8 additions & 15 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 8 additions & 14 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func DefaultCurioConfig() *CurioConfig {
StorageMarketConfig: StorageMarketConfig{
PieceLocator: []PieceLocatorConfig{},
Indexing: IndexingConfig{
InsertConcurrency: 8,
InsertBatchSize: 15000,
InsertConcurrency: 10,
InsertBatchSize: 1000,
},
MK12: MK12Config{
PublishMsgPeriod: Duration(5 * time.Minute),
Expand All @@ -89,15 +89,13 @@ func DefaultCurioConfig() *CurioConfig {
IPNI: IPNIConfig{
ServiceURL: []string{"https://cid.contact"},
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
AnnounceAddresses: []string{},
},
},
},
HTTP: HTTPConfig{
DomainName: "",
ListenAddress: "0.0.0.0:12310",
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 10,
IdleTimeout: time.Minute * 2,
ReadHeaderTimeout: time.Second * 5,
EnableCORS: true,
Expand Down Expand Up @@ -588,10 +586,10 @@ type MK12Config struct {
// DisabledMiners is a list of miner addresses that should be excluded from online deal making protocols
DisabledMiners []string

// MaxConcurrentDealSize is a sum of all size of all deals which are waiting to be added to a sector
// MaxConcurrentDealSizeGiB is a sum of all size of all deals which are waiting to be added to a sector
// When the cumulative size of all deals in process reaches this number, new deals will be rejected.
// (Default: 0 = unlimited)
MaxConcurrentDealSize int64
MaxConcurrentDealSizeGiB int64

// DenyUnknownClients determines the default behaviour for the deal of clients which are not in allow/deny list
// If True then all deals coming from unknown clients will be rejected.
Expand Down Expand Up @@ -623,11 +621,6 @@ type IPNIConfig struct {
// The list of URLs of indexing nodes to announce to. This is a list of hosts we talk to tell them about new
// heads.
DirectAnnounceURLs []string

// AnnounceAddresses is a list of addresses indexer clients can use to reach to the HTTP market node.
// Curio allows running more than one node for HTTP server and thus all addressed can be announced
// simultaneously to the client. Example: ["https://mycurio.com", "http://myNewCurio:433/XYZ", "http://1.2.3.4:433"]
AnnounceAddresses []string
}

// HTTPConfig represents the configuration for an HTTP server.
Expand All @@ -642,12 +635,13 @@ type HTTPConfig struct {
// ListenAddress is the address that the server listens for HTTP requests.
ListenAddress string

// DelegateTLS allows the server to delegate TLS to a reverse proxy. When enabled the listen address will serve
// HTTP and the reverse proxy will handle TLS termination.
DelegateTLS bool

// ReadTimeout is the maximum duration for reading the entire or next request, including body, from the client.
ReadTimeout time.Duration

// WriteTimeout is the maximum duration before timing out writes of the response to the client.
WriteTimeout time.Duration

// IdleTimeout is the maximum duration of an idle session. If set, idle connections are closed after this duration.
IdleTimeout time.Duration

Expand Down
Loading