Skip to content

C# and F# language binding and extensions to Apache Spark

License

Notifications You must be signed in to change notification settings

cite-sa/MobiusCore

 
 

Repository files navigation

MobiusCore: C# API for Spark

MobiusCore is a port of Mobius the opensource framework that provides C# language binding to Apache Spark enabling the implementation of Spark driver program and data processing operations in the languages supported in the .NET framework like C# or F#. Mobius relied heavily on Delegate Serialization, a feature that was cut from .NET Core. In MobiusCore Delegates have been replaced by Linq Expressions, thus making it easier to Serialize the UDF's to the Mobius Workers. More details on this approach can be found here

For example, the word count sample in Apache Spark can be implemented in C# as follows :

var lines = sparkContext.TextFile(@"hdfs://path/to/input.txt");  
var words = lines.FlatMap(s => s.Split(' '));
var wordCounts = words.Map(w => new Tuple<string, int>(w.Trim(), 1))  
                      .ReduceByKey((x, y) => x + y);  
var wordCountCollection = wordCounts.Collect();  
wordCounts.SaveAsTextFile(@"hdfs://path/to/wordcount.txt");  

A simple DataFrame application using TempTable may look like the following:

var reqDataFrame = sqlContext.TextFile(@"hdfs://path/to/requests.csv");
var metricDataFrame = sqlContext.TextFile(@"hdfs://path/to/metrics.csv");
reqDataFrame.RegisterTempTable("requests");
metricDataFrame.RegisterTempTable("metrics");
// C0 - guid in requests DataFrame, C3 - guid in metrics DataFrame  
var joinDataFrame = GetSqlContext().Sql(  
    "SELECT joinedtable.datacenter" +
         ", MAX(joinedtable.latency) maxlatency" +
         ", AVG(joinedtable.latency) avglatency " +
    "FROM (" +
       "SELECT a.C1 as datacenter, b.C6 as latency " +  
       "FROM requests a JOIN metrics b ON a.C0  = b.C3) joinedtable " +   
    "GROUP BY datacenter");
joinDataFrame.ShowSchema();
joinDataFrame.Show();

A simple DataFrame application using DataFrame DSL may look like the following:

// C0 - guid, C1 - datacenter
var reqDataFrame = sqlContext.TextFile(@"hdfs://path/to/requests.csv")  
                             .Select("C0", "C1");    
// C3 - guid, C6 - latency   
var metricDataFrame = sqlContext.TextFile(@"hdfs://path/to/metrics.csv", ",", false, true)
                                .Select("C3", "C6"); //override delimiter, hasHeader & inferSchema
var joinDataFrame = reqDataFrame.Join(metricDataFrame, reqDataFrame["C0"] == metricDataFrame["C3"])
                                .GroupBy("C1");
var maxLatencyByDcDataFrame = joinDataFrame.Agg(new Dictionary<string, string> { { "C6", "max" } });
maxLatencyByDcDataFrame.ShowSchema();
maxLatencyByDcDataFrame.Show();

A simple Spark Streaming application that processes messages from Kafka using C# may be implemented using the following code:

StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath, () =>
    {
      var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
      ssc.Checkpoint(checkpointPath);
      var stream = KafkaUtils.CreateDirectStream(ssc, topicList, kafkaParams, perTopicPartitionKafkaOffsets);
      //message format: [timestamp],[loglevel],[logmessage]
      var countByLogLevelAndTime = stream
                                    .Map(kvp => Encoding.UTF8.GetString(kvp.Value))
                                    .Filter(line => line.Contains(","))
                                    .Map(line => line.Split(','))
                                    .Map(columns => new Tuple<string, int>(
                                                          string.Format("{0},{1}", columns[0], columns[1]), 1))
                                    .ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y,
                                                          windowDurationInSecs, slideDurationInSecs, 3)
                                    .Map(logLevelCountPair => string.Format("{0},{1}",
                                                          logLevelCountPair.Key, logLevelCountPair.Value));
      countByLogLevelAndTime.ForeachRDD(countByLogLevel => new SparkStreamingHelper().Execute(countByLogLevel));
      return ssc;
    });
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();

For more code samples, refer to MobiusCore\examples directory or MobiusCore\csharp\Samples directory.

API Documentation

Refer to MobiusCore C# API documentation for the list of Spark's data processing operations supported in Mobius.

API Usage

Mobius API usage samples are available at:

  • Examples folder which contains standalone C# and F# projects that can be used as templates to start developing Mobius applications

  • Samples project which uses a comprehensive set of Mobius APIs to implement samples that are also used for functional validation of APIs

  • Mobius performance test scenarios implemented in C# and Scala for side by side comparison of Spark driver code

Documents

Refer to the docs folder for design overview and other info on Mobius

License

License

Mobius is licensed under the MIT license. See LICENSE file for full license information.

About

C# and F# language binding and extensions to Apache Spark

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • C# 85.0%
  • Scala 9.7%
  • PowerShell 1.7%
  • C++ 1.3%
  • Batchfile 1.2%
  • Shell 0.6%
  • Other 0.5%