Clean up TCP push code and modify tests
ghsnd committed Mar 18, 2021
1 parent a688f1f commit 83b2b84
Showing 94 changed files with 94 additions and 204 deletions.
49 changes: 2 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ Please note that this version works with Flink 1.11.3 with Scala 2.11 support, w
### Building RMLStreamer

In order to build a jar file that can be deployed on a Flink cluster, you need:
-- a Java JDK >= 11 and <= 13
+- a Java JDK >= 11 and <= 13 (We develop and test on JDK 11)
- Apache Maven 3 or higher

Clone or download and then build the code in this repository:
@@ -46,7 +46,7 @@ The resulting `RMLStreamer-<version>.jar`, found in the `target` folder, can be
### Executing RML Mappings

Here we give examples for running RMLStreamer from the command line. We use `FLINK_BIN` to denote the Flink CLI tool,
-usually found in the `bin` directory of the Flink installation. E.g. `/home/myuser/flink-1.11.2/bin/flink`.
+usually found in the `bin` directory of the Flink installation. E.g. `/home/myuser/flink-1.11.3/bin/flink`.
For Windows a `flink.bat` script is provided.

The general usage is:
@@ -121,7 +121,6 @@ An example of how to define the generation of an RDF stream from a stream in an
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost";
-rmls:type "PULL" ;
rmls:port "5005"
];
rml:referenceFormulation ql:JSONPath;
@@ -193,38 +192,6 @@ See also https://stackoverflow.com/questions/38639019/flink-kafka-consumer-group

The only option for spreading load is to use multiple topics, and assign one RMLStreamer job to one topic.

-##### Generating a stream from a file (to be implemented)
-```
-<#TripleMap>
-a rr:TriplesMap;
-rml:logicalSource [
-rml:source [
-rdf:type rmls:FileStream;
-rmls:path "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books.json"
-];
-rml:referenceFormulation ql:JSONPath;
-rml:iterator "$.store.books[*]"
-];
-rr:subjectMap [
-rr:template "{$.id}" ;
-rr:termType rr:IRI;
-rr:class skos:Concept
-];
-rr:predicateObjectMap [
-rr:predicateMap [
-rr:constant dcterms:title;
-rr:termType rr:IRI
-];
-rr:objectMap [
-rml:reference "$.id";
-rr:termType rr:Literal
-]
-].
-```
-
##### Generating a stream from a dataset

```
@@ -272,17 +239,6 @@ The following are the classes/terms currently used:


* **rmls:port** specifies a port number for the stream mapper to connect to.
-
-
-* **rmls:type** specifies how a streamer will act:
-* **"PULL"**:
-The stream mapper will act as a client.
-It will create a socket and connect to the specified port at the given host name.
-**rmls:port** and **rmls:hostName** needs to be specified.
-* **"PUSH"**:
-The stream mapper will act as a server and will start listening at the given port.
-If the given port is taken, the mapper will keep opening subsequent ports until a free port is found.
-Only **rmls:port** needs to be specified here.

Example of a valid json logical source map using all possible terms:

@@ -292,7 +248,6 @@ rml:logicalSource [
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost";
-rmls:type "PULL" ;
rmls:port "5005"
];
rml:referenceFormulation ql:JSONPath;
2 changes: 1 addition & 1 deletion documentation/README_DEVELOPMENT.md
@@ -2,7 +2,7 @@

## Prerequisites
* Git
-* Java JDK >= 11
+* Java JDK 11
* Maven 3
* IntelliJ (Eclipse should work too, but consult Flink documentation for set-up)

1 change: 1 addition & 0 deletions documentation/README_Netty_Snapshot.md
@@ -1,4 +1,5 @@
# README: Using JitPack
+(kept for reference only, not applicable anymore for RMLStreamer)

### Using JitPack to get unpublished Maven Dependencies from a GitHub/GitLab repository

23 changes: 0 additions & 23 deletions pom.xml
@@ -52,29 +52,6 @@ SOFTWARE.
<license.licenseName>mit</license.licenseName>
</properties>

-<!--
-Execute "mvn clean package -Pbuild-jar"
-to build a jar file out of this project!
-How to use the Flink Quickstart pom:
-a) Adding new dependencies:
-You can add dependencies to the list below.
-Please check if the maven-shade-plugin below is filtering out your dependency
-and remove the exclude from there.
-b) Build a jar for running on the cluster:
-There are two options for creating a jar from this project
-b.1) "mvn clean package" -> this will create a fat jar which contains all
-dependencies necessary for running the jar created by this pom in a cluster.
-The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
-b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
-nicer dependency exclusion handling. This approach is preferred and leads to
-much cleaner jar files.
--->

<dependencies>

1 change: 0 additions & 1 deletion src/main/resources/json_stream_data_mapping.ttl
@@ -23,7 +23,6 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost";
-rmls:type "PULL";
rmls:port "5005"
];
rml:referenceFormulation ql:JSONPath;
@@ -107,8 +107,6 @@ class StdDataSourceExtractor extends DataSourceExtractor {
require(hostNameProperties.length == 1, resource.uri.toString + ": exactly 1 hostname needed.")
val portProperties = resource.listProperties(RMLVoc.Property.PORT)
require(portProperties.length == 1, resource.uri.toString + ": exactly 1 port needed.")
-val typeProperties = resource.listProperties(RMLVoc.Property.TYPE)
-require(typeProperties.length == 1, resource.uri.toString + ": needs type.")

val hostName = hostNameProperties.head match {
case resource: RDFResource => throw new RMLException(resource.uri + ": hostname must be a literal.")
@@ -119,7 +117,6 @@ class StdDataSourceExtractor extends DataSourceExtractor {
case literal: Literal => literal.value
}

-val _type = ExtractorUtil.matchLiteral(typeProperties.head)
-TCPSocketStream(hostName, port.toInt, _type.value)
+TCPSocketStream(hostName, port.toInt)
}
}
13 changes: 3 additions & 10 deletions src/main/scala/io/rml/framework/core/model/TCPSocketStream.scala
@@ -26,19 +26,12 @@ package io.rml.framework.core.model

import java.util.Objects

-case class TCPSocketStream(hostName: String, port: Int, _type: String) extends StreamDataSource {
+case class TCPSocketStream(hostName: String, port: Int) extends StreamDataSource {
override def uri: ExplicitNode = {
-val hashValue = Objects.hash(hostName, Integer.valueOf(port), _type)
+val hashValue = Objects.hash(hostName, Integer.valueOf(port))

Uri(hashValue.toHexString)
}
}

-object TCPSocketStream {
-
-object TYPE {
-val PUSH = "PUSH"
-val PULL = "PULL"
-}
-
-}
+//object TCPSocketStream
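
Note on the change above: after this commit the source identifier is derived from the host name and port alone. The following is a minimal, self-contained sketch of that hashing step, not the project's actual class. `TcpSource` and `uriId` are hypothetical stand-ins; the real `TCPSocketStream` extends `StreamDataSource` and wraps the value in `Uri(...)`.

```scala
import java.util.Objects

// Hypothetical stand-in for the model class in the diff; it keeps only the
// two fields that remain after the `_type` parameter was removed.
final case class TcpSource(hostName: String, port: Int) {
  // Same approach as the diff: hash host name and port, render as hex.
  def uriId: String = Objects.hash(hostName, Integer.valueOf(port)).toHexString
}

object TcpSourceDemo {
  def main(args: Array[String]): Unit = {
    val a = TcpSource("localhost", 5005)
    val b = TcpSource("localhost", 5005)
    val c = TcpSource("localhost", 9999)
    // Two sources with the same host and port share an identifier.
    println(a.uriId == b.uriId)
    // Changing only the port changes the hash input and the identifier.
    println(a.uriId == c.uriId)
  }
}
```

One consequence of this design is that two logical sources pointing at the same host and port are indistinguishable by identifier.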
@@ -89,7 +89,6 @@ object RMLVoc {
val HOSTNAME = Namespaces("rmls", "hostName")
val PORT = Namespaces("rmls", "port")
val PATH = Namespaces("rmls", "path")
-val TYPE = Namespaces("rmls", "type")

///////////////////////////////////////////////////////////////////////////
// RMLS: Kafka Source
15 changes: 1 addition & 14 deletions src/main/scala/io/rml/framework/flink/source/StreamUtil.scala
@@ -29,24 +29,11 @@ import io.rml.framework.core.model.TCPSocketStream
import io.rml.framework.core.util.StreamerConfig
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-//import org.apache.flink.streaming.connectors.netty.example.TcpReceiverSource

object StreamUtil {

def createTcpSocketSource(tCPSocketStream: TCPSocketStream, delimiter:String="\n")(implicit env: StreamExecutionEnvironment): DataStream[String] = {
-
-tCPSocketStream._type match {
-//TODO Update flink to 1.3.3 to use latest methods in scala without java -> scala conversion.
-
-/**
-* Flink 1.3.2's scala socketTextStream can't use custom multi-char/String delimiter (there is delimiter param but it's not being used)
-*
-*/
-
-
-case TCPSocketStream.TYPE.PULL => new DataStream[String](env.getJavaEnv.socketTextStream(tCPSocketStream.hostName, tCPSocketStream.port, delimiter))
-//case TCPSocketStream.TYPE.PUSH => env.addSource(new TcpReceiverSource(tCPSocketStream.port)).setParallelism(1) // to avoid library to setup multiple instances
-}
+new DataStream[String](env.getJavaEnv.socketTextStream(tCPSocketStream.hostName, tCPSocketStream.port, delimiter))
}

/**
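
Note on the change above: with the PUSH branch removed, `createTcpSocketSource` always connects as a client, so the process that produces the data has to listen on the configured `rmls:hostName` and `rmls:port`. Below is a minimal sketch of such a producer for local experiments; it is not part of this commit. `LineServerSketch`, `serveOnce` and `readAll` are hypothetical names, the `"\n"` delimiter matches the default in the signature above, and `readAll` only stands in for the mapper's side of the connection.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets

object LineServerSketch {

  /** Accepts one client on `server` and writes each record followed by "\n". */
  def serveOnce(server: ServerSocket, records: Seq[String]): Unit = {
    val client = server.accept() // blocks until a client connects
    val out = new PrintWriter(
      new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
    try {
      records.foreach { r => out.print(r); out.print("\n") }
      out.flush()
    } finally {
      out.close()
      client.close()
    }
  }

  /** Connects to host:port and reads lines until the server closes the stream. */
  def readAll(host: String, port: Int): List[String] = {
    val socket = new Socket(host, port)
    try {
      val in = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    // Port 5005 is taken from the README example; adjust it to your mapping.
    val server = new ServerSocket(5005)
    try serveOnce(server, Seq("""{"id": 1}""", """{"id": 2}"""))
    finally server.close()
  }
}
```

Running `main` blocks until something connects to port 5005, which in a real setup would be the RMLStreamer job started with a mapping that points at that host and port.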
@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:CSV
];

@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:CSV
];

@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:JSONPath;
];

@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:JSONPath;
rml:iterator "$.persons[*]"
];
@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:XPath;
];

@@ -12,7 +12,7 @@
a rr:TriplesMap;

rml:logicalSource [
-rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; rmls:type "PULL" ];
+rml:source [ rdf:type rmls:TCPSocketStream ; rmls:hostName "localhost" ; rmls:port "9999" ; ];
rml:referenceFormulation ql:XPath;
rml:iterator "/Persons/Person"
];
@@ -23,8 +23,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -22,8 +22,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -23,8 +23,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];
@@ -23,8 +23,7 @@
rml:source [
rdf:type rmls:TCPSocketStream ;
rmls:hostName "localhost" ;
-rmls:port "9999" ;
-rmls:type "PULL"
+rmls:port "9999"
];
rml:referenceFormulation ql:CSV
];