Merge pull request #106 from Altinity/backport/21.3_rework_22988_23976_24311_24885_26249_27176_27484
Enmk committed Oct 12, 2021
1 parent 134c281 commit c16bad8
Showing 17 changed files with 6,559 additions and 54 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -70,6 +70,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
52 changes: 38 additions & 14 deletions src/Disks/S3/DiskS3.cpp
@@ -238,9 +238,13 @@ DiskS3::Metadata DiskS3::createMeta(const String & path) const
class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t buf_size_)
: client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_)
explicit ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t max_single_read_retries_, size_t buf_size_)
: client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
, max_single_read_retries(max_single_read_retries_)
, buf_size(buf_size_)
{
}

@@ -296,7 +300,7 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
const auto & [path, size] = metadata.s3_objects[i];
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, max_single_read_retries, buf_size);
buf->seek(offset, SEEK_SET);
return buf;
}
@@ -312,11 +316,11 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
current_buf = initialize();

/// If current buffer has remaining data - use it.
if (current_buf && current_buf->next())
if (current_buf)
{
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
bool result = nextAndShiftPosition();
if (result)
return true;
}

/// If there is no available buffers - nothing to read.
@@ -325,17 +329,32 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase

++current_buf_idx;
const auto & path = metadata.s3_objects[current_buf_idx].first;
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();

return true;
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, max_single_read_retries, buf_size);

return nextAndShiftPosition();
}

bool nextAndShiftPosition()
{
/// Transfer current position and working_buffer to actual ReadBuffer
swap(*current_buf);
/// Position and working_buffer will be updated in next() call
auto result = current_buf->next();
/// and assigned to current buffer.
swap(*current_buf);

/// absolute position is shifted by a data size that was read in next() call above.
if (result)
absolute_position += working_buffer.size();

return result;
}

std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String & bucket;
DiskS3::Metadata metadata;
UInt64 max_single_read_retries;
size_t buf_size;

size_t absolute_position = 0;
@@ -549,6 +568,7 @@ DiskS3::DiskS3(
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
@@ -562,6 +582,7 @@ DiskS3::DiskS3(
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
@@ -659,7 +680,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());

auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
}

@@ -916,6 +937,9 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev

void DiskS3::startup()
{
/// Need to be enabled if it was disabled during shutdown() call.
client->EnableRequestProcessing();

if (!send_metadata)
return;

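(Not part of the diff.) The nextAndShiftPosition() helper above relies on the swap idiom of ClickHouse read buffers: the proxy buffer hands its current working buffer and position to the wrapped buffer, lets it refill via next(), then takes the refreshed state back and advances absolute_position. Below is a self-contained sketch of that pattern with made-up names (State, Inner, Proxy); the real ReadBuffer classes carry considerably more machinery.

// Not part of the commit: a toy model of the swap-based delegation used by
// nextAndShiftPosition(). All names here are illustrative.
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct State                     // stand-in for a buffer's working_buffer + position
{
    std::string chunk;
    size_t consumed = 0;
};

struct Inner                     // stand-in for the wrapped per-object read buffer
{
    State state;
    std::vector<std::string> parts{"hello ", "world"};
    size_t idx = 0;

    bool next()                  // refills its own state with the next part
    {
        if (idx >= parts.size())
            return false;
        state = {parts[idx++], 0};
        return true;
    }
};

struct Proxy                     // stand-in for the indirect (proxy) read buffer
{
    State state;
    size_t absolute_position = 0;

    bool nextAndShiftPosition(Inner & inner)
    {
        std::swap(state, inner.state);   // hand current state to the inner buffer
        bool ok = inner.next();          // inner refills that state in place
        std::swap(state, inner.state);   // take the refreshed state back
        if (ok)
            absolute_position += state.chunk.size();
        return ok;
    }
};

int main()
{
    Inner inner;
    Proxy proxy;
    while (proxy.nextAndShiftPosition(inner))
        std::cout << proxy.state.chunk;
    std::cout << "\nabsolute_position = " << proxy.absolute_position << "\n";  // 11
    return 0;
}

Swapping rather than copying keeps a single live working buffer and lets the proxy expose the wrapped buffer's data without duplicating it.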
2 changes: 2 additions & 0 deletions src/Disks/S3/DiskS3.h
@@ -39,6 +39,7 @@ class DiskS3 : public IDisk
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
Expand Down Expand Up @@ -158,6 +159,7 @@ class DiskS3 : public IDisk
const String bucket;
const String s3_root_path;
const String metadata_path;
UInt64 max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
1 change: 1 addition & 0 deletions src/Disks/S3/registerDiskS3.cpp
@@ -152,6 +152,7 @@ void registerDiskS3(DiskFactory & factory)
uri.bucket,
uri.key,
metadata_path,
context.getSettingsRef().s3_max_single_read_retries,
context.getSettingsRef().s3_min_upload_part_size,
context.getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
79 changes: 62 additions & 17 deletions src/IO/ReadBufferFromS3.cpp
@@ -16,6 +16,7 @@ namespace ProfileEvents
{
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
extern const Event S3ReadRequestsErrors;
}

namespace DB
@@ -29,38 +30,83 @@ namespace ErrorCodes


ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0), client_ptr(std::move(client_ptr_)), bucket(bucket_), key(key_), buffer_size(buffer_size_)
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, UInt64 max_single_read_retries_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
, max_single_read_retries(max_single_read_retries_)
, buffer_size(buffer_size_)
{
}


bool ReadBufferFromS3::nextImpl()
{
if (!initialized)
bool next_result = false;

if (impl)
{
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
impl->position() = position();
assert(!impl->hasPendingData());
}
else
{
/// `impl` is not initialized and we're about to read the first portion of data.
impl = initialize();
initialized = true;
next_result = impl->hasPendingData();
}

Stopwatch watch;
auto res = impl->next();
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{
Stopwatch watch;
try
{
/// Try to read a next portion of data.
next_result = impl->next();
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
break;
}
catch (const Exception & e)
{
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);

LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());

if (attempt + 1 == max_single_read_retries)
throw;

/// Pause before next attempt.
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;

/// Try to reinitialize `impl`.
impl.reset();

impl = initialize();
next_result = impl->hasPendingData();
}
}

if (!res)
if (!next_result)
return false;
internal_buffer = impl->buffer();

ProfileEvents::increment(ProfileEvents::S3ReadBytes, internal_buffer.size());
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`

working_buffer = internal_buffer;
ProfileEvents::increment(ProfileEvents::S3ReadBytes, working_buffer.size());
offset += working_buffer.size();
return true;
}

off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (initialized)
if (impl)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

if (whence != SEEK_SET)
@@ -77,18 +123,17 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)

off_t ReadBufferFromS3::getPosition()
{
return offset + count();
return offset - available();
}

std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, std::to_string(offset));
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);

Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (offset != 0)
req.SetRange("bytes=" + std::to_string(offset) + "-");
req.SetRange(fmt::format("bytes={}-", offset));

Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);

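(Not part of the diff.) The new nextImpl() above wraps the read in a bounded retry loop: on an exception it records a profile event, logs, sleeps with an exponentially growing delay, re-creates the underlying S3 stream at the current offset, and rethrows once the last allowed attempt fails. Below is a self-contained sketch of that retry pattern; readOnce() and readWithRetries() are hypothetical stand-ins, and the simulated failures exist only to make the example runnable.

// Not part of the commit: a generic sketch of the retry-with-exponential-backoff
// pattern used in ReadBufferFromS3::nextImpl(). readOnce() is hypothetical.
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>

bool readOnce(size_t attempt)                  // stand-in for impl->next()
{
    if (attempt < 2)                           // simulate two transient failures
        throw std::runtime_error("transient S3 error");
    return true;
}

bool readWithRetries(size_t max_retries)
{
    auto backoff = std::chrono::milliseconds(100);
    for (size_t attempt = 0; attempt < max_retries; ++attempt)
    {
        try
        {
            return readOnce(attempt);          // success: stop retrying
        }
        catch (const std::exception & e)
        {
            std::cerr << "attempt " << attempt << " failed: " << e.what() << "\n";
            if (attempt + 1 == max_retries)
                throw;                         // retry budget exhausted: rethrow
            std::this_thread::sleep_for(backoff);
            backoff *= 2;                      // exponential backoff: 100 ms, 200 ms, ...
            // The real code also resets and re-creates the S3 stream here, so the
            // next attempt re-issues a ranged GET from the current offset.
        }
    }
    return false;
}

int main()
{
    std::cout << (readWithRetries(4) ? "read ok\n" : "no data\n");
    return 0;
}

With the new default s3_max_single_read_retries = 4, a single read gets at most four attempts before the error is propagated to the caller.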
3 changes: 2 additions & 1 deletion src/IO/ReadBufferFromS3.h
@@ -27,8 +27,8 @@ class ReadBufferFromS3 : public SeekableReadBuffer
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
UInt64 max_single_read_retries;
size_t buffer_size;
bool initialized = false;
off_t offset = 0;
Aws::S3::Model::GetObjectResult read_result;
std::unique_ptr<ReadBuffer> impl;
@@ -40,6 +40,7 @@
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);

bool nextImpl() override;
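(Not part of the diff.) In ReadBufferFromS3.cpp above, getPosition() changes from offset + count() to offset - available(): offset is now advanced by the size of each chunk as soon as it becomes the working buffer, so the current logical position is the end of that chunk minus the bytes not yet consumed. A toy, self-contained model of that arithmetic follows; PositionModel and its fields are illustrative, not the real class.

// Not part of the commit: a toy model of the position bookkeeping in ReadBufferFromS3.
// offset is bumped by the chunk size when a chunk is fetched; getPosition() subtracts
// the bytes of that chunk that have not been consumed yet.
#include <cassert>
#include <cstddef>

struct PositionModel
{
    size_t offset = 0;       // end of the last fetched chunk, in object coordinates
    size_t chunk_size = 0;   // size of the current working buffer
    size_t consumed = 0;     // bytes already handed out from that chunk

    void fetch(size_t n) { chunk_size = n; consumed = 0; offset += n; }
    size_t available() const { return chunk_size - consumed; }
    size_t getPosition() const { return offset - available(); }
};

int main()
{
    PositionModel p;
    p.fetch(1000);                      // first 1000-byte chunk arrives
    p.consumed = 300;                   // caller has read 300 bytes of it
    assert(p.getPosition() == 300);
    p.consumed = 1000;                  // chunk fully consumed
    p.fetch(500);                       // next chunk fetched
    assert(p.getPosition() == 1000);    // nothing of the new chunk consumed yet
    return 0;
}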
8 changes: 7 additions & 1 deletion src/Storages/StorageS3.cpp
@@ -72,6 +72,7 @@ namespace
const Context & context,
const ColumnsDescription & columns,
UInt64 max_block_size,
UInt64 max_single_read_retries_,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
@@ -82,7 +83,7 @@ namespace
, with_path_column(need_path)
, file_path(bucket + "/" + key)
{
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key, max_single_read_retries_), compression_method);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(input_format);

@@ -199,6 +200,7 @@ StorageS3::StorageS3(
const String & secret_access_key_,
const StorageID & table_id_,
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@@ -213,6 +215,7 @@ StorageS3::StorageS3(
, max_connections(max_connections_)
, global_context(context_.getGlobalContext())
, format_name(format_name_)
, max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
@@ -318,6 +321,7 @@ Pipe StorageS3::read(
context,
metadata_snapshot->getColumns(),
max_block_size,
max_single_read_retries,
chooseCompressionMethod(uri.key, compression_method),
client,
uri.bucket,
@@ -402,6 +406,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}

UInt64 max_single_read_retries = args.local_context.getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = args.local_context.getSettingsRef().s3_max_connections;
@@ -425,6 +430,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key,
args.table_id,
format_name,
max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
max_connections,
2 changes: 2 additions & 0 deletions src/Storages/StorageS3.h
@@ -31,6 +31,7 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
const String & secret_access_key,
const StorageID & table_id_,
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@@ -65,6 +66,7 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
const Context & global_context;

String format_name;
UInt64 max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;
2 changes: 2 additions & 0 deletions src/TableFunctions/TableFunctionS3.cpp
@@ -66,6 +66,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
{
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 max_single_read_retries = context.getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context.getSettingsRef().s3_max_connections;
@@ -76,6 +77,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
max_connections,