Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 714 with compression support #4721

Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
3f0eed3
Add boilerplate for the entire flow (#4365)
milindl Jul 25, 2023
56fa55a
Added new data type UUID for topic id. Added topic id support in Meta…
pranavrth May 29, 2023
9546d71
Merge Get and Push Protocol (#4377)
anchitj Aug 1, 2023
e11aa51
Clang format fixes
anchitj Aug 1, 2023
ca6a8e5
Fix memory leaks (#4378)
milindl Aug 2, 2023
ef07a95
Add broker selection and client termination [KIP-714] (#4382)
milindl Aug 7, 2023
530eb0b
Integrate nanopb to encode and decode metrics (#4388)
anchitj Aug 21, 2023
5c943ba
Handle delta temporality and some code refactoring (#4410)
anchitj Aug 29, 2023
9e3cf26
Use rd_kafka_compression_t (#4419)
anchitj Aug 31, 2023
21f7072
Add method to get requests from mock cluster
milindl Sep 7, 2023
f1c4fc4
Fix to ns
anchitj Sep 7, 2023
7fdbfbe
Add capacity in mock broker to change PushIntervalMs
milindl Sep 20, 2023
e6a9d66
Fix memory leak
milindl Sep 20, 2023
e0b4f3e
Fix jitter: should not be decimal
milindl Sep 20, 2023
d7e4b47
Add test for telemetry using mock broker
milindl Sep 20, 2023
a614d2c
Address comments
milindl Sep 28, 2023
1216b39
Unit tests push telemetry encode decode (#4440)
anchitj Sep 29, 2023
4ed6abd
Initial code for metrics compression
anchitj Nov 7, 2023
3d66f64
Style and some bug fixes
anchitj Nov 7, 2023
88dcfc1
Little refactoring
anchitj Nov 8, 2023
81913fe
Add max telemetry bytes (#4507)
anchitj Dec 6, 2023
ccc1a26
Add other producer metrics
anchitj Oct 17, 2023
b6c04ac
More changes
anchitj Oct 17, 2023
94545fe
Add enable.metrics.push config
anchitj Oct 17, 2023
aa8a668
Fix bugs
anchitj Oct 18, 2023
8c38f8e
Refactor
anchitj Oct 18, 2023
3987849
Fix warnings
anchitj Oct 18, 2023
4d1d192
Fix UT
anchitj Oct 18, 2023
2f6afa2
Nullify after free
anchitj Oct 23, 2023
eff9351
Fix bugs
anchitj Oct 18, 2023
15fd89c
Refactor
anchitj Oct 18, 2023
d534720
PR Feedback
anchitj Dec 7, 2023
7a911d9
Upgrade vcpkg (#4531)
emasab Dec 8, 2023
40489d4
Add available consumer metrics
anchitj Oct 30, 2023
4bc1c48
Style fix
anchitj Oct 30, 2023
430745b
PR Feedback
anchitj Dec 7, 2023
e9b2bba
labels
anchitj Nov 20, 2023
73d14ee
Add per broker labels
anchitj Nov 22, 2023
4bd7946
Style fix
anchitj Nov 22, 2023
34fca5f
Update all metrics subscribed string
anchitj Nov 22, 2023
ebd36c8
Style fix
anchitj Nov 22, 2023
a5988d6
PR Feedback
anchitj Dec 7, 2023
52a6150
Update matching logic
anchitj Nov 22, 2023
5a4b645
Style fix
anchitj Nov 22, 2023
36c4f83
Add per broker labels
anchitj Nov 22, 2023
217ceca
Style fix
anchitj Nov 22, 2023
ce9f6a1
Bug fixes
anchitj Nov 27, 2023
5806b0f
Don't push terminating telemetry if metrics aren't enabled
anchitj Nov 28, 2023
7525eb4
Match with temporality
anchitj Dec 5, 2023
d6cdce5
Style fix
anchitj Dec 5, 2023
89cfa2a
Add new broker errors
anchitj Dec 6, 2023
8ca308b
Memory leak fixes
anchitj Dec 6, 2023
40edc31
PR feedback
anchitj Dec 7, 2023
e6a88cb
Style fix
anchitj Dec 7, 2023
d541a2b
Immediate schedule getTelemetry for unknown subscription id
anchitj Dec 11, 2023
6abdf47
Rebase fix
anchitj Dec 12, 2023
b92899b
Use rd_uclock
anchitj Dec 12, 2023
ccf2ba9
Master merge
anchitj Dec 15, 2023
c30a432
Master merge
anchitj Dec 15, 2023
8d7e70b
Add more mock test
anchitj Feb 1, 2024
c815cb5
Master merge
anchitj Feb 1, 2024
e516730
Style fix
anchitj Feb 1, 2024
3cb7286
Merge branch 'master' into dev_kip_714_mock_broker_integration_tests_…
anchitj Apr 3, 2024
d15171b
Merge master
anchitj May 10, 2024
9bf7966
Merge compression
anchitj May 10, 2024
5c17295
Merge branch 'master' into dev_kip_714_mock_broker_integration_tests_…
anchitj Jun 21, 2024
9a0c0c2
PR feedback first phase
anchitj Jun 24, 2024
604d18f
Merge branch 'master' into dev_kip_714_mock_broker_integration_tests_…
anchitj Jun 25, 2024
ae64b34
Minor fixes
anchitj Jun 25, 2024
4f2d428
Segfault fixes
anchitj Jun 28, 2024
2772552
PR Feedback 3rd round
anchitj Jul 1, 2024
b537675
Use rkb instead of rkb_selected
anchitj Jul 1, 2024
5192995
Write to rbuf before returning
anchitj Jul 1, 2024
77fc568
Use rd_avg_rollover instead of rd_avg_current
anchitj Jul 1, 2024
1a04f33
Fix exp_max for avg counters
anchitj Jul 1, 2024
4ac880c
Fix unit test
anchitj Jul 1, 2024
4566133
Reset current avg's max
anchitj Jul 2, 2024
e19102f
Don't use histogram in telemetry avgs
anchitj Jul 2, 2024
97589f3
Remove compress direct methods
anchitj Jul 2, 2024
97b48fa
Revert "Don't use histogram in telemetry avgs"
anchitj Jul 3, 2024
a5b398d
PR Feedback 4th round
anchitj Jul 3, 2024
9ae5c1c
Replace TELTERM by TERM
anchitj Jul 3, 2024
387ae01
Move first_push to telemetry.c
anchitj Jul 3, 2024
7a36b53
Merge branch 'master' into dev_kip_714_mock_broker_integration_tests_…
anchitj Jul 3, 2024
77a75b4
Fix unit test
anchitj Jul 3, 2024
23c3b8f
PR feedback 5th round
anchitj Jul 3, 2024
69f9dcf
Rename interface to decode_interface
anchitj Jul 3, 2024
77ce834
PR feedback 6th round
anchitj Jul 3, 2024
1924e3f
PR feedback
anchitj Jul 4, 2024
476038a
PR feedback 7th round
anchitj Jul 4, 2024
dd7d1fc
PR feedback 8th round
anchitj Jul 5, 2024
bb53d5e
Merge branch 'master' into dev_kip_714_mock_broker_integration_tests_…
anchitj Jul 5, 2024
b78751c
Change failure message in rd_assert
anchitj Jul 5, 2024
652b0ea
PR feedback 9th round
anchitj Jul 5, 2024
40b0ad8
Update copyright year rdkafka_conf.h
anchitj Jul 5, 2024
46e4579
Remove arg being cast as test_data
anchitj Jul 5, 2024
0fbc703
Add changelog entry
anchitj Jul 5, 2024
bbd316b
Remove changes to 0009-mock_cluster.c
anchitj Jul 5, 2024
4732ff6
PR feedback 10th round
anchitj Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .formatignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@ src/snappy_compat.h
src/tinycthread.c
src/tinycthread.h
src/regexp.h
src/nanopb/pb_common.c
src/nanopb/pb_common.h
src/nanopb/pb_decode.c
src/nanopb/pb_decode.h
src/nanopb/pb_encode.c
src/nanopb/pb_encode.h
src/nanopb/pb.h
src/opentelemetry/common.pb.c
src/opentelemetry/common.pb.h
src/opentelemetry/metrics.pb.c
src/opentelemetry/metrics.pb.h
src/opentelemetry/resource.pb.c
src/opentelemetry/resource.pb.h
3 changes: 2 additions & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10
topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean*
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce(). <br>*Type: integer*
topic.blacklist | * | | | low | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist. <br>*Type: pattern list*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, telemetry, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
socket.timeout.ms | * | 10 .. 300000 | 60000 | low | Default timeout for network requests. Producer: ProduceRequests will use the lesser value of `socket.timeout.ms` and remaining `message.timeout.ms` for the first message in the batch. Consumer: FetchRequests will use `fetch.wait.max.ms` + `socket.timeout.ms`. Admin: Admin requests will use `socket.timeout.ms` or explicitly set `rd_kafka_AdminOptions_set_operation_timeout()` value. <br>*Type: integer*
socket.blocking.max.ms | * | 1 .. 60000 | 1000 | low | **DEPRECATED** No longer used. <br>*Type: integer*
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | low | Broker socket send buffer size. System default is used if 0. <br>*Type: integer*
Expand Down Expand Up @@ -156,6 +156,7 @@ dr_cb | P | |
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) <br>*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. <br>*Type: integer*
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. **WARNING**: `resolve_canonical_bootstrap_servers_only` must only be used with `GSSAPI` (Kerberos) as `sasl.mechanism`, as it's the only purpose of this configuration value. **NOTE**: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname. <br>*Type: enum value*
enable.metrics.push | * | true, false | true | low | Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client <br>*Type: boolean*


## Topic configuration properties
Expand Down
22 changes: 22 additions & 0 deletions LICENSE.nanopb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
For files in src/nanopb : https://github.com/nanopb/nanopb/blob/8ef41e0ebd45daaf19459a011f67e66224b247cd/LICENSE.txt

Copyright (c) 2011 Petteri Aimonen <jpa at nanopb.mail.kapsi.fi>

This software is provided 'as-is', without any express or
implied warranty. In no event will the authors be held liable
for any damages arising from the use of this software.

Permission is granted to anyone to use this software for any
purpose, including commercial applications, and to alter it and
redistribute it freely, subject to the following restrictions:

1. The origin of this software must not be misrepresented; you
must not claim that you wrote the original software. If you use
this software in a product, an acknowledgment in the product
documentation would be appreciated but is not required.

2. Altered source versions must be plainly marked as such, and
must not be misrepresented as being the original software.

3. This notice may not be removed or altered from any source
distribution.
203 changes: 203 additions & 0 deletions LICENSE.opentelemetry
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
For files in src/opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/blob/81a296f9dba23e32d77f46d58c8ea4244a2157a6/LICENSE

Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/

TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION

1. Definitions.

"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.

"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.

"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.

"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.

"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.

"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.

"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).

"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.

"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."

"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.

2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.

3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.

4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:

(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and

(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and

(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and

(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.

You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.

5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.

6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.

7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.

8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.

9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

END OF TERMS AND CONDITIONS

APPENDIX: How to apply the Apache License to your work.

To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Loading