forked from WasabiAiR/stow
-
Notifications
You must be signed in to change notification settings - Fork 10
/
stow.go
297 lines (264 loc) · 8.99 KB
/
stow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package stow
import (
"context"
"errors"
"io"
"net/url"
"sync"
"time"
)
// Package-level registry of location providers. Register is the only
// function in this file that writes to these variables.
var (
	// lock guards the registry state declared in this block. Register
	// holds it for writing while it updates all four collections; Kinds
	// and KindByURL hold it for reading.
	lock sync.RWMutex // protects kinds, locations, configurations and kindmatches

	// kinds holds a list of location kinds, in registration order
	// (Register appends to it).
	kinds = []string{}

	// locations is a map of installed location providers,
	// supplying a function that creates a new instance of
	// that Location. It is keyed by kind.
	locations = map[string]func(Config) (Location, error){}

	// configurations is a map of installed location providers,
	// supplying a function that validates the configuration.
	// It is keyed by kind.
	configurations = map[string]func(Config) error{}

	// kindmatches is a slice of functions that take turns
	// trying to match the kind of Location for a given
	// URL. Functions return an empty string if it does not
	// match.
	kindmatches []func(*url.URL) string
)
// Sentinel errors exported by this package.
var (
	// ErrNotFound is returned when something could not be found.
	ErrNotFound = errors.New("not found")

	// ErrBadCursor is returned by paging methods when the specified
	// cursor is invalid.
	ErrBadCursor = errors.New("bad cursor")
)
// Well-known argument values for the paging methods.
var (
	// CursorStart is a string representing a cursor pointing
	// to the first page of items or containers.
	// It is the empty string, which is also the value IsCursorEnd
	// treats as the end of paging.
	CursorStart = ""

	// NoPrefix is a string representing no prefix. It can be used
	// in any function that asks for a prefix value, but where one is
	// not appropriate.
	NoPrefix = ""
)
// HttpMethod defines an alias type for string to represent HTTP methods.
// These are defined in RFC 7231 section 4.3.
// Because it is a type alias rather than a distinct type, plain string
// values can be used wherever an HttpMethod is expected.
type HttpMethod = string
//go:generate enumer --type=ClientMethod --trimprefix=ClientMethod -json

// ClientMethod defines common client methods across storage providers.
// It is passed to Container.PreSignRequest to select the operation the
// pre-signed request is generated for.
type ClientMethod int

// The supported client methods. Values are assigned with iota, so
// ClientMethodGet is 0 and ClientMethodPut is 1.
const (
	// ClientMethodGet selects the "get" client method.
	// NOTE(review): presumably a read/download of an item — confirm
	// against the provider implementations.
	ClientMethodGet ClientMethod = iota
	// ClientMethodPut selects the "put" client method.
	// NOTE(review): presumably a write/upload of an item — confirm
	// against the provider implementations.
	ClientMethodPut
)
// FlyteContentMD5 is the string key "flyteContentMD5".
// NOTE(review): presumably the metadata key under which an item's content
// MD5 is stored when PresignRequestParams.AddContentMD5Metadata is set —
// it is not used in this file, so confirm against the provider packages.
const FlyteContentMD5 = "flyteContentMD5"
// IsCursorEnd reports whether cursor marks the end of paging, that is,
// whether there are no more items to fetch. The end-of-paging cursor is
// the empty string.
func IsCursorEnd(cursor string) bool {
	return len(cursor) == 0
}
// Location represents a storage location.
// It embeds io.Closer, so every Location also has a Close method.
type Location interface {
	io.Closer

	// CreateContainer creates a new Container with the
	// specified name.
	CreateContainer(name string) (Container, error)

	// Containers gets a page of containers
	// with the specified prefix from this Location.
	// The specified cursor is a pointer to the start of
	// the containers to get. It is obtained from a previous
	// call to this method, or should be CursorStart for the
	// first page.
	// count is the number of items to return per page.
	// The returned cursor can be checked with IsCursorEnd to
	// decide if there are any more items or not.
	Containers(prefix string, cursor string, count int) ([]Container, string, error)

	// Container gets the Container with the specified
	// identifier.
	Container(id string) (Container, error)

	// RemoveContainer removes the container with the specified ID.
	RemoveContainer(id string) error

	// ItemByURL gets an Item at this location with the
	// specified URL.
	ItemByURL(url *url.URL) (Item, error)
}
// PresignRequestParams carries the options passed to
// Container.PreSignRequest.
type PresignRequestParams struct {
	// ExpiresIn is a duration.
	// NOTE(review): presumably how long the pre-signed URL stays valid —
	// confirm against the provider implementations.
	ExpiresIn time.Duration
	// ContentMD5 is a string.
	// NOTE(review): presumably the MD5 digest of the content the request
	// will carry; its encoding (hex or base64) is not visible here — confirm.
	ContentMD5 string
	// ExtraParams holds additional parameters as arbitrary key/values.
	// NOTE(review): the accepted keys are not defined in this file and are
	// presumably provider specific — confirm.
	ExtraParams map[string]interface{}
	// HttpMethod is an HTTP method name (see the HttpMethod type).
	// NOTE(review): presumably the method the pre-signed request will be
	// sent with — confirm.
	HttpMethod HttpMethod
	// AddContentMD5Metadata is a flag.
	// NOTE(review): presumably asks the provider to record ContentMD5 as
	// item metadata (see FlyteContentMD5) — confirm.
	AddContentMD5Metadata bool
}
// PresignResponse is the result returned by Container.PreSignRequest.
type PresignResponse struct {
	// Url is the generated pre-signed URL, as a string.
	Url string
	// RequiredRequestHeaders maps header names to values.
	// NOTE(review): presumably headers the caller must send along with the
	// request to Url for the signature to be accepted — confirm against the
	// provider implementations.
	RequiredRequestHeaders map[string]string
}
// Container represents a container.
// Containers are obtained from a Location and hold Items.
type Container interface {
	// ID gets a unique string describing this Container.
	ID() string

	// Name gets a human-readable name describing this Container.
	Name() string

	// Item gets an item by its ID.
	Item(id string) (Item, error)

	// Items gets a page of items with the specified
	// prefix for this Container.
	// The specified cursor is a pointer to the start of
	// the items to get. It should be obtained from a previous
	// call to this method, or should be CursorStart for the
	// first page.
	// count is the number of items to return per page.
	// The returned cursor can be checked with IsCursorEnd to
	// decide if there are any more items or not.
	Items(prefix, cursor string, count int) ([]Item, string, error)

	// RemoveItem removes the Item with the specified ID.
	RemoveItem(id string) error

	// Put creates a new Item with the specified name, and contents
	// read from the reader.
	// NOTE(review): size is presumably the number of bytes that will be
	// read from r, and metadata the key/values to attach to the new
	// Item — confirm against the provider implementations.
	Put(name string, r io.Reader, size int64, metadata map[string]interface{}) (Item, error)

	// PreSignRequest generates a pre-signed url for the given id (key after bucket/container) and a given clientMethod.
	// Further options are supplied in params; the URL is returned in the
	// PresignResponse.
	PreSignRequest(ctx context.Context, clientMethod ClientMethod, id string, params PresignRequestParams) (response PresignResponse, err error)
}
// Item represents an item inside a Container.
// Such as a file.
type Item interface {
	// ID gets a unique string describing this Item.
	ID() string

	// Name gets a human-readable name describing this Item.
	Name() string

	// URL gets a URL for this item.
	// For example:
	// local: file:///path/to/something
	// azure: azure://host:port/api/something
	// s3: s3://host:port/etc
	URL() *url.URL

	// Size gets the size of the Item's contents in bytes.
	Size() (int64, error)

	// Open opens the Item for reading.
	// Calling code must close the io.ReadCloser.
	Open() (io.ReadCloser, error)

	// ETag is a string that is different when the Item is
	// different, and the same when the item is the same.
	// Usually this is the last modified datetime.
	ETag() (string, error)

	// LastMod returns the last modified date of the file.
	LastMod() (time.Time, error)

	// Metadata gets a map of key/values that belong
	// to this Item.
	Metadata() (map[string]interface{}, error)
}
// ItemRanger represents an item that can be partially downloaded.
// It is a separate interface from Item, so not every Item is required to
// provide it.
type ItemRanger interface {
	// OpenRange opens the item for reading starting at byte start and ending
	// at byte end.
	// NOTE(review): whether end is inclusive is not stated here — confirm
	// against the provider implementations. As with Item.Open, the returned
	// io.ReadCloser is presumably the caller's to close.
	OpenRange(start, end uint64) (io.ReadCloser, error)
}
// Taggable represents a taggable Item.
// It is a separate interface from Item, so not every Item is required to
// provide it.
type Taggable interface {
	// Tags returns the tags that belong to a given Item, as a map of
	// key/values.
	Tags() (map[string]interface{}, error)
}
// Config represents key/value configuration.
// It is what Dial and Validate hand to a registered provider.
// ConfigMap is the map-backed implementation in this package.
type Config interface {
	// Config gets a string configuration value and a
	// bool indicating whether the value was present or not.
	Config(name string) (string, bool)

	// Set sets the configuration name to specified value.
	Set(name, value string)
}
// Register adds a Location implementation together with its helper
// functions.
//
// kind is the name the implementation is registered under. makefn should
// make a Location with the given Config. kindmatchfn should inspect a URL
// and return whether it represents a Location of this kind or not.
// validatefn should check a Config for this kind and return an error if it
// is not usable; it may be nil when the kind has nothing to validate.
//
// Code can call KindByURL to get a kind string for any given URL and all
// registered implementations will be consulted.
// Register is usually called in an implementation package's init method.
// Registering a kind that is already registered is a no-op: the first
// registration wins.
func Register(kind string, makefn func(Config) (Location, error), kindmatchfn func(*url.URL) bool, validatefn func(Config) error) {
	lock.Lock()
	defer lock.Unlock()
	// if already registered, leave
	if _, ok := locations[kind]; ok {
		return
	}
	locations[kind] = makefn
	configurations[kind] = validatefn
	kinds = append(kinds, kind)
	// Adapt the boolean matcher to the string-returning form stored in
	// kindmatches, so KindByURL can return the kind directly.
	kindmatches = append(kindmatches, func(u *url.URL) string {
		if kindmatchfn(u) {
			return kind // match
		}
		return "" // empty string means no match
	})
}

// Dial gets a new Location with the given kind and
// configuration.
// It returns an unknown-kind error if kind has not been registered;
// otherwise it returns whatever the registered make function returns.
func Dial(kind string, config Config) (Location, error) {
	// Register mutates locations under the write lock, so the lookup must
	// hold the read lock. The lock is released before calling the provider
	// so that a slow provider, or one that itself calls Register, cannot
	// block or deadlock the registry.
	lock.RLock()
	fn, ok := locations[kind]
	lock.RUnlock()
	if !ok {
		return nil, errUnknownKind(kind)
	}
	return fn(config)
}

// Validate validates the config for a location.
// It returns an unknown-kind error if kind has not been registered, nil if
// the kind was registered without a validation function, and otherwise
// whatever the registered validation function returns.
func Validate(kind string, config Config) error {
	// Same locking discipline as Dial: look up under the read lock, call
	// out without it.
	lock.RLock()
	fn, ok := configurations[kind]
	lock.RUnlock()
	if !ok {
		return errUnknownKind(kind)
	}
	if fn == nil {
		// Registered with a nil validatefn: nothing to check.
		return nil
	}
	return fn(config)
}
// Kinds gets a list of installed location kinds, in registration order.
// The returned slice is a copy: callers may modify it freely without
// affecting the registry. It is never nil, even when no kinds are
// registered.
func Kinds() []string {
	lock.RLock()
	defer lock.RUnlock()
	// Copy rather than return the registry slice itself, which would let
	// callers overwrite registered kinds or alias its backing array.
	out := make([]string, len(kinds))
	copy(out, kinds)
	return out
}
// KindByURL gets the kind represented by the given URL.
// The registered matchers are consulted in registration order and the
// first one that recognises the URL decides the result.
// An unknown-kind error is returned if no matcher recognises it.
func KindByURL(u *url.URL) (string, error) {
	lock.RLock()
	defer lock.RUnlock()
	for _, match := range kindmatches {
		// A matcher signals "not mine" with the empty string.
		if kind := match(u); kind != "" {
			return kind, nil
		}
	}
	return "", errUnknownKind("")
}
// ConfigMap is a Config backed by a plain map[string]string.
type ConfigMap map[string]string

// Config looks name up in the map and returns its value together with a
// flag saying whether the entry exists. A missing entry yields ("", false).
func (c ConfigMap) Config(name string) (string, bool) {
	if v, found := c[name]; found {
		return v, true
	}
	return "", false
}

// Set stores value under name, replacing any previous entry.
// Like any write to a Go map, it panics if c is a nil map.
func (c ConfigMap) Set(name, value string) {
	c[name] = value
}
// errUnknownKind is the error reported when a kind is not registered.
// Its string value is the offending kind, or empty when no particular
// kind applies.
type errUnknownKind string

// Error renders the message, quoting the kind when one is known.
func (e errUnknownKind) Error() string {
	if e == "" {
		return "stow: unknown kind"
	}
	return "stow: unknown kind \"" + string(e) + "\""
}
// errNotSupported is the error reported when an implementation does not
// support a feature. Its string value names the feature.
type errNotSupported string

// Error renders the message as "not supported: <feature>".
func (e errNotSupported) Error() string {
	return "not supported: " + string(e)
}

// IsNotSupported gets whether the error is due to
// a feature not being supported by a specific implementation.
// The whole error chain is inspected, so an error created by NotSupported
// is still recognised after it has been wrapped (for example with
// fmt.Errorf and %w). A nil error is never a not-supported error.
func IsNotSupported(err error) bool {
	var target errNotSupported
	return errors.As(err, &target)
}

// NotSupported gets an error describing the feature
// as not supported by this implementation.
// The result is recognised by IsNotSupported.
func NotSupported(feature string) error {
	return errNotSupported(feature)
}