Skip to content

Commit

Permalink
add a catch-all unknown metadata codec (#273)
Browse files Browse the repository at this point in the history
* add a catch-all unknown metadata codec + refactor metadata API
  • Loading branch information
willscott authored Oct 18, 2022
1 parent 0014174 commit 719c0df
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 66 deletions.
4 changes: 2 additions & 2 deletions cmd/provider/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Before: beforeImportCar,
Action: doImportCar,
}
md metadata.Metadata
md = metadata.Default.New()
)

func beforeImportCar(cctx *cli.Context) error {
Expand Down Expand Up @@ -68,7 +68,7 @@ func beforeImportCar(cctx *cli.Context) error {
if err != nil {
return err
}
md = metadata.New(tp)
md = metadata.Default.New(tp)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/provider/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func indexCommand(cctx *cli.Context) error {
if err != nil {
return errors.New("metadata is not a valid base64 encoded string")
}
md = metadata.New()
md = metadata.Default.New()
err = md.UnmarshalBinary(decoded)
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions e2e_retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func testRetrievalRoundTripWithTestCase(t *testing.T, tc testCase) {
contextID := []byte("applesauce")
tp, err := cardatatransfer.TransportFromContextID(contextID)
require.NoError(t, err)
md := metadata.New(tp)
md := metadata.Default.New(tp)
advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md)
require.NoError(t, err)

Expand Down Expand Up @@ -183,7 +183,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
contextID := []byte("applesauce")
tp, err := cardatatransfer.TransportFromContextID(contextID)
require.NoError(t, err)
md := metadata.New(tp)
md := metadata.Default.New(tp)
advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md)
require.NoError(t, err)

Expand All @@ -203,7 +203,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
adv, err := schema.UnwrapAdvertisement(advNode)
require.NoError(t, err)

var receivedMd metadata.Metadata
receivedMd := metadata.Default.New()
err = receivedMd.UnmarshalBinary(adv.Metadata)
require.NoError(t, err)

Expand All @@ -216,7 +216,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
contextID2 := []byte("applesauce2")
tp2, err := cardatatransfer.TransportFromContextID(contextID2)
require.NoError(t, err)
md2 := metadata.New(tp2)
md2 := metadata.Default.New(tp2)
advCid2, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md2)
require.NoError(t, err)

Expand All @@ -235,7 +235,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
adv2, err := schema.UnwrapAdvertisement(advNode2)
require.NoError(t, err)

var receivedMd2 metadata.Metadata
receivedMd2 := metadata.Default.New()
err = receivedMd2.UnmarshalBinary(adv2.Metadata)
require.NoError(t, err)

Expand Down
8 changes: 4 additions & 4 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult

// The advertisement still requires a valid metadata even though
// metadata is not used for removal. Create a valid empty metadata.
md = metadata.New(metadata.Bitswap{})
md = metadata.Default.New()
}

mdBytes, err := md.MarshalBinary()
Expand Down Expand Up @@ -680,13 +680,13 @@ func (e *Engine) putKeyMetadataMap(ctx context.Context, provider peer.ID, contex
}

func (e *Engine) getKeyMetadataMap(ctx context.Context, provider peer.ID, contextID []byte) (metadata.Metadata, error) {
md := metadata.Default.New()
data, err := e.ds.Get(ctx, e.keyToMetadataKey(provider, contextID))
if err != nil {
return metadata.Metadata{}, err
return md, err
}
var md metadata.Metadata
if err := md.UnmarshalBinary(data); err != nil {
return metadata.Metadata{}, err
return md, err
}
return md, nil
}
Expand Down
36 changes: 18 additions & 18 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_NewEngineWithNoPublisherAndRoot(t *testing.T) {
subject.RegisterMultihashLister(func(_ context.Context, _ peer.ID, _ []byte) (provider.MultihashIterator, error) {
return provider.SliceMultihashIterator(mhs), nil
})
adCid, err := subject.NotifyPut(ctx, nil, contextID, metadata.New(metadata.Bitswap{}))
adCid, err := subject.NotifyPut(ctx, nil, contextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotNil(t, adCid)
require.NotEqual(t, adCid, cid.Undef)
Expand All @@ -86,7 +86,7 @@ func TestEngine_PublishLocal(t *testing.T) {
chunkLnk, err := subject.Chunker().Chunk(ctx, provider.SliceMultihashIterator(mhs))
require.NoError(t, err)

md := metadata.New(metadata.Bitswap{})
md := metadata.Default.New(metadata.Bitswap{})
mdBytes, err := md.MarshalBinary()
require.NoError(t, err)
wantAd := schema.Advertisement{
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {

chunkLnk, err := subject.Chunker().Chunk(ctx, provider.SliceMultihashIterator(mhs))
require.NoError(t, err)
md := metadata.New(metadata.Bitswap{})
md := metadata.Default.New(metadata.Bitswap{})
mdBytes, err := md.MarshalBinary()
require.NoError(t, err)

Expand Down Expand Up @@ -308,7 +308,7 @@ func TestEngine_NotifyPutWithoutListerIsError(t *testing.T) {
require.NoError(t, err)
defer subject.Shutdown()

gotCid, err := subject.NotifyPut(ctx, nil, []byte("fish"), metadata.New(metadata.Bitswap{}))
gotCid, err := subject.NotifyPut(ctx, nil, []byte("fish"), metadata.Default.New(metadata.Bitswap{}))
require.Error(t, err, provider.ErrNoMultihashLister)
require.Equal(t, cid.Undef, gotCid)
}
Expand All @@ -333,7 +333,7 @@ func TestEngine_NotifyPutThenNotifyRemove(t *testing.T) {
return nil, errors.New("not found")
})

gotPutAdCid, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.New(metadata.Bitswap{}))
gotPutAdCid, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid)

Expand Down Expand Up @@ -371,7 +371,7 @@ func TestEngine_NotifyRemoveWithDefaultProvider(t *testing.T) {
return nil, errors.New("not found")
})

_, err = subject.NotifyPut(ctx, nil, wantContextID, metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, nil, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
gotRemoveAdCid, err := subject.NotifyRemove(ctx, "", wantContextID)
require.NoError(t, err)
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestEngine_NotifyRemoveWithCustomProvider(t *testing.T) {
providerId := testutil.NewID(t)
providerAddrs, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/1234/http")

_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: providerId, Addrs: []multiaddr.Multiaddr{providerAddrs}}, wantContextID, metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: providerId, Addrs: []multiaddr.Multiaddr{providerAddrs}}, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
gotRemoveAdCid, err := subject.NotifyRemove(ctx, providerId, wantContextID)
require.NoError(t, err)
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
return nil, errors.New("not found")
})

gotPutAdCid1, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider1id, Addrs: []multiaddr.Multiaddr{provider1Addrs}}, wantContextID1, metadata.New(metadata.Bitswap{}))
gotPutAdCid1, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider1id, Addrs: []multiaddr.Multiaddr{provider1Addrs}}, wantContextID1, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid1)

Expand All @@ -457,7 +457,7 @@ func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
require.Equal(t, ad.Provider, provider1id.String())
require.Equal(t, ad.Addresses, []string{"/ip4/0.0.0.0/tcp/1234/http"})

gotPutAdCid2, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider2id, Addrs: []multiaddr.Multiaddr{provider2Addrs}}, wantContextID2, metadata.New(metadata.Bitswap{}))
gotPutAdCid2, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider2id, Addrs: []multiaddr.Multiaddr{provider2Addrs}}, wantContextID2, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid2)

Expand Down Expand Up @@ -490,7 +490,7 @@ func TestEngine_NotifyPutUseDefaultProviderAndAddressesWhenNoneGiven(t *testing.
})

// addresses should be ignored as provider is an empty string
gotPutAdCid1, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.New(metadata.Bitswap{}))
gotPutAdCid1, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid1)

Expand Down Expand Up @@ -521,15 +521,15 @@ func TestEngine_VerifyErrAlreadyAdvertised(t *testing.T) {
return nil, errors.New("not found")
})

gotPutAdCid1, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.New(metadata.Bitswap{}))
gotPutAdCid1, err := subject.NotifyPut(ctx, nil, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid1)

_, err = subject.NotifyPut(ctx, nil, wantContextID, metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, nil, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.Error(t, err, provider.ErrAlreadyAdvertised)

p := testutil.NewID(t)
_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: p}, wantContextID, metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: p}, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err, provider.ErrAlreadyAdvertised)
}

Expand Down Expand Up @@ -558,12 +558,12 @@ func TestEngine_ShouldHaveSameChunksInChunkerForSameCIDs(t *testing.T) {
return nil, errors.New("not found")
})

gotPutAdCid1, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider1id, Addrs: []multiaddr.Multiaddr{provider1Addrs}}, wantContextID, metadata.New(metadata.Bitswap{}))
gotPutAdCid1, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider1id, Addrs: []multiaddr.Multiaddr{provider1Addrs}}, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid1)
require.Equal(t, 1, subject.Chunker().Len())

gotPutAdCid2, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider2id, Addrs: []multiaddr.Multiaddr{provider2Addrs}}, wantContextID, metadata.New(metadata.Bitswap{}))
gotPutAdCid2, err := subject.NotifyPut(ctx, &peer.AddrInfo{ID: provider2id, Addrs: []multiaddr.Multiaddr{provider2Addrs}}, wantContextID, metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
require.NotEqual(t, cid.Undef, gotPutAdCid2)
require.Equal(t, 1, subject.Chunker().Len())
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestEngine_DatastoreBackwardsCompatibilityTest(t *testing.T) {
verifyAd(t, ctx, subject, createAd(t, []byte("byte"), "QmPxKFBM2A7VZURXZhZLCpEnhMFtZ7WSZwFLneFEiYneES", []string{"/ip6/::1/tcp/62698", "/ip4/192.168.1.161/tcp/62695", "/ip4/127.0.0.1/tcp/62695"}, "baguqeera7k7x5kayh2yzp44wq6cd3i2z24o5rxyyedkwtgmwkaq63npcig4q", false, ""), ad)

// try to create a deplicate to make sure that they are processed correctly against the previously created datastore
_, err = subject.NotifyPut(ctx, nil, []byte("tree"), metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, nil, []byte("tree"), metadata.Default.New(metadata.Bitswap{}))
require.Equal(t, provider.ErrAlreadyAdvertised, err)

mmap := make(map[string][]multihash.Multihash)
Expand All @@ -644,7 +644,7 @@ func TestEngine_DatastoreBackwardsCompatibilityTest(t *testing.T) {
})

// publishing new add for the default provider
adId, err := subject.NotifyPut(ctx, nil, []byte("pong"), metadata.New(metadata.Bitswap{}))
adId, err := subject.NotifyPut(ctx, nil, []byte("pong"), metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
ad, _ = subject.GetAdv(ctx, adId)
require.Equal(t, existingRoot, ad.PreviousID.(cidlink.Link).Cid)
Expand All @@ -655,7 +655,7 @@ func TestEngine_DatastoreBackwardsCompatibilityTest(t *testing.T) {

// try publishing for new provider
newPID := testutil.NewID(t)
_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: newPID}, []byte("has"), metadata.New(metadata.Bitswap{}))
_, err = subject.NotifyPut(ctx, &peer.AddrInfo{ID: newPID}, []byte("has"), metadata.Default.New(metadata.Bitswap{}))
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Example_advertiseHelloWorld() {
fmt.Println("✓ Provider engine started.")

// Multiple transports can be included in metadata.
md := metadata.New(metadata.Bitswap{})
md := metadata.Default.New(metadata.Bitswap{})

// Note that this example publishes an ad with bitswap metadata as an example.
// But it does not instantiate a bitswap server to serve retrievals.
Expand Down
2 changes: 1 addition & 1 deletion engine/linksystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
)

var testMetadata = metadata.New(metadata.Bitswap{})
var testMetadata = metadata.Default.New(metadata.Bitswap{})

func Test_SchemaNoEntriesErr(t *testing.T) {
ctx := contextWithTimeout(t)
Expand Down
53 changes: 43 additions & 10 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
// Metadata is data that provides information about how to retrieve the advertised content.
// Note that the content may be avaiable for retrieval over multiple protocols.
Metadata struct {
mc *metadataContext
protocols []Protocol
}

Expand All @@ -44,9 +45,44 @@ type (
}
)

// metadataContext holds context for metadata serialization and deserialization.
type metadataContext struct {
protocols map[multicodec.Code]func() Protocol
}

// MetadataContext holds context for metadata serialization and deserialization.
type MetadataContext interface {
WithProtocol(id multicodec.Code, factory func() Protocol) MetadataContext
New(t ...Protocol) Metadata
}

var Default MetadataContext

func init() {
d := metadataContext{
protocols: make(map[multicodec.Code]func() Protocol),
}
d.protocols[multicodec.TransportBitswap] = func() Protocol { return &Bitswap{} }
d.protocols[multicodec.TransportGraphsyncFilecoinv1] = func() Protocol { return &GraphsyncFilecoinV1{} }
Default = &d
}

// WithProtocol dervies a new MetadataContext including the additional protocol mapping.
func (mc *metadataContext) WithProtocol(id multicodec.Code, factory func() Protocol) MetadataContext {
derived := metadataContext{
protocols: make(map[multicodec.Code]func() Protocol),
}
for k, v := range mc.protocols {
derived.protocols[k] = v
}
derived.protocols[id] = factory
return &derived
}

// New instantiates a new Metadata with the given transports.
func New(t ...Protocol) Metadata {
func (mc *metadataContext) New(t ...Protocol) Metadata {
metadata := Metadata{
mc: mc,
protocols: t,
}
sort.Sort(&metadata)
Expand Down Expand Up @@ -128,7 +164,7 @@ func (m *Metadata) UnmarshalBinary(data []byte) error {
return err
}
id := multicodec.Code(v)
t, err := newTransport(id)
t, err := m.mc.newTransport(id)
if err != nil {
return err
}
Expand Down Expand Up @@ -175,13 +211,10 @@ func protocolEqual(one, other Protocol) bool {
return bytes.Equal(oneBytes, otherBytes)
}

func newTransport(id multicodec.Code) (Protocol, error) {
switch id {
case multicodec.TransportBitswap:
return &Bitswap{}, nil
case multicodec.TransportGraphsyncFilecoinv1:
return &GraphsyncFilecoinV1{}, nil
default:
return nil, fmt.Errorf("unknwon transport id: %s", id.String())
func (mc *metadataContext) newTransport(id multicodec.Code) (Protocol, error) {
if factory, ok := mc.protocols[id]; ok {
return factory(), nil
}

return nil, fmt.Errorf("unknown transport id: %s", id.String())
}
Loading

0 comments on commit 719c0df

Please sign in to comment.