Skip to content

Commit

Permalink
Merge pull request #2444 from telefonicaid/fix/arcgis_exception_to_cy…
Browse files Browse the repository at this point in the history
…gnus_persistence_error

Fix/arcgis exception to cygnus persistence error to allow batch retries
  • Loading branch information
fgalan authored Nov 20, 2024
2 parents eb0eb4c + edcceb2 commit 1dec6be
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- [cygnus-ngsi][cygnus-common] Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink if `.batch_ttl` configured
- [cygnus-ngsi][cygnus-common] Remove new line chars from Arcgis logs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected int featuresBatched() {
* @return The persistence backend
* @throws CygnusRuntimeError
*/
protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusRuntimeError {
protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusPersistenceError, CygnusRuntimeError {

LOGGER.debug("Current persistenceBackend has " + arcgisPersistenceBackend.size() +" tables ");

Expand All @@ -187,7 +187,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr

if (newTable.hasError() || !newTable.connected()) {
LOGGER.error("Error creating new persistence Backend. " + newTable.getErrorDesc());
throw new CygnusRuntimeError("[" + this.getName() + "Error creating Persistence backend: "
throw new CygnusPersistenceError("[" + this.getName() + "Error creating Persistence backend: "
+ newTable.getErrorCode() + " - " + newTable.getErrorDesc());
} else {
arcgisPersistenceBackend.put(featureServiceUrl, newTable);
Expand All @@ -199,7 +199,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr
LOGGER.error("Error creating new persistence Backend. " +e.getClass().getSimpleName());
LOGGER.debug(stackTrace);

throw new CygnusRuntimeError("Error creating new persistence Backend. ", e.getClass().getName(),
throw new CygnusPersistenceError("Error creating new persistence Backend. ", e.getClass().getName(),
e.getMessage());
}
}
Expand Down Expand Up @@ -306,34 +306,29 @@ void persistBatch(NGSIBatch batch)
return;
} // if

try {
// Iterate on the destinations
batch.startIterator();
// Iterate on the destinations
batch.startIterator();

while (batch.hasNext()) {
String destination = batch.getNextDestination();
LOGGER.debug(
"[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination");
while (batch.hasNext()) {
String destination = batch.getNextDestination();
LOGGER.debug(
"[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination");

// Get the events within the current sub-batch
ArrayList<NGSIEvent> events = batch.getNextEvents();
// Get the events within the current sub-batch
ArrayList<NGSIEvent> events = batch.getNextEvents();

// Get an aggregator for this destination and initialize it
NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings);

for (NGSIEvent event : events) {
aggregator.aggregate(event);
} // for
// Get an aggregator for this destination and initialize it
NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings);

// Persist the aggregation
persistAggregation(aggregator);
batch.setNextPersisted(true);
for (NGSIEvent event : events) {
aggregator.aggregate(event);
} // for

} // while
} catch (Exception e) {
LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + "." + e.getMessage());
throw new CygnusRuntimeError(e.getMessage());
}
// Persist the aggregation
persistAggregation(aggregator);
batch.setNextPersisted(true);

} // while
} // persistBatch

/*
Expand Down Expand Up @@ -396,7 +391,7 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError {
* @param aggregator
* @throws CygnusRuntimeError
*/
public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRuntimeError {
public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusPersistenceError, CygnusRuntimeError, CygnusBadContextData {
try {
List<ArcgisAggregatorDomain> aggregationList = aggregator.getListArcgisAggregatorDomain();
LOGGER.debug("[" + this.getName() + "] persisting aggregation, "
Expand All @@ -422,8 +417,16 @@ public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRun
}
} catch (CygnusRuntimeError e) {
String stackTrace = ExceptionUtils.getFullStackTrace(e);
LOGGER.debug(" PersistAggregation Error: " + stackTrace);
LOGGER.debug(" PersistAggregation CygnusRuntimeError: " + stackTrace);
throw (e);
} catch (CygnusPersistenceError e) {
String stackTrace = ExceptionUtils.getFullStackTrace(e);
LOGGER.debug(" PersistAggregation CygnusPersistenceError: " + stackTrace);
throw (e);
} catch (ArcgisException e) {
LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - "
+ e.getMessage());
throw new CygnusPersistenceError(e.getMessage());
} catch (Exception e) {
LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - "
+ e.getMessage());
Expand Down

0 comments on commit 1dec6be

Please sign in to comment.