Skip to content

Commit

Permalink
feat: Add bulk import queries (#191)
Browse files Browse the repository at this point in the history
* feat: Add bulk import queries

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes

* fix: PR changes
  • Loading branch information
anku255 authored Feb 27, 2024
1 parent 9ed97ab commit 6eccbf6
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 3 deletions.
47 changes: 46 additions & 1 deletion src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.BulkImportUserInfo;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.dashboard.DashboardSearchTags;
import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo;
import io.supertokens.pluginInterface.dashboard.DashboardUser;
Expand Down Expand Up @@ -109,7 +112,7 @@ public class Start
implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage,
JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage,
UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage,
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage {
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, BulkImportSQLStorage {

// these configs are protected from being modified / viewed by the dev using the SuperTokens
// SaaS. If the core is not running in SuperTokens SaaS, this array has no effect.
Expand Down Expand Up @@ -3039,4 +3042,46 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx
return -1;
});
}

@Override
public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException {
try {
BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users);
} catch (SQLException e) {
if (e instanceof PSQLException) {
ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage();
if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) {
throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException();
}
if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
throw new StorageQueryException(e);
}
}

@Override
public List<BulkImportUserInfo> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BulkImportUserStatus status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException {
try {
return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BulkImportUserStatus status)
throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
try {
BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ public String getTotpUsedCodesTable() {
return addSchemaAndPrefixToTableName("totp_used_codes");
}

public String getBulkImportUsersTable() {
return addSchemaAndPrefixToTableName("bulk_import_users");
}

private String addSchemaAndPrefixToTableName(String tableName) {
return addSchemaToTableName(postgresql_table_names_prefix + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.storage.postgresql.queries;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.RowMapper;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BulkImportUserStatus;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.BulkImportUserInfo;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
String schema = Config.getConfig(start).getTableSchema();
String tableName = Config.getConfig(start).getBulkImportUsersTable();
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
+ "FOREIGN KEY(app_id) "
+ "REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE"
+ " );";
}

public static String getQueryToCreateStatusUpdatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)";
}

public static String getQueryToCreateCreatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)";
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES ");

int userCount = users.size();

for (int i = 0; i < userCount; i++) {
queryBuilder.append(" (?, ?, ?)");

if (i < userCount - 1) {
queryBuilder.append(",");
}
}

update(start, queryBuilder.toString(), pst -> {
int parameterIndex = 1;
for (BulkImportUser user : users) {
pst.setString(parameterIndex++, user.id);
pst.setString(parameterIndex++, appIdentifier.getAppId());
pst.setString(parameterIndex++, user.toString());
}
});
}

public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BulkImportUserStatus status)
throws SQLException, StorageQueryException {
if (bulkImportUserIds.length == 0) {
return;
}

String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

parameters.add(status.toString());
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
for (int i = 0; i < bulkImportUserIds.length; i++) {
if (i != 0) {
queryBuilder.append(", ");
}
queryBuilder.append("?");
parameters.add(bulkImportUserIds[i]);
}
queryBuilder.append(")");

String query = queryBuilder.toString();

update(con, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
});
}

public static List<BulkImportUserInfo> getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BulkImportUserStatus status,
@Nullable String bulkImportUserId, @Nullable Long createdAt)
throws SQLException, StorageQueryException {

String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable();

StringBuilder queryBuilder = new StringBuilder(baseQuery);
List<Object> parameters = new ArrayList<>();

queryBuilder.append(" WHERE app_id = ?");
parameters.add(appIdentifier.getAppId());

if (status != null) {
queryBuilder.append(" AND status = ?");
parameters.add(status.toString());
}

if (bulkImportUserId != null && createdAt != null) {
queryBuilder
.append(" AND created_at < ? OR (created_at = ? AND id <= ?)");
parameters.add(createdAt);
parameters.add(createdAt);
parameters.add(bulkImportUserId);
}

queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?");
parameters.add(limit);

String query = queryBuilder.toString();

return execute(start, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
}, result -> {
List<BulkImportUserInfo> bulkImportUsers = new ArrayList<>();
while (result.next()) {
bulkImportUsers.add(BulkImportUserInfoRowMapper.getInstance().mapOrThrow(result));
}
return bulkImportUsers;
});
}

private static class BulkImportUserInfoRowMapper implements RowMapper<BulkImportUserInfo, ResultSet> {
private static final BulkImportUserInfoRowMapper INSTANCE = new BulkImportUserInfoRowMapper();

private BulkImportUserInfoRowMapper() {
}

private static BulkImportUserInfoRowMapper getInstance() {
return INSTANCE;
}

@Override
public BulkImportUserInfo map(ResultSet result) throws Exception {
return new BulkImportUserInfo(result.getString("id"), result.getString("raw_data"),
BulkImportUserStatus.valueOf(result.getString("status")),
result.getLong("created_at"), result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,14 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto
update(start, TOTPQueries.getQueryToCreateTenantIdIndexForUsedCodesTable(start), NO_OP_SETTER);
}

if (!doesTableExists(start, Config.getConfig(start).getBulkImportUsersTable())) {
getInstance(start).addState(CREATING_NEW_TABLE, null);
update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER);
// index:
update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER);
}

} catch (Exception e) {
if (e.getMessage().contains("schema") && e.getMessage().contains("does not exist")
&& numberOfRetries < 1) {
Expand Down Expand Up @@ -576,7 +584,14 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
String DROP_QUERY = "DROP INDEX IF EXISTS all_auth_recipe_users_pagination_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}

{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP TABLE IF EXISTS "
+ getConfig(start).getAppsTable() + ","
Expand Down Expand Up @@ -613,7 +628,8 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
+ getConfig(start).getDashboardSessionsTable() + ","
+ getConfig(start).getTotpUsedCodesTable() + ","
+ getConfig(start).getTotpUserDevicesTable() + ","
+ getConfig(start).getTotpUsersTable();
+ getConfig(start).getTotpUsersTable() + ","
+ getConfig(start).getBulkImportUsersTable();
update(start, DROP_QUERY, NO_OP_SETTER);
}
}
Expand Down

0 comments on commit 6eccbf6

Please sign in to comment.