Skip to content

Commit

Permalink
Allow different rdf-syntax in flowfile
Browse files Browse the repository at this point in the history
  • Loading branch information
GordianDziwis committed Feb 14, 2019
1 parent 65ac30a commit b3e5809
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.jena.rdfconnection.RDFConnectionFactory;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.out.SinkQuadOutput;
import org.apache.jena.riot.out.SinkTripleOutput;
import org.apache.jena.shared.PrefixMapping;
Expand Down Expand Up @@ -100,6 +101,10 @@ public interface FLOW_FILE_CONTENTS {
public static final AllowableValue NON_RDF_DATA = new AllowableValue(FLOW_FILE_CONTENTS.NON_RDF_DATA);
public static final AllowableValue EMPTY = new AllowableValue(FLOW_FILE_CONTENTS.EMPTY);

public static final AllowableValue TURTLE = new AllowableValue(Lang.TURTLE.getLabel());
public static final AllowableValue NT = new AllowableValue(Lang.NT.getLabel());
public static final AllowableValue JSONLD = new AllowableValue(Lang.JSONLD.getLabel());

public static final PropertyDescriptor BASE_URI = new PropertyDescriptor.Builder()
.name("BASE_URI")
.displayName("Base URI")
Expand All @@ -116,9 +121,18 @@ public interface FLOW_FILE_CONTENTS {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

public static final PropertyDescriptor RDF_DATA_INPUT_SYNTAX = new PropertyDescriptor.Builder()
.name("RDF_DATA_INPUT_SYNTAX")
.displayName("RDF Data Input Syntax")
.description("RDF-Syntax of the FlowFile content, only used when Content of flow file is set to rdf-data")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.allowableValues(TURTLE, NT, JSONLD)
.defaultValue(TURTLE.getValue())
.build();

public static final PropertyDescriptor CONTENT_FLOW_FILE = new PropertyDescriptor.Builder()
.name("Content of FlowFile")
.displayName("Content of the processors input FlowFile")
.name("CONTENT_FLOW_FILE")
.displayName("Content of FlowFile")
.description("Content of the processors input FlowFile")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
Expand All @@ -138,6 +152,7 @@ protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(BASE_URI);
descriptors.add(CONTENT_FLOW_FILE);
descriptors.add(RDF_DATA_INPUT_SYNTAX);
descriptors.add(SPARQL_QUERY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);

Expand Down Expand Up @@ -165,6 +180,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final String contentFlowFile = context.getProperty(CONTENT_FLOW_FILE).getValue();
final String baseUri = context.getProperty(BASE_URI).getValue();
final String rdfDataInputSyntax = context.getProperty(RDF_DATA_INPUT_SYNTAX).getValue();
final AtomicReference<Stream<SparqlStmt>> stmts = new AtomicReference<>();

FlowFile flowFile = session.get();
Expand Down Expand Up @@ -193,7 +209,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
RDFDataMgr.read(dataset, in, baseUri, Lang.TURTLE);
RDFDataMgr.read(dataset, in, baseUri, RDFLanguages.nameToLang(rdfDataInputSyntax));
}
});
break;
Expand Down

0 comments on commit b3e5809

Please sign in to comment.