Skip to content

Commit

Permalink
Merge branch 'icwe2021' into development
Browse files Browse the repository at this point in the history
# Conflicts:
#	CHANGELOG.md
  • Loading branch information
ghsnd committed May 19, 2021
2 parents b2697b5 + 550b201 commit fac534c
Show file tree
Hide file tree
Showing 72 changed files with 1,218 additions and 258 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## 2.1.1 - ?

### Added
* Support for using Web of Things descriptions in logical soure and logical target, as described in [Van Assche et al](https://link.springer.com/chapter/10.1007/978-3-030-74296-6_26)
and [Target in RML specification](https://rml.io/specs/rml-target).
The current imlementation is a proof-of-concept. As WoT data source RMLStreamer supports MQTT streams;
as logical target a file dump is supported.

### Changed
* Update Flink from version 1.11.3 to 1.12.2
* Updated JsonSurfer from version 1.5.1 to 1.6.0
* Updated Flink from version 1.11.3 to 1.12.2

## [2.1.0] - 2020-03-18

Expand Down
2 changes: 1 addition & 1 deletion documentation/README_Functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,4 @@ to load and bind every function as specified in the testcase's `mapping.ttl`.
# Remarks
- When the RMLStreamer is unable to find a function description or function mapping, bind method parameters to values, it will be logged as an error to the console
and the function will not be applied.


2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ SOFTWARE.
<dependency>
<groupId>com.github.jsurfer</groupId>
<artifactId>jsurfer-jackson</artifactId>
<version>1.5.1</version>
<version>1.6.0</version>
<exclusions>
<!-- provided by Jena -->
<exclusion>
Expand Down
80 changes: 49 additions & 31 deletions src/main/scala/io/rml/framework/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ package io.rml.framework


import io.rml.framework.api.{FnOEnvironment, RMLEnvironment}
import io.rml.framework.core.extractors.TriplesMapsCache
import io.rml.framework.core.extractors.NodeCache
import io.rml.framework.core.internal.Logging
import io.rml.framework.core.item.{EmptyItem, Item, JoinedItem}
import io.rml.framework.core.model._
Expand All @@ -37,6 +37,7 @@ import io.rml.framework.engine._
import io.rml.framework.engine.statement.StatementEngine
import io.rml.framework.flink.connector.kafka.{RMLPartitioner, UniversalKafkaConnectorFactory}
import io.rml.framework.flink.function.{FnOEnvironmentLoader, FnOEnvironmentStreamLoader, RichItemIdentityFunction, RichStreamItemIdentityFunction}
import io.rml.framework.flink.sink.{RichMQTTSink, TargetSinkFactory}
import io.rml.framework.flink.source.{FileDataSet, Source}
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.api.scala._
Expand Down Expand Up @@ -176,6 +177,10 @@ object Main extends Logging {
.build()
stream.addSink(sink).name("Streaming file sink")
}
else if (config.outputSink.equals(OutputSinkOption.MQTT)) {
val sink = new RichMQTTSink(config.broker.get, config.topic.get)
stream.addSink(sink)
}
// discard output if the parameter is given
else if (config.outputSink.equals(OutputSinkOption.None)) {
stream.addSink(output => {}).name("No output sink")
Expand Down Expand Up @@ -235,31 +240,26 @@ object Main extends Logging {
new RichStreamItemIdentityFunction()
}

// Create sinks for every logical target
val logicalTargetId2Sinks = TargetSinkFactory.createStreamSinksFromLogicalTargetCache()

// This is the collection of all data streams that are created by the current mapping
val processedStreams: immutable.Iterable[DataStream[String]] =
val processedStreams: immutable.Iterable[DataStream[String]] = {
sourceEngineMap.map(entry => {
val source = entry._1.asInstanceOf[io.rml.framework.flink.source.Stream]
val engine = entry._2
// link the different steps in each pipeline
source.stream // this will generate a stream of items
val dataStream = source.stream // this will generate a stream of items
// process every item by a processor with a loaded engine

.map(preProcessingFunction)
.map(new StdStreamProcessor(engine))
.name("Execute mapping statements on items")

// format every list of triples (as strings)
.flatMap(
list => {
if (list.nonEmpty) {
Some(list.reduce((a, b) => a + "\n" + b) + "\n\n")
} else {
None
}
}
)
.name("Convert triples to strings")
// add sinks to the data stream
TargetSinkFactory.appendSinksToStream(logicalTargetId2Sinks, dataStream)
})
}

// union all streams to one final stream
unionStreams(processedStreams)
Expand Down Expand Up @@ -349,17 +349,13 @@ object Main extends Logging {

// the "normal" scenario.
val engine = StatementEngine.fromTriplesMaps(List(triplesMap))
stream
val dataStream = stream
.map(new StdStreamProcessor(engine))
.name("Execute mapping statements on items")

// format every list of triples (as strings)
.flatMap(list =>
if (list.nonEmpty) {
Some(list.reduce((a, b) => a + "\n" + b) + "\n\n")
} else None
)
.name("Convert triples to strings")
val logicalTargetId2Sinks = TargetSinkFactory.createStreamSinksFromLogicalTargetCache()

TargetSinkFactory.appendSinksToStream(logicalTargetId2Sinks, dataStream)
}

})
Expand All @@ -374,7 +370,7 @@ object Main extends Logging {

formattedMapping.joinedSteamTriplesMaps.foreach(joinedTm => {
// identify the parent triples map
val parentTm = TriplesMapsCache.get(joinedTm.parentTriplesMap).get;
val parentTm = NodeCache.getTriplesMap(joinedTm.parentTriplesMap).get;

// find the parent source of the join condition
val joinParentSource = joinedTm.joinCondition.get.parent.identifier
Expand Down Expand Up @@ -503,9 +499,16 @@ object Main extends Logging {
.map(preProcessingFunction)
.map(new StdStaticProcessor(engine))
.name("Execute mapping statements on items")

.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b) + "\n\n") else None)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert triples to strings")
})

Expand Down Expand Up @@ -545,7 +548,7 @@ object Main extends Logging {
})


val parentTriplesMap = TriplesMapsCache.get(tm.parentTriplesMap).get;
val parentTriplesMap = NodeCache.getTriplesMap(tm.parentTriplesMap).get;
val parentDataset =
// Create a Source from the parents logical source
Source(parentTriplesMap.logicalSource).asInstanceOf[FileDataSet]
Expand Down Expand Up @@ -587,7 +590,16 @@ object Main extends Logging {
.map(new JoinedStaticProcessor(engine)).name("Execute mapping statements on joined items")

// format the list of triples as strings
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b)) else None)
.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert triples to strings")

} else { // if there are no join conditions a cross join will be executed
Expand All @@ -599,7 +611,16 @@ object Main extends Logging {
JoinedItem(items._1, items._2)
) // create a JoinedItem from the crossed items
.map(new JoinedStaticProcessor(engine)).name("Execute mapping statements on joined items") // process the joined items
.flatMap(list => if (list.nonEmpty) Some(list.reduce((a, b) => a + "\n" + b)) else None) // format the triples
.map(outputStringToLogicalTargetIDs => {
outputStringToLogicalTargetIDs.map(outputStringToLogicalTargetID => outputStringToLogicalTargetID._2)
// TODO: integrate logical target for data set.
})
.name("Ignoring the logical target for now.")
.flatMap(list => {
list.seq
})
// format every list of triples (as strings)
.reduce((a, b) => a + "\n" + b + "\n\n")
.name("Convert joined triples to strings")
}

Expand Down Expand Up @@ -641,7 +662,4 @@ object Main extends Logging {
} else head
}




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.rml.framework.core.extractors

import io.rml.framework.core.extractors.std.StdDataTargetExtractor
import io.rml.framework.core.model.DataTarget

/**
* MIT License
*
* Copyright (C) 2017 - 2021 RDF Mapping Language (RML)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* */
trait DataTargetExtractor extends ResourceExtractor[DataTarget]

object DataTargetExtractor {
def apply(): DataTargetExtractor = {
new StdDataTargetExtractor()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object ExtractorUtil {

def extractResourceFromProperty(resource: RDFResource, property: String): Option[RDFResource] = {
val properties = resource.listProperties(property);
require(properties.length <= 1, resource.uri.toString + ": at most 1 " + property + " needed.");
if (properties.isEmpty) {
None
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.rml.framework.core.extractors

import io.rml.framework.core.extractors.std.StdLogicalTargetExtractor
import io.rml.framework.core.model.LogicalTarget

/**
* MIT License
*
* Copyright (C) 2017 - 2021 RDF Mapping Language (RML)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* */
trait LogicalTargetExtractor extends ResourceExtractor[Set[LogicalTarget]]

object LogicalTargetExtractor {

def apply(): LogicalTargetExtractor = {
lazy val extractor = new StdLogicalTargetExtractor(DataTargetExtractor())
extractor
}

}
54 changes: 54 additions & 0 deletions src/main/scala/io/rml/framework/core/extractors/NodeCache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.rml.framework.core.extractors

import io.rml.framework.core.model.{LogicalTarget, Node, TriplesMap}

/**
* MIT License
*
* Copyright (C) 2017 - 2020 RDF Mapping Language (RML)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* */
object NodeCache extends scala.collection.mutable.HashMap[String, Node] {

def getTriplesMap(resource: String): Option[TriplesMap] = {
val node = NodeCache.get(resource)
node match {
case Some(tm: TriplesMap) => Some(tm)
case None => None
case _ => throw new InternalError(s"Expected TriplesMap in node cache for key ${resource}")
}
}

def getLogicalTarget(identifier: String): Option[LogicalTarget] = {
val node = NodeCache.get(identifier)
node match {
case Some(tm: LogicalTarget) => Some(tm)
case None => None
case _ => throw new InternalError(s"Expected TriplesMap in node cache for key ${identifier}")
}
}

def logicalTargetIterator: Iterator[(String, LogicalTarget)] = {
this.iterator
.filter(entry => entry._2.isInstanceOf[LogicalTarget])
.map(entry => (entry._1, entry._2.asInstanceOf[LogicalTarget]))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import io.rml.framework.core.model.rdf.RDFResource
import io.rml.framework.core.vocabulary._
import io.rml.framework.shared.RMLException

import java.util.Properties

class StdDataSourceExtractor extends DataSourceExtractor {

/**
Expand Down Expand Up @@ -72,6 +70,7 @@ class StdDataSourceExtractor extends DataSourceExtractor {
case Uri(RMLSVoc.Class.FILESTREAM) => extractFileStream(resource)
case Uri(RMLSVoc.Class.KAFKASTREAM) => extractKafkaStream(resource)
case Uri(WoTVoc.ThingDescription.Class.THING) => extractWoTSource(resource)
case _ => throw new RMLException(s"${classResource.uri} not supported as data source.")
}
case literal: Literal => throw new RMLException(literal.value + ": type must be a resource.")
}
Expand Down Expand Up @@ -140,14 +139,6 @@ class StdDataSourceExtractor extends DataSourceExtractor {
}
logDebug("MQTT data source defined in mapping file. hypermediaTarget: " + hypermediaTarget
+ ", contentType: " + contentType + ", dup: " + dup + ", qosOpt: " + qosOpt);
val mqttProperties = new Properties;
mqttProperties.put("hypermediaTarget", hypermediaTarget);
mqttProperties.put("contentType", contentType);
mqttProperties.put("controlPacketValue", controlPacketValue);
if (qosOpt.isDefined) {
mqttProperties.put("qos", qosOpt.get);
}
mqttProperties.put("dup", dup); // Java 8 can't handle Scala Boolean objects in a Properties object.
MQTTStream(mqttProperties)
MQTTStream(hypermediaTarget, contentType, controlPacketValue, dup, qosOpt)
}
}
Loading

0 comments on commit fac534c

Please sign in to comment.