Skip to content

Commit

Permalink
Activate auto querying of all goals
Browse files Browse the repository at this point in the history
  • Loading branch information
hvarg committed Apr 4, 2024
1 parent 1c7f3b3 commit 1e6cd9f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 52 deletions.
11 changes: 8 additions & 3 deletions server/src/main/java/org/diskproject/server/quering/Match.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,16 @@ private List<WorkflowInstantiation> createWorkflowInstance (WorkflowSeed seed) {
if (queryResult == null || queryVariables == null || !ready || queryResult.size() == 0 || queryVariables.size() == 0) {
return null;
}
List<DataResult> filteredResults = filterQueryResults(selectVariables);
List<DataResult> filteredResults = filterQueryResults(seedVariables);
Set<String> filteredVariables = new HashSet<String>();
for (String varSeed: seedVariables) {
filteredVariables.add(varSeed.substring(1));
}

// One seed can create multiple instances. As the results are a table, we need to aggregate the results.
int runs = 0;
Map<String,Integer> ticks = new HashMap<String,Integer>();
for (String name: this.queryVariables) {
for (String name: filteredVariables) {
ticks.put(name, 0);
String lastValue = null;
boolean isArray = arrayVariables.contains("?" + name);
Expand Down Expand Up @@ -276,7 +281,7 @@ private List<WorkflowInstantiation> createWorkflowInstance (WorkflowSeed seed) {
newBindingValues.add(csvURL);
} else if (wfBiding.startsWith("?")) {
String name = wfBiding.substring(1);
if (queryVariables.contains(name)) {
if (filteredVariables.contains(name)) {
if (arrayVariables.contains(wfBiding)) {
//Is array, send a list with all values.
for (DataResult cell: resultsToBind) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,17 @@ public Map<LineOfInquiry, List<Map<String, String>>> getLOIByHypothesisId(String
// return tlois;
//}

public void queryAllGoals () {
for (Goal goal: this.listGoals()) {
String localId = DiskDB.getLocalId(goal.getId());
try {
this.queryGoal(localId);
} catch (Exception e) {
System.err.println("Error querying goal " + localId);
}
}
}

public List<TriggeredLOI> queryGoal (String id) throws Exception, QueryParseException {
System.out.println("Quering goal: " + id);
Goal goal = this.getGoal(id);
Expand All @@ -505,7 +516,7 @@ public List<TriggeredLOI> queryGoal (String id) throws Exception, QueryParseExce
String template = loiMatch.createQueryTemplate();
if (dataSource != null && template != null) {
String query = this.getAllPrefixes() + "SELECT DISTINCT " +
String.join(" ", loiMatch.seedVariables)
String.join(" ", loiMatch.selectVariables)
+ " WHERE { \n" + template + "\n}";
String cacheId = dataSource.getId() + "|" + query;
if (!queryCache.containsKey(cacheId)) {
Expand Down
58 changes: 12 additions & 46 deletions server/src/main/java/org/diskproject/server/threads/DataThread.java
Original file line number Diff line number Diff line change
@@ -1,56 +1,22 @@
package org.diskproject.server.threads;

import org.diskproject.server.repository.DiskRepository;

public class DataThread implements Runnable {
DiskRepository disk;
public DataThread(DiskRepository disk) {
this.disk = disk;
}

@Override
public void run() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'run'");
System.out.println("[D] Running data monitor thread");
disk.queryAllGoals();
}

// All of this needs to be reworked to clear caches and re run queries.
/*public class DataMonitor implements Runnable {
boolean stop;
ScheduledFuture<?> scheduledFuture;
public DataMonitor() {
stop = false;
scheduledFuture = monitor.scheduleWithFixedDelay(this, 0, 1, TimeUnit.DAYS);
public void stop() {
while (!Thread.interrupted()) {
Thread.currentThread().interrupt();
}
public void run() {
System.out.println("[D] Running data monitor thread");
try {
Thread.sleep(5000);
if (stop) {
scheduledFuture.cancel(false);
while (!Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
} else if (!this.equals(dataThread)) {
stop();
return;
} else {
// Re-run all hypothesis FIXME:
// runAllHypotheses("admin");
}
} catch (Exception e) {
scheduledFuture.cancel(false);
while (!Thread.interrupted()) {
stop = true;
Thread.currentThread().interrupt();
}
}
}
public void stop() {
while (!Thread.interrupted()) {
stop = true;
scheduledFuture.cancel(false);
Thread.currentThread().interrupt();
}
}
}*/

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,29 @@

import org.diskproject.server.managers.MethodAdapterManager;
import org.diskproject.server.repository.DiskRepository;
import org.diskproject.shared.classes.adapters.MethodAdapter;
import org.diskproject.shared.classes.common.Status;
import org.diskproject.shared.classes.loi.LineOfInquiry;
import org.diskproject.shared.classes.loi.TriggeredLOI;
import org.diskproject.shared.classes.workflow.Execution;
import org.diskproject.shared.classes.workflow.VariableBinding;
import org.diskproject.shared.classes.workflow.WorkflowInstantiation;
import org.diskproject.shared.classes.workflow.WorkflowSeed;
import org.diskproject.shared.classes.workflow.WorkflowVariable;

public class ThreadManager {
protected MethodAdapterManager methodAdapters;
private DiskRepository disk;
protected ScheduledExecutorService monitor;
protected ExecutorService executor;
private DataThread dataThread;

public ThreadManager (MethodAdapterManager methodAdapters, DiskRepository disk) {
this.disk = disk;
this.methodAdapters = methodAdapters;
this.executor = Executors.newFixedThreadPool(2);
this.monitor = Executors.newScheduledThreadPool(0);

this.dataThread = new DataThread(disk);
this.monitor.scheduleWithFixedDelay(this.dataThread, 0, 1, TimeUnit.DAYS);
}

public void shutdownExecutors () {
Expand Down

0 comments on commit 1e6cd9f

Please sign in to comment.