diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java index 54730ba..d005195 100644 --- a/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java +++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java @@ -5,6 +5,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import lombok.SneakyThrows; import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; @@ -28,10 +29,15 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.sql.Driver; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.stream.Collectors; import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX; +import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_URI_SEPARATOR; @Getter @Setter @@ -41,7 +47,7 @@ public class OpenSearchConnection extends AbstractNoSqlConnection { private OpenSearchClient openSearchClient; private Optional openSearchVersion = Optional.empty(); - private URI uri; + private List uris; private Properties connectionProperties; @Override @@ -62,18 +68,29 @@ public void open(final String url, final Driver driverObject, final Properties d this.connectionProperties = driverProperties; try { - this.uri = new URI(realUrl); + this.uris = Arrays.stream(realUrl.split(OPENSEARCH_URI_SEPARATOR)) + .map(this::toUri) + .filter(Objects::nonNull) + .toList(); this.connect(); } catch (final Exception e) { throw new DatabaseException("Could not open connection to database: " + realUrl, e); } } + private URI toUri(String uri) { + try { + return URI.create(uri); + } catch (IllegalArgumentException ex) { + return null; + } + } + @Override public void close() throws DatabaseException { this.openSearchClient = null; this.connectionProperties = null; - this.uri = null; + this.uris = null; } @Override @@ -88,7 +105,9 @@ public String getDatabaseProductName() throws DatabaseException { @Override public String getURL() { - return this.uri.toString(); + return this.uris.stream() + .map(URI::toString) + .collect(Collectors.joining(OPENSEARCH_URI_SEPARATOR)); } @Override @@ -101,21 +120,21 @@ public boolean isClosed() throws DatabaseException { return this.openSearchClient == null; } - private void connect() throws DatabaseException { - final HttpHost host = HttpHost.create(this.uri); + private void connect() { + final var hosts = this.uris.stream().map(HttpHost::create).toList(); + final var hostsArray = hosts.toArray(HttpHost[]::new); final var transport = ApacheHttpClient5TransportBuilder - .builder(host) + .builder(hostsArray) .setHttpClientConfigCallback(httpClientBuilder -> { // TODO: support other credential providers final var username = Optional.ofNullable(this.connectionProperties.getProperty("user")); final var password = Optional.ofNullable(this.connectionProperties.getProperty("password")); if (username.isPresent()) { - final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(new AuthScope(host), - new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray())); - + final var credentialsProvider = new BasicCredentialsProvider(); + final var credentials = new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray()); + hosts.forEach(host -> credentialsProvider.setCredentials(new AuthScope(host), credentials)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } else if (password.isPresent()) { throw new RuntimeException("password provided but username not set!"); diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java index 23a40fc..f88ce66 100644 --- a/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java +++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java @@ -10,6 +10,7 @@ public class OpenSearchLiquibaseDatabase extends AbstractNoSqlDatabase { public static final String PRODUCT_NAME = "OpenSearch"; public static final String PRODUCT_SHORT_NAME = "opensearch"; public static final String OPENSEARCH_PREFIX = PRODUCT_SHORT_NAME + ":"; + public static final String OPENSEARCH_URI_SEPARATOR = ","; @Override public void dropDatabaseObjects(final CatalogAndSchema schemaToDrop) throws LiquibaseException { diff --git a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java index b3ea5e3..b1cf852 100644 --- a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java @@ -25,10 +25,12 @@ @Testcontainers public abstract class AbstractOpenSearchLiquibaseIT { protected OpenSearchLiquibaseDatabase database; - private OpenSearchConnection connection; + protected OpenSearchConnection connection; + + protected static final String OPENSEARCH_DOCKER_IMAGE_NAME = "opensearchproject/opensearch:2.18.0"; @Container - public OpensearchContainer container = new OpensearchContainer<>(DockerImageName.parse("opensearchproject/opensearch:2.18.0")); + protected OpensearchContainer container = new OpensearchContainer<>(DockerImageName.parse(OPENSEARCH_DOCKER_IMAGE_NAME)); @SneakyThrows diff --git a/src/test/java/liquibase/ext/opensearch/MultipleOpenSearchNodesLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/MultipleOpenSearchNodesLiquibaseIT.java new file mode 100644 index 0000000..c040119 --- /dev/null +++ b/src/test/java/liquibase/ext/opensearch/MultipleOpenSearchNodesLiquibaseIT.java @@ -0,0 +1,34 @@ +package liquibase.ext.opensearch; + +import liquibase.database.DatabaseFactory; +import liquibase.ext.opensearch.database.OpenSearchConnection; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class MultipleOpenSearchNodesLiquibaseIT extends AbstractOpenSearchLiquibaseIT { + @SneakyThrows + @BeforeEach + @Override + protected void beforeEach() { + // if we launch two testcontainers they can't see each other and thus don't form a cluster => just use the same URL twice to show that it's being accepted + // note: don't use the constants here to detect if we ever change them (to make sure that we actively decide on doing a breaking change rather than making it by mistake). + final String url = "opensearch:" + this.container.getHttpHostAddress() + "," + this.container.getHttpHostAddress(); + final String username = container.getUsername(); + final String password = container.getPassword(); + database = (OpenSearchLiquibaseDatabase) DatabaseFactory.getInstance().openDatabase(url, username, password, null, null); + connection = (OpenSearchConnection) this.database.getConnection(); + } + + @SneakyThrows + @Test + void itCreatesTheChangelogAndLockIndices() { + this.doLiquibaseUpdate("liquibase/ext/changelog.empty.yaml"); + assertThat(this.indexExists(this.database.getDatabaseChangeLogLockTableName())).isTrue(); + assertThat(this.indexExists(this.database.getDatabaseChangeLogTableName())).isTrue(); + } + +}