Skip to content

Commit

Permalink
support multiple URLs in connection string
Browse files Browse the repository at this point in the history
the underlying OpenSearch library supports multiple URLs, thus we should
do the same here as well.

since Liquibase has no concept of this on its own we have to use a
workaround and use a character to concatenate multiple URLs together.
`,` is being used for this as it's normally not part of URLs and it's
already used in the OpenSearch client for this purpose.
the `opensearch:` prefix should not be repeated.

i.e. a URL with multiple addresses should be defined like this:
```
opensearch:http://localhost:9200,http://localhost:9201,http://localhost:9203
```
the login information is the same for all, since they must form a
cluster.
  • Loading branch information
rursprung committed Nov 22, 2024
1 parent f97cb86 commit 8adb9b9
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
Expand All @@ -28,10 +29,15 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.sql.Driver;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX;
import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_URI_SEPARATOR;

@Getter
@Setter
Expand All @@ -41,7 +47,7 @@ public class OpenSearchConnection extends AbstractNoSqlConnection {
private OpenSearchClient openSearchClient;
private Optional<OpenSearchVersionInfo> openSearchVersion = Optional.empty();

private URI uri;
private List<URI> uris;
private Properties connectionProperties;

@Override
Expand All @@ -62,18 +68,29 @@ public void open(final String url, final Driver driverObject, final Properties d
this.connectionProperties = driverProperties;

try {
this.uri = new URI(realUrl);
this.uris = Arrays.stream(realUrl.split(OPENSEARCH_URI_SEPARATOR))
.map(this::toUri)
.filter(Objects::nonNull)
.toList();
this.connect();
} catch (final Exception e) {
throw new DatabaseException("Could not open connection to database: " + realUrl, e);
}
}

private URI toUri(String uri) {
try {
return URI.create(uri);
} catch (IllegalArgumentException ex) {
return null;
}
}

@Override
public void close() throws DatabaseException {
this.openSearchClient = null;
this.connectionProperties = null;
this.uri = null;
this.uris = null;
}

@Override
Expand All @@ -88,7 +105,9 @@ public String getDatabaseProductName() throws DatabaseException {

@Override
public String getURL() {
return this.uri.toString();
return this.uris.stream()
.map(URI::toString)
.collect(Collectors.joining(OPENSEARCH_URI_SEPARATOR));
}

@Override
Expand All @@ -101,21 +120,21 @@ public boolean isClosed() throws DatabaseException {
return this.openSearchClient == null;
}

private void connect() throws DatabaseException {
final HttpHost host = HttpHost.create(this.uri);
private void connect() {
final var hosts = this.uris.stream().map(HttpHost::create).toList();
final var hostsArray = hosts.toArray(HttpHost[]::new);

final var transport = ApacheHttpClient5TransportBuilder
.builder(host)
.builder(hostsArray)
.setHttpClientConfigCallback(httpClientBuilder -> {
// TODO: support other credential providers
final var username = Optional.ofNullable(this.connectionProperties.getProperty("user"));
final var password = Optional.ofNullable(this.connectionProperties.getProperty("password"));

if (username.isPresent()) {
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(new AuthScope(host),
new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray()));

final var credentialsProvider = new BasicCredentialsProvider();
final var credentials = new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray());
hosts.forEach(host -> credentialsProvider.setCredentials(new AuthScope(host), credentials));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
} else if (password.isPresent()) {
throw new RuntimeException("password provided but username not set!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class OpenSearchLiquibaseDatabase extends AbstractNoSqlDatabase {
public static final String PRODUCT_NAME = "OpenSearch";
public static final String PRODUCT_SHORT_NAME = "opensearch";
public static final String OPENSEARCH_PREFIX = PRODUCT_SHORT_NAME + ":";
public static final String OPENSEARCH_URI_SEPARATOR = ",";

@Override
public void dropDatabaseObjects(final CatalogAndSchema schemaToDrop) throws LiquibaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
@Testcontainers
public abstract class AbstractOpenSearchLiquibaseIT {
protected OpenSearchLiquibaseDatabase database;
private OpenSearchConnection connection;
protected OpenSearchConnection connection;

protected static final String OPENSEARCH_DOCKER_IMAGE_NAME = "opensearchproject/opensearch:2.18.0";

@Container
public OpensearchContainer<?> container = new OpensearchContainer<>(DockerImageName.parse("opensearchproject/opensearch:2.18.0"));
protected OpensearchContainer<?> container = new OpensearchContainer<>(DockerImageName.parse(OPENSEARCH_DOCKER_IMAGE_NAME));


@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package liquibase.ext.opensearch;

import liquibase.database.DatabaseFactory;
import liquibase.ext.opensearch.database.OpenSearchConnection;
import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class MultipleOpenSearchNodesLiquibaseIT extends AbstractOpenSearchLiquibaseIT {
@SneakyThrows
@BeforeEach
@Override
protected void beforeEach() {
// if we launch two testcontainers they can't see each other and thus don't form a cluster => just use the same URL twice to show that it's being accepted
// note: don't use the constants here to detect if we ever change them (to make sure that we actively decide on doing a breaking change rather than making it by mistake).
final String url = "opensearch:" + this.container.getHttpHostAddress() + "," + this.container.getHttpHostAddress();
final String username = container.getUsername();
final String password = container.getPassword();
database = (OpenSearchLiquibaseDatabase) DatabaseFactory.getInstance().openDatabase(url, username, password, null, null);
connection = (OpenSearchConnection) this.database.getConnection();
}

@SneakyThrows
@Test
void itCreatesTheChangelogAndLockIndices() {
this.doLiquibaseUpdate("liquibase/ext/changelog.empty.yaml");
assertThat(this.indexExists(this.database.getDatabaseChangeLogLockTableName())).isTrue();
assertThat(this.indexExists(this.database.getDatabaseChangeLogTableName())).isTrue();
}

}

0 comments on commit 8adb9b9

Please sign in to comment.