Skip to content

Commit

Permalink
fix: fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Nov 26, 2024
1 parent 11bd067 commit ec02abb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -2299,7 +2299,7 @@ public void addRoleToUser(TenantIdentifier tenantIdentifier, String userId, Stri

@Override
public void addRolesToUsers_Transaction(TransactionConnection connection,
Map<TenantIdentifier, Map<String, String>> rolesToUserByTenants)
Map<TenantIdentifier, Map<String, List<String>>> rolesToUserByTenants)
throws StorageQueryException {
try {
UserRolesQueries.addRolesToUsers_Transaction(this, (Connection) connection.getConnection(), rolesToUserByTenants);
Expand Down Expand Up @@ -2514,7 +2514,7 @@ public boolean doesRoleExist_Transaction(AppIdentifier appIdentifier, Transactio
}

@Override
public List<Boolean> doesMultipleRoleExist_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
public List<String> doesMultipleRoleExist_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
List<String> roles) throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
// NOTE: On average, we take about 60 seconds to process 10k users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.

// "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously.
// If one process locks the first 1000 rows, others will wait for the lock to be released.
// "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
//+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
+ " AND (status = 'NEW' OR status = 'PROCESSING' )"
+ " LIMIT ? FOR UPDATE SKIP LOCKED";

List<BulkImportUser> bulkImportUsers = new ArrayList<>();
Expand Down Expand Up @@ -249,7 +249,6 @@ public static List<BulkImportUser> getBulkImportUsers(Start start, AppIdentifier

public static List<String> deleteBulkImportUsers(Start start, AppIdentifier appIdentifier,
@Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException {
System.out.println("Deleting bulkimportuser ids: " + bulkImportUserIds.length);
if (bulkImportUserIds.length == 0) {
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getValue());
insertQuery.setString(3, emailToUser.getKey());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,25 +204,26 @@ public static int addRoleToUser(Start start, TenantIdentifier tenantIdentifier,
});
}

public static void addRolesToUsers_Transaction(Start start, Connection connection, Map<TenantIdentifier, Map<String, String>> rolesToUserByTenants) //tenant -> user -> role
public static void addRolesToUsers_Transaction(Start start, Connection connection, Map<TenantIdentifier, Map<String, List<String>>> rolesToUserByTenants) //tenant -> user -> role
throws SQLException, StorageQueryException {
String QUERY = "INSERT INTO " + getConfig(start).getUserRolesTable()
+ "(app_id, tenant_id, user_id, role) VALUES(?, ?, ?, ?);";
PreparedStatement insertStatement = connection.prepareStatement(QUERY);

int counter = 0;
for(Map.Entry<TenantIdentifier, Map<String, String>> tenantsEntry : rolesToUserByTenants.entrySet()) {
for(Map.Entry<String, String> rolesToUser : tenantsEntry.getValue().entrySet()) {

insertStatement.setString(1, tenantsEntry.getKey().getAppId());
insertStatement.setString(2, tenantsEntry.getKey().getTenantId());
insertStatement.setString(3, rolesToUser.getKey());
insertStatement.setString(4, rolesToUser.getValue());
insertStatement.addBatch();
counter++;

if(counter % 100 == 0) {
insertStatement.executeBatch();
for(Map.Entry<TenantIdentifier, Map<String, List<String>>> tenantsEntry : rolesToUserByTenants.entrySet()) {
for(Map.Entry<String, List<String>> rolesToUser : tenantsEntry.getValue().entrySet()) {
for(String roleForUser : rolesToUser.getValue()){
insertStatement.setString(1, tenantsEntry.getKey().getAppId());
insertStatement.setString(2, tenantsEntry.getKey().getTenantId());
insertStatement.setString(3, rolesToUser.getKey());
insertStatement.setString(4, roleForUser);
insertStatement.addBatch();
counter++;

if(counter % 100 == 0) {
insertStatement.executeBatch();
}
}
}
}
Expand Down Expand Up @@ -315,7 +316,7 @@ public static boolean doesRoleExist_transaction(Start start, Connection con, App
}, ResultSet::next);
}

public static List<Boolean> doesMultipleRoleExist_transaction(Start start, Connection con, AppIdentifier appIdentifier,
public static List<String> doesMultipleRoleExist_transaction(Start start, Connection con, AppIdentifier appIdentifier,
List<String> roles)
throws SQLException, StorageQueryException {
String QUERY = "SELECT role FROM " + getConfig(start).getRolesTable()
Expand All @@ -326,15 +327,11 @@ public static List<Boolean> doesMultipleRoleExist_transaction(Start start, Conne
pst.setString(2+i, roles.get(i));
}
}, result -> {
List<Boolean> rolesExistsAnswer = new ArrayList<>();
List<String> rolesFound = new ArrayList<>();
while(result.next()){
rolesFound.add(result.getString("role"));
}
for(String role : roles){
rolesExistsAnswer.add(rolesFound.contains(role));
}
return rolesExistsAnswer;
return rolesFound;
});
}

Expand Down

0 comments on commit ec02abb

Please sign in to comment.