Skip to content

Commit

Permalink
Reactive Refactoring: Namespace, DataSource, Collector (#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
DementevNikita authored Apr 29, 2022
1 parent 7be183b commit 5c7eef4
Show file tree
Hide file tree
Showing 75 changed files with 9,893 additions and 4,011 deletions.
4 changes: 4 additions & 0 deletions odd-platform-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ dependencies {
jooqGenerationContainer("org.testcontainers:postgresql:$testContainersVersion")
}

bootRun {
jvmArgs = ["-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"]
}

springBoot {
buildInfo()
}
Expand Down
2 changes: 1 addition & 1 deletion odd-platform-api/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
oddIngestionApiVersion=0.1.9
oddIngestionApiVersion=0.1.10
oddrnGeneratorVersion=0.1.8

flywayVersion=7.8.2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.opendatadiscovery.oddplatform.auth.filter;

import lombok.extern.slf4j.Slf4j;
import org.opendatadiscovery.oddplatform.dto.DataSourceDto;
import org.opendatadiscovery.oddplatform.dto.CollectorDto;
import org.opendatadiscovery.oddplatform.exception.NotFoundException;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.repository.DataSourceRepository;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveCollectorRepository;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataSourceRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMethod;
Expand All @@ -14,18 +14,20 @@
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Mono;

@Component
@ConditionalOnProperty(value = "auth.ingestion.filter.enabled", havingValue = "true")
@Slf4j
public class IngestionDataEntitiesFilter extends AbstractIngestionFilter {

private final DataSourceRepository dataSourceRepository;
private final ReactiveDataSourceRepository dataSourceRepository;
private final ReactiveCollectorRepository collectorRepository;

public IngestionDataEntitiesFilter(final DataSourceRepository dataSourceRepository) {
public IngestionDataEntitiesFilter(final ReactiveDataSourceRepository dataSourceRepository,
final ReactiveCollectorRepository collectorRepository) {
super(new PathPatternParserServerWebExchangeMatcher("/ingestion/entities", HttpMethod.POST));
this.dataSourceRepository = dataSourceRepository;
this.collectorRepository = collectorRepository;
}

@Override
Expand All @@ -34,18 +36,31 @@ protected ServerHttpRequestDecorator getRequestDecorator(final ServerWebExchange
@Override
public Flux<DataBuffer> getBody() {
return super.getBody().collectList()
.publishOn(Schedulers.boundedElastic())
.doOnNext(dataBuffer -> {
.flatMapMany(dataBuffer -> {
final DataEntityList body = readBody(dataBuffer, DataEntityList.class);
final String token = resolveToken(exchange.getRequest());
final DataSourceDto dataSourceDto = dataSourceRepository.getByOddrn(body.getDataSourceOddrn())
.orElseThrow(() -> new NotFoundException(
String.format("DataSource with oddrn %s doesn't exist", body.getDataSourceOddrn())
));
if (!dataSourceDto.token().tokenPojo().getValue().equals(token)) {
throw new AccessDeniedException("Token is not correct");
}
}).flatMapIterable(list -> list);

return dataSourceRepository.getDtoByOddrn(body.getDataSourceOddrn())
.switchIfEmpty(Mono.error(new NotFoundException(
"DataSource with oddrn %s doesn't exist".formatted(body.getDataSourceOddrn()))))
.flatMap(dto -> {
if (dto.token() != null) {
return Mono.just(dto.token());
} else {
return collectorRepository.getDto(dto.dataSource().getCollectorId())
.switchIfEmpty(Mono.error(new NotFoundException(
"Collector with id %s doesn't exist".formatted(dto.dataSource()
.getCollectorId()))))
.map(CollectorDto::tokenDto);
}
})
.doOnNext(dto -> {
if (!dto.tokenPojo().getValue().equals(token)) {
throw new AccessDeniedException("Token is not correct");
}
})
.flatMapIterable(ignored -> dataBuffer);
});
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
package org.opendatadiscovery.oddplatform.auth.filter;

import lombok.extern.slf4j.Slf4j;
import org.opendatadiscovery.oddplatform.dto.CollectorDto;
import org.opendatadiscovery.oddplatform.exception.NotFoundException;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSourceList;
import org.opendatadiscovery.oddplatform.repository.CollectorRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import java.nio.file.AccessDeniedException;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveCollectorRepository;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.web.server.util.matcher.PathPatternParserServerWebExchangeMatcher;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Mono;

@Component
@ConditionalOnProperty(value = "auth.ingestion.filter.enabled", havingValue = "true")
@Slf4j
public class IngestionDataSourceFilter extends AbstractIngestionFilter {
private final ReactiveCollectorRepository collectorRepository;

private final CollectorRepository collectorRepository;

public IngestionDataSourceFilter(final CollectorRepository collectorRepository) {
public IngestionDataSourceFilter(final ReactiveCollectorRepository collectorRepository) {
super(new PathPatternParserServerWebExchangeMatcher("/ingestion/datasources", HttpMethod.POST));
this.collectorRepository = collectorRepository;
}
Expand All @@ -34,18 +26,17 @@ protected ServerHttpRequestDecorator getRequestDecorator(final ServerWebExchange
@Override
public Flux<DataBuffer> getBody() {
return super.getBody().collectList()
.publishOn(Schedulers.boundedElastic())
.doOnNext(dataBuffer -> {
final DataSourceList body = readBody(dataBuffer, DataSourceList.class);
.flatMapMany(dataBuffer -> {
final String token = resolveToken(exchange.getRequest());
final CollectorDto collectorDto = collectorRepository.getByOddrn(body.getProviderOddrn())
.orElseThrow(() -> new NotFoundException(
String.format("Collector with oddrn %s doesn't exist", body.getProviderOddrn())
));
if (!collectorDto.tokenDto().tokenPojo().getValue().equals(token)) {
throw new AccessDeniedException("Token is not correct");
}
}).flatMapIterable(list -> list);

return collectorRepository.getByToken(token)
.switchIfEmpty(
Mono.error(new AccessDeniedException("Collector with such token doesn't exist")))
.zipWith(exchange.getSession())
.doOnNext(t -> t.getT2().getAttributes()
.put(SessionConstants.COLLECTOR_ID_SESSION_KEY, t.getT1().getId()))
.flatMapIterable(i -> dataBuffer);
});
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opendatadiscovery.oddplatform.auth.filter;

public class SessionConstants {
public static String COLLECTOR_ID_SESSION_KEY = "collectorId";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opendatadiscovery.oddplatform.controller;

import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.CollectorApi;
import org.opendatadiscovery.oddplatform.api.contract.model.Collector;
import org.opendatadiscovery.oddplatform.api.contract.model.CollectorFormData;
Expand All @@ -12,42 +13,41 @@
import reactor.core.publisher.Mono;

@RestController
public class CollectorController extends AbstractCRUDController<Collector, CollectorList, CollectorFormData,
CollectorUpdateFormData, CollectorService> implements CollectorApi {

public CollectorController(final CollectorService collectorService) {
super(collectorService);
}
@RequiredArgsConstructor
public class CollectorController implements CollectorApi {
private final CollectorService collectorService;

@Override
public Mono<ResponseEntity<CollectorList>> getCollectorsList(final Integer page, final Integer size,
public Mono<ResponseEntity<CollectorList>> getCollectorsList(final Integer page,
final Integer size,
final String query,
final ServerWebExchange exchange) {
return list(page, size, query);
return collectorService.list(page, size, query).map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Collector>> registerCollector(final Mono<CollectorFormData> collectorFormData,
final ServerWebExchange exchange) {
return create(collectorFormData);
return collectorFormData.flatMap(collectorService::create).map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Collector>> updateCollector(final Long collectorId,
final Mono<CollectorUpdateFormData> collectorUpdateFormData,
final ServerWebExchange exchange) {
return update(collectorId, collectorUpdateFormData);
return collectorUpdateFormData
.flatMap(form -> collectorService.update(collectorId, form))
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Void>> deleteCollector(final Long collectorId, final ServerWebExchange exchange) {
return delete(collectorId);
return collectorService.delete(collectorId).map(ign -> ResponseEntity.noContent().build());
}

@Override
public Mono<ResponseEntity<Collector>> regenerateCollectorToken(final Long collectorId,
final ServerWebExchange exchange) {
return entityService.regenerateDataSourceToken(collectorId)
.map(ResponseEntity::ok);
return collectorService.regenerateToken(collectorId).map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
@@ -1,78 +1,65 @@
package org.opendatadiscovery.oddplatform.controller;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.DataSourceApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSource;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSourceFormData;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSourceList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSourceUpdateFormData;
import org.opendatadiscovery.oddplatform.service.DataSourceService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RestController
public class DataSourceController
extends
AbstractCRUDController<DataSource, DataSourceList, DataSourceFormData,
DataSourceUpdateFormData, DataSourceService>
implements DataSourceApi {

public DataSourceController(final DataSourceService entityService) {
super(entityService);
}
@RequiredArgsConstructor
public class DataSourceController implements DataSourceApi {
private final DataSourceService dataSourceService;

@Override
public Mono<ResponseEntity<Void>> deleteDataSource(final Long dataSourceId, final ServerWebExchange exchange) {
return delete(dataSourceId);
public Mono<ResponseEntity<DataSourceList>> getDataSourceList(final Integer page, final Integer size,
final String query,
final ServerWebExchange exchange) {
return dataSourceService
.list(page, size, query)
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<DataSourceList>> getDataSourceList(
@NotNull @Valid final Integer page,
@NotNull @Valid final Integer size,
@Valid final String query,
final ServerWebExchange exchange
) {
return list(page, size, query);
public Mono<ResponseEntity<Flux<DataSource>>> getActiveDataSourceList(final ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(dataSourceService.listActive()));
}

@Override
public Mono<ResponseEntity<Flux<DataSource>>> getActiveDataSourceList(final ServerWebExchange exchange) {
final Flux<DataSource> response = entityService.listActive()
.subscribeOn(Schedulers.boundedElastic());

return Mono.just(ResponseEntity.ok(response));
public Mono<ResponseEntity<DataSource>> registerDataSource(final Mono<DataSourceFormData> dataSourceFormData,
final ServerWebExchange exchange) {
return dataSourceFormData
.flatMap(dataSourceService::create)
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<DataSource>> registerDataSource(
@Valid final Mono<DataSourceFormData> dataSourceFormData,
final ServerWebExchange exchange
) {
return create(dataSourceFormData);
public Mono<ResponseEntity<DataSource>> updateDataSource(final Long dataSourceId,
final Mono<DataSourceUpdateFormData> formData,
final ServerWebExchange exchange) {
return formData
.flatMap(form -> dataSourceService.update(dataSourceId, form))
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<DataSource>> updateDataSource(
final Long dataSourceId,
@Valid final Mono<DataSourceUpdateFormData> dataSourceUpdateFormData,
final ServerWebExchange exchange
) {
return update(dataSourceId, dataSourceUpdateFormData);
public Mono<ResponseEntity<Void>> deleteDataSource(final Long dataSourceId, final ServerWebExchange exchange) {
return dataSourceService.delete(dataSourceId)
.then(Mono.just(ResponseEntity.noContent().build()));
}

@Override
public Mono<ResponseEntity<DataSource>> regenerateDataSourceToken(
final Long dataSourceId,
final ServerWebExchange exchange
) {
return entityService.regenerateDataSourceToken(dataSourceId)
.map(entity -> new ResponseEntity<>(entity, HttpStatus.OK));
public Mono<ResponseEntity<DataSource>> regenerateDataSourceToken(final Long dataSourceId,
final ServerWebExchange exchange) {
return dataSourceService
.regenerateDataSourceToken(dataSourceId)
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opendatadiscovery.oddplatform.auth.filter.SessionConstants;
import org.opendatadiscovery.oddplatform.ingestion.contract.api.IngestionApi;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSourceList;
Expand Down Expand Up @@ -36,9 +37,18 @@ public Mono<ResponseEntity<Void>> postDataEntityList(
@Override
public Mono<ResponseEntity<Void>> createDataSource(@Valid final Mono<DataSourceList> dataSourceList,
final ServerWebExchange exchange) {
final Mono<Long> collectorIdMono = exchange.getSession()
.map(ws -> {
final Object collectorId = ws.getAttribute(SessionConstants.COLLECTOR_ID_SESSION_KEY);
if (collectorId == null) {
throw new IllegalStateException("Collector id is null");
}
return collectorId;
})
.cast(Long.class);
return dataSourceList
.publishOn(Schedulers.boundedElastic())
.flatMap(dataSourceIngestionService::createDataSourcesFromIngestion)
.map(ignored -> ResponseEntity.ok().build());
.zipWhen(l -> collectorIdMono)
.flatMapMany(t -> dataSourceIngestionService.createDataSources(t.getT2(), t.getT1()))
.then(Mono.just(ResponseEntity.ok().build()));
}
}
Loading

0 comments on commit 5c7eef4

Please sign in to comment.