Skip to content

Commit

Permalink
Merge branch 'release/0.9.96'
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows committed Oct 1, 2024
2 parents d81b462 + e984824 commit e70a261
Show file tree
Hide file tree
Showing 180 changed files with 5,833 additions and 184 deletions.
24 changes: 23 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@

## [Unreleased](https://github.com/aklivity/zilla/tree/HEAD)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.94...HEAD)
[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.95...HEAD)

**Implemented enhancements:**

- Support Kafka topics create, alter, delete [\#1059](https://github.com/aklivity/zilla/issues/1059)

**Fixed bugs:**

- `zilla` Fails to Load Configuration from Specified location if the initial attempts are unsuccessful [\#1226](https://github.com/aklivity/zilla/issues/1226)

**Merged pull requests:**

- Risingwave SInk primary key fix [\#1273](https://github.com/aklivity/zilla/pull/1273) ([akrambek](https://github.com/akrambek))
- Risingwave and PsqlKafka bug fixes [\#1272](https://github.com/aklivity/zilla/pull/1272) ([akrambek](https://github.com/akrambek))
- create external function issue fix [\#1271](https://github.com/aklivity/zilla/pull/1271) ([ankitk-me](https://github.com/ankitk-me))
- Remove produceRecordFramingSize constraints [\#1270](https://github.com/aklivity/zilla/pull/1270) ([akrambek](https://github.com/akrambek))
- External header pattern fix [\#1269](https://github.com/aklivity/zilla/pull/1269) ([ankitk-me](https://github.com/ankitk-me))
- Detect config update after initial 404 status [\#1267](https://github.com/aklivity/zilla/pull/1267) ([jfallows](https://github.com/jfallows))
- Support Kafka topics alter, delete [\#1265](https://github.com/aklivity/zilla/pull/1265) ([akrambek](https://github.com/akrambek))

## [0.9.95](https://github.com/aklivity/zilla/tree/0.9.95) (2024-09-23)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.94...0.9.95)

**Fixed bugs:**

Expand Down
2 changes: 1 addition & 1 deletion build/flyweight-maven-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>build</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion build/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/helm-chart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion conf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ write zilla:begin.ext ${kafka:beginEx()
.name("dev.cities")
.partitionCount(1)
.replicas(1)
.config("cleanup.policy", "compact")
.config("cleanup.policy", "delete")
.build()
.timeout(30000)
.validateOnly("false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.name("dev.cities")
.partitionCount(1)
.replicas(1)
.config("cleanup.policy", "compact")
.config("cleanup.policy", "delete")
.build()
.timeout(30000)
.validateOnly("false")
Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ protected String convertPgsqlTypeToAvro(
{
return switch (pgsqlType.toLowerCase())
{
case "varchar", "text", "char", "bpchar" -> // Blank-padded char in PG
"string";
case "int", "integer", "serial" -> "int";
case "bigint", "bigserial" -> "long";
case "boolean", "bool" -> "boolean";
case "real", "float4" -> "float";
case "double precision", "float8" -> "double"; // Timestamp with time zone
case "timestamp", "timestamptz", "date", "time" ->
"timestamp-millis"; // Avro logical type for date/time values
case "varchar", "text", "char", "bpchar" -> "\\\"string\\\"";
case "int", "integer", "serial" -> "\\\"int\\\"";
case "numeric" -> "\\\"double\\\"";
case "bigint", "bigserial" -> "\\\"long\\\"";
case "boolean", "bool" -> "\\\"boolean\\\"";
case "real", "float4" -> "\\\"float\\\"";
case "double", "double precision", "float8" -> "\\\"double\\\"";
case "timestamp", "timestampz", "date", "time" ->
"{ \\\"type\\\": \\\"long\\\", \\\"logicalTyp\\\": \\\"timestamp-millis\\\" }";
default -> null;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String generateSchema(
String avroType = convertPgsqlTypeToAvro(pgsqlType);

schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
schemaBuilder.append(" \\\"type\\\": [\\\"").append(avroType).append("\\\", \\\"null\\\"] },");
schemaBuilder.append(" \\\"type\\\": [").append(avroType).append(", \\\"null\\\"] },");
}

// Remove the last comma and close the fields array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String generateSchema(
String avroType = convertPgsqlTypeToAvro(pgsqlType);

schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
schemaBuilder.append(" \\\"type\\\": \\\"").append(avroType).append("\\\"},");
schemaBuilder.append(" \\\"type\\\": ").append(avroType).append("},");
}

// Remove the last comma and close the fields array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1256,14 +1256,14 @@ else if (server.commandsProcessed == 0)

final PgsqlKafkaBindingConfig binding = server.binding;
final String primaryKey = binding.avroValueSchema.primaryKey(createTable);
final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);

int versionId = NO_ERROR_SCHEMA_VERSION_ID;
if (primaryKey != null)
{
//TODO: assign versionId to avoid test failure
final String subjectKey = String.format("%s.%s-key", server.database, topic);

final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);
String keySchema = primaryKeyCount > 1
? binding.avroKeySchema.generateSchema(server.database, createTable)
: AVRO_KEY_SCHEMA;
Expand All @@ -1278,7 +1278,7 @@ else if (server.commandsProcessed == 0)

if (versionId != NO_VERSION_ID)
{
final String policy = primaryKey != null
final String policy = primaryKey != null && primaryKeyCount == 1
? "compact"
: "delete";

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-risingwave.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.95</version>
<version>0.9.96</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "CREATE FUNCTION series(int)\n"
"RETURNS TABLE (x int)\n"
"AS series\n"
"LANGUAGE java\n"
"USING LINK 'http://localhost:8815';"
[0x00]

write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("CREATE_FUNCTION")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

accept "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"

accepted

read zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "CREATE FUNCTION series(int)\n"
"RETURNS TABLE (x int)\n"
"AS series\n"
"LANGUAGE java\n"
"USING LINK 'http://localhost:8815';"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("CREATE_FUNCTION")
.build()
.build()}

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
write "CREATE FUNCTION series(int)\n"
"RETURNS TABLE (x int)\n"
"AS series\n"
write "CREATE FUNCTION gcd(int , int)\n"
"RETURNS int\n"
"AS gcd\n"
"LANGUAGE java\n"
"USING LINK 'http://localhost:8815';"
[0x00]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
read "CREATE FUNCTION series(int)\n"
"RETURNS TABLE (x int)\n"
"AS series\n"
read "CREATE FUNCTION gcd(int , int)\n"
"RETURNS int\n"
"AS gcd\n"
"LANGUAGE java\n"
"USING LINK 'http://localhost:8815';"
[0x00]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ write "CREATE SINK distinct_cities_sink\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.distinct_cities',\n"
" primary_key='city'\n"
" primary_key='id'\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry='http://localhost:8081'\n"
");"
") KEY ENCODE TEXT;"
[0x00]
write flush

Expand Down
Loading

0 comments on commit e70a261

Please sign in to comment.