diff --git a/src/main/java/ru/rt/restream/reindexer/Query.java b/src/main/java/ru/rt/restream/reindexer/Query.java index 95262d5..f46113a 100644 --- a/src/main/java/ru/rt/restream/reindexer/Query.java +++ b/src/main/java/ru/rt/restream/reindexer/Query.java @@ -100,6 +100,9 @@ public class Query { private static final int QUERY_UPDATE_FIELD_V2 = 25; private static final int QUERY_BETWEEN_FIELDS_CONDITION = 26; private static final int QUERY_ALWAYS_FALSE_CONDITION = 27; + private static final int QUERY_ALWAYS_TRUE_CONDITION = 28; + private static final int QUERY_SUB_QUERY_CONDITION = 29; + private static final int QUERY_FIELD_SUB_QUERY_CONDITION = 30; /** * Condition types. @@ -187,7 +190,7 @@ public enum Condition { private final List> namespaces = new ArrayList<>(); - private Deque openedBrackets = new ArrayDeque<>(); + private final Deque openedBrackets = new ArrayDeque<>(); private final QueryLogBuilder logBuilder = new QueryLogBuilder(); @@ -388,6 +391,69 @@ public Query where(String indexName, Condition condition, Object... values) { return this; } + /** + * Queries are possible only on the indexed fields, marked with reindex annotation. + * + * @param subquery query returning aggregated values + * @param condition condition value {@link Condition} + * @param values values to match + * @return the {@link Query} for further customizations + */ + public Query where(Query subquery, Condition condition, Collection values) { + if (values == null) { + values = Collections.emptyList(); + } + return where(subquery, condition, values.toArray()); + } + + /** + * Queries are possible only on the indexed fields, marked with reindex annotation. + * + * @param subquery query returning aggregated values + * @param condition condition value {@link Condition} + * @param values values to match + * @return the {@link Query} for further customizations + */ + public Query where(Query subquery, Condition condition, Object... values) { + logBuilder.where(nextOperation, subquery, condition.code, values); + buffer.putVarUInt32(QUERY_SUB_QUERY_CONDITION) + .putVarUInt32(nextOperation) + .putVBytes(subquery.buffer.bytes()) + .putVarUInt32(condition.code); + + this.nextOperation = OP_AND; + this.queryCount++; + + buffer.putVarUInt32(values.length); + for (Object key : values) { + putValue(key); + } + + return this; + } + + /** + * Queries are possible only on the indexed fields, marked with reindex annotation. + * + * @param indexName index name + * @param condition condition value {@link Condition} + * @param subquery query returning aggregated values + * @return the {@link Query} for further customizations + */ + public Query where(String indexName, Condition condition, Query subquery) { + logBuilder.where(nextOperation, indexName, condition.code, subquery); + buffer.putVarUInt32(QUERY_FIELD_SUB_QUERY_CONDITION) + .putVarUInt32(nextOperation) + .putVString(indexName) + .putVarUInt32(condition.code) + .putVBytes(subquery.buffer.bytes()); + + this.nextOperation = OP_AND; + this.queryCount++; + + return this; + } + /** * Where - Add comparing two fields where condition to DB query. * @@ -1140,4 +1206,13 @@ public List getJoinFields() { return joinFields; } + /** + * Get constructed sql log string. + * + * @return SQL-like representation of reindexer query + */ + String getSql() { + return logBuilder.getSql(); + } + } diff --git a/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java b/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java index e8e1b6c..b29cb24 100644 --- a/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java +++ b/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java @@ -280,6 +280,7 @@ void where(int operationCode, String field, int conditionCode, Collection val whereEntries.add(queryEntry); } } + void where(int operationCode, String field, int conditionCode, Object... values) { QueryEntry queryEntry = new QueryEntry(); queryEntry.operation = getOperation(operationCode); @@ -294,6 +295,10 @@ void where(int operationCode, String field, int conditionCode, Object... values) } } + void where(int operationCode, Query subquery, int conditionCode, Object... values) { + where(operationCode, mapToString(subquery), conditionCode, values); + } + void whereBetweenFields(int operationCode, String firstField, int conditionCode, String secondField) { QueryEntry queryEntry = new QueryEntry(); queryEntry.operation = getOperation(operationCode); @@ -661,6 +666,9 @@ private String mapToString(Object whereEntryValue) { return Arrays.stream((Object[]) whereEntryValue) .map(v -> v instanceof String ? addQuotes(v) : String.valueOf(v)) .collect(Collectors.joining(", ", "{", "}")); + } else if (whereEntryValue instanceof Query) { + Query subquery = (Query) whereEntryValue; + return "(" + subquery.getSql() + ")"; } return whereEntryValue instanceof String ? addQuotes(whereEntryValue) : String.valueOf(whereEntryValue); } diff --git a/src/test/java/ru/rt/restream/reindexer/connector/BuiltinSubQueryTest.java b/src/test/java/ru/rt/restream/reindexer/connector/BuiltinSubQueryTest.java new file mode 100644 index 0000000..c060b87 --- /dev/null +++ b/src/test/java/ru/rt/restream/reindexer/connector/BuiltinSubQueryTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ru.rt.restream.reindexer.connector; + +import ru.rt.restream.category.BuiltinTest; + +/** + * Tests for Builtin implementation. + */ +@BuiltinTest +public class BuiltinSubQueryTest extends SubQueryTest { + +} diff --git a/src/test/java/ru/rt/restream/reindexer/connector/CprotoSubQueryTest.java b/src/test/java/ru/rt/restream/reindexer/connector/CprotoSubQueryTest.java new file mode 100644 index 0000000..2b8bf18 --- /dev/null +++ b/src/test/java/ru/rt/restream/reindexer/connector/CprotoSubQueryTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ru.rt.restream.reindexer.connector; + +import ru.rt.restream.category.CprotoTest; + +/** + * Tests for Cproto implementation. + */ +@CprotoTest +public class CprotoSubQueryTest extends SubQueryTest { + +} diff --git a/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java b/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java new file mode 100644 index 0000000..796e6a4 --- /dev/null +++ b/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java @@ -0,0 +1,269 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.connector; + +import org.junit.jupiter.api.Test; +import ru.rt.restream.reindexer.Namespace; +import ru.rt.restream.reindexer.NamespaceOptions; +import ru.rt.restream.reindexer.Query; +import ru.rt.restream.reindexer.ResultIterator; +import ru.rt.restream.reindexer.annotations.Reindex; +import ru.rt.restream.reindexer.db.DbBaseTest; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static ru.rt.restream.reindexer.Query.Condition.EQ; +import static ru.rt.restream.reindexer.Query.Condition.GE; +import static ru.rt.restream.reindexer.Query.Condition.GT; +import static ru.rt.restream.reindexer.Query.Condition.LT; +import static ru.rt.restream.reindexer.Query.Condition.SET; + +/** + * Base SubQuery test. + */ +public abstract class SubQueryTest extends DbBaseTest { + + @Test + public void testWhereWithArgsIndexConditionQueryAndOneNs() { + Namespace personsNs = db.openNamespace("persons", NamespaceOptions.defaultOptions(), Person.class); + for (int i = 0; i < 20; i++) { + int age = 9 + (i % 8) * 10; + String name = "Person" + i + "Age" + age; + personsNs.insert(new Person(i, name, age)); + } + + // select * from persons p + // where p.age = (select max(age) from person) + Query maxAgeSubQuery = personsNs.query().aggregateMax("age"); + Query eldestPersonsQuery = personsNs.query() + .where("age", EQ, maxAgeSubQuery); + ResultIterator iterator = eldestPersonsQuery.execute(); + + List actualEldestPersons = new ArrayList<>(); + while (iterator.hasNext()) { + actualEldestPersons.add(iterator.next()); + } + + assertThat(actualEldestPersons.size(), is(2)); + } + + @Test + public void testWhereWithArgsIndexConditionQueryAndTwoNs() { + Namespace personsNs = db.openNamespace("persons", NamespaceOptions.defaultOptions(), Person.class); + Namespace purchasesNs = db.openNamespace("purchases", NamespaceOptions.defaultOptions(), Purchase.class); + int purchaseId = 0; + // 24 persons, everyone has from 0 to 3 purchases, for a total of 36 purchases. + for (int i = 0; i < 24; i++) { + int age = 9 + (i % 8) * 10; + String name = "Person" + i + "Age" + age; + personsNs.insert(new Person(i, name, age)); + for (int j = 0; j < i % 4; j++) { + int price = (j + 1) * 10; + purchasesNs.insert(new Purchase(purchaseId++, i, price, "Asset" + j)); + } + } + + // Aggregation 'distinct' doesn't support in subquery, so use 'max' in there. + // select * from purchases p + // where p.person_id in (select max(id) from persons where age > 60) + Query retireeSubQuery = personsNs.query() + .where("age", GT, 60) + .aggregateMax("id"); + Query purchasesQuery = purchasesNs.query() + .where("person_id", EQ, retireeSubQuery); + ResultIterator iterator = purchasesQuery.execute(); + + List actualRetireesPurchases = new ArrayList<>(); + while (iterator.hasNext()) { + actualRetireesPurchases.add(iterator.next()); + } + + assertThat(actualRetireesPurchases.size(), is(3)); + } + + @Test + public void testWhereWithArgsQueryConditionValues() { + Namespace bannersNs = db.openNamespace("banners", NamespaceOptions.defaultOptions(), Banner.class); + Namespace purchasesNs = db.openNamespace("purchases", NamespaceOptions.defaultOptions(), Purchase.class); + bannersNs.insert(new Banner(1, "Banner")); + int purchaseId = 0; + // 24 persons, everyone has from 0 to 3 purchases, for a total of 36 purchases. + for (int i = 0; i < 24; i++) { + for (int j = 0; j < i % 4; j++) { + int price = (j + 1) * 10; + purchasesNs.insert(new Purchase(purchaseId++, i, price, "Asset" + j)); + } + } + + int personId = 14; + int personPurchasesCnt = 2; + int sumPrices = 30; // 10 + 20 + + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) = 30 + Query subQuery = purchasesNs.query() + .where("person_id", EQ, personId) + .aggregateSum("price"); + Query bannerExistsOnEqQuery = bannersNs.query() + .where("id", EQ, 1) + .where(subQuery, EQ, sumPrices); + ResultIterator bannerExistsOnEqIterator = bannerExistsOnEqQuery.execute(); + assertThat(bannerExistsOnEqIterator.hasNext(), is(true)); + + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) >= 30 + Query bannerExistsQuery = bannersNs.query() + .where("id", EQ, 1) + .where(subQuery, GE, sumPrices); + ResultIterator bannerExistsIterator = bannerExistsQuery.execute(); + assertThat(bannerExistsIterator.hasNext(), is(true)); + + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) < 30 + Query bannerNotExistsQuery = bannersNs.query() + .where("id", EQ, 1) + .where(subQuery, LT, 30); + ResultIterator bannerNotExistsIterator = bannerNotExistsQuery.execute(); + assertThat(bannerNotExistsIterator.hasNext(), is(false)); + } + + public static class Person { + @Reindex(name = "id", isPrimaryKey = true) + private int id; + @Reindex(name = "full_name") + private String fullName; + @Reindex(name = "age") + private int age; + + public Person() { + } + + public Person(int id, String fullName, int age) { + this.id = id; + this.fullName = fullName; + this.age = age; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getFullName() { + return fullName; + } + + public void setFullName(String fullName) { + this.fullName = fullName; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + } + + public static class Purchase { + @Reindex(name = "id", isPrimaryKey = true) + private int id; + @Reindex(name = "person_id") + private int personId; + @Reindex(name = "price") + private int price; + private String assetName; + + public Purchase() { + } + + public Purchase(int id, int personId, int price, String assetName) { + this.id = id; + this.personId = personId; + this.price = price; + this.assetName = assetName; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getPersonId() { + return personId; + } + + public void setPersonId(int personId) { + this.personId = personId; + } + + public int getPrice() { + return price; + } + + public void setPrice(int price) { + this.price = price; + } + + public String getAssetName() { + return assetName; + } + + public void setAssetName(String assetName) { + this.assetName = assetName; + } + } + + public static class Banner { + @Reindex(name = "id", isPrimaryKey = true) + private int id; + private String name; + + public Banner() { + } + + public Banner(int id, String name) { + this.id = id; + this.name = name; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +}