Skip to content

Commit

Permalink
fix: fixes and error handling changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Nov 22, 2024
1 parent 57f7d04 commit 11bd067
Show file tree
Hide file tree
Showing 13 changed files with 1,020 additions and 137 deletions.
309 changes: 290 additions & 19 deletions src/main/java/io/supertokens/storage/postgresql/Start.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;
import org.jetbrains.annotations.TestOnly;

import java.sql.Connection;
import java.sql.SQLException;

import org.jetbrains.annotations.TestOnly;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
Expand Down Expand Up @@ -133,6 +135,30 @@ public static Long getLastActiveByUserId(Start start, AppIdentifier appIdentifie
}
}

public static Map<String, Long> getLastActiveByMultipleUserIds(Start start, AppIdentifier appIdentifier, List<String> userIds)
throws StorageQueryException {
String QUERY = "SELECT user_id, last_active_time FROM " + Config.getConfig(start).getUserLastActiveTable()
+ " WHERE app_id = ? AND user_id IN ( " + Utils.generateCommaSeperatedQuestionMarks(userIds.size())+ " )";

try {
return execute(start, QUERY, pst -> {
pst.setString(1, appIdentifier.getAppId());
for (int i = 0; i < userIds.size(); i++) {
pst.setString(2+i, userIds.get(i));
}
}, res -> {
Map<String, Long> lastActiveByUserIds = new HashMap<>();
if (res.next()) {
String userId = res.getString("user_id");
lastActiveByUserIds.put(userId, res.getLong("last_active_time"));
}
return lastActiveByUserIds;
});
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

public static void deleteUserActive_Transaction(Connection con, Start start, AppIdentifier appIdentifier,
String userId)
throws StorageQueryException, SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
Expand Down Expand Up @@ -121,6 +123,32 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio
});
}

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

PreparedStatement setErrorStatement = con.prepareStatement(query);

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
}

setErrorStatement.executeBatch();
}

public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start,
AppIdentifier appIdentifier,
@Nonnull Integer limit)
Expand All @@ -129,7 +157,7 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
// NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.
// NOTE: On average, we take about 60 seconds to process 10k users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.

// "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously.
// If one process locks the first 1000 rows, others will wait for the lock to be released.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,94 +336,81 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden
});
}

public static void signUpMultiple(Start start, List<EmailPasswordImportUser> usersToSignUp)
throws StorageQueryException, StorageTransactionLogicException {
start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
public static void signUpMultipleForBulkImport_Transaction(Start start, Connection sqlCon, List<EmailPasswordImportUser> usersToSignUp)
throws StorageQueryException, StorageTransactionLogicException, SQLException {
try {
String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";

String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() +
"(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " +
"primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";

String emailpassword_users_QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

String emailpassword_users_to_tenant_QUERY =
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() +
"(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " +
"primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";

String emailpassword_users_QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

String emailpassword_users_to_tenant_QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);

int counter = 0;
for(EmailPasswordImportUser user: usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if(counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
}

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();

//UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined);
// fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
// fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
sqlCon.commit();
//return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod());
} catch (SQLException throwables) {
throwables.printStackTrace(System.out);
SQLException next = throwables.getNextException();
while(next != null) {
next.printStackTrace(System.out);
next = next.getNextException();
}
throw new StorageTransactionLogicException(throwables);
}
return null;
});
sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
}

public static void deleteUser_Transaction(Connection sqlCon, Start start, AppIdentifier appIdentifier,
Expand Down Expand Up @@ -569,6 +556,30 @@ public static String lockEmail_Transaction(Start start, Connection con,
});
}

public static List<String> lockEmail_Transaction(Start start, Connection con,
AppIdentifier appIdentifier,
List<String> emails)
throws StorageQueryException, SQLException {
if(emails == null || emails.isEmpty()){
return new ArrayList<>();
}
String QUERY = "SELECT user_id FROM " + getConfig(start).getEmailPasswordUsersTable() +
" WHERE app_id = ? AND email IN (" + Utils.generateCommaSeperatedQuestionMarks(emails.size()) + ") FOR UPDATE";

return execute(con, QUERY, pst -> {
pst.setString(1, appIdentifier.getAppId());
for (int i = 0; i < emails.size(); i++) {
pst.setString(2 + i, emails.get(i));
}
}, result -> {
List<String> results = new ArrayList<>();
while (result.next()) {
results.add(result.getString("user_id"));
}
return results;
});
}

public static String getPrimaryUserIdUsingEmail(Start start, TenantIdentifier tenantIdentifier,
String email)
throws StorageQueryException, SQLException {
Expand Down Expand Up @@ -612,6 +623,33 @@ public static List<String> getPrimaryUserIdsUsingEmail_Transaction(Start start,
});
}

public static List<String> getPrimaryUserIdsUsingMultipleEmails_Transaction(Start start, Connection con,
AppIdentifier appIdentifier,
List<String> emails)
throws StorageQueryException, SQLException {
if(emails.isEmpty()){
return new ArrayList<>();
}
String QUERY = "SELECT DISTINCT all_users.primary_or_recipe_user_id AS user_id "
+ "FROM " + getConfig(start).getEmailPasswordUsersTable() + " AS ep" +
" JOIN " + getConfig(start).getAppIdToUserIdTable() + " AS all_users" +
" ON ep.app_id = all_users.app_id AND ep.user_id = all_users.user_id" +
" WHERE ep.app_id = ? AND ep.email IN ( " + Utils.generateCommaSeperatedQuestionMarks(emails.size()) + " )";

return execute(con, QUERY, pst -> {
pst.setString(1, appIdentifier.getAppId());
for (int i = 0; i < emails.size(); i++) {
pst.setString(2+i, emails.get(i));
}
}, result -> {
List<String> userIds = new ArrayList<>();
while (result.next()) {
userIds.add(result.getString("user_id"));
}
return userIds;
});
}

public static boolean addUserIdToTenant_Transaction(Start start, Connection sqlCon,
TenantIdentifier tenantIdentifier, String userId)
throws SQLException, StorageQueryException, UnknownUserIdException {
Expand Down
Loading

0 comments on commit 11bd067

Please sign in to comment.