StreamingContext
is the entry point for all Spark Streaming functionality. Whatever you do in Spark Streaming has to start from creating an instance of StreamingContext.
import org.apache.spark.SparkContext
import org.apache.spark.streaming._

val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))
Note
|
StreamingContext belongs to org.apache.spark.streaming package.
|
With an instance of StreamingContext
in your hands, you can create ReceiverInputDStreams or set the checkpoint directory.
Once streaming pipelines are developed, you start StreamingContext to set the stream transformations in motion. You stop the instance when you are done.
You can create a new instance of StreamingContext
using the following constructors. They can be grouped by whether a StreamingContext is created from scratch or recreated from a checkpoint directory (follow the links for their extensive coverage).
- Creating StreamingContext from scratch:
  - StreamingContext(conf: SparkConf, batchDuration: Duration)
  - StreamingContext(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String], environment: Map[String,String])
  - StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
- Recreating StreamingContext from a checkpoint file (where path is the checkpoint directory):
  - StreamingContext(path: String)
  - StreamingContext(path: String, hadoopConf: Configuration)
  - StreamingContext(path: String, sparkContext: SparkContext)
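For example, a minimal sketch of both groups (the application name, master URL and checkpoint path below are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// from scratch, with a SparkConf and a 5-second batch interval
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val sscFromScratch = new StreamingContext(conf, Seconds(5))

// recreated from a checkpoint directory
val sscFromCheckpoint = new StreamingContext("_checkpoint")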
Note
|
StreamingContext(path: String) uses SparkHadoopUtil.get.conf.
|
Note
|
When a StreamingContext is created and spark.streaming.checkpoint.directory setting is set, the value gets passed on to checkpoint method. |
When you create a new instance of StreamingContext, it first checks whether a SparkContext or the checkpoint directory is given (but not both!).
Tip
|
WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data. |
- A DStreamGraph is created.
- A JobScheduler is created.
- A StreamingJobProgressListener is created.
- The Streaming tab in web UI is created (when spark.ui.enabled is enabled).
- A StreamingSource is instantiated.
At this point, StreamingContext
enters INITIALIZED state.
StreamingContext
offers the following methods to create ReceiverInputDStreams:
- actorStream[T](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy): ReceiverInputDStream[T]
- socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
- socketStream[T](hostname: String, port: Int, converter: (InputStream) ⇒ Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
- rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
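For example, socketTextStream gives you a ReceiverInputDStream of text lines read from a TCP socket (the host and port below are illustrative):

// a DStream of lines received from a TCP source
val lines = ssc.socketTextStream("localhost", 9999)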
StreamingContext
offers the following methods to create InputDStreams:
- queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
- queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T]): InputDStream[T]
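queueStream is handy for testing since you control what RDDs arrive in each batch. A short sketch (the RDD contents are illustrative):

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// an InputDStream backed by a queue of RDDs (one RDD per batch by default)
val rddQueue = Queue[RDD[Int]]()
val queued = ssc.queueStream(rddQueue)
rddQueue += ssc.sparkContext.makeRDD(1 to 100)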
You can also use two additional methods in StreamingContext
to build (or rather compose) a custom DStream:
- union[T](streams: Seq[DStream[T]]): DStream[T]
- receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
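For example, union combines several DStreams of the same type into one (the sockets below are illustrative):

// merge two text streams into a single DStream
val s1 = ssc.socketTextStream("host1", 9999)
val s2 = ssc.socketTextStream("host2", 9999)
val all = ssc.union(Seq(s1, s2))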
You can register a custom input dstream using the receiverStream
method. It accepts a Receiver.
Note
|
You can find an example of a custom Receiver in Custom Receiver.
|
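As a bare-bones sketch (not the Custom Receiver example referenced above), a receiver that stores a constant record once a second could look like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// a minimal custom receiver that emits a constant record once a second
class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = {
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("hello")      // hand a record over to Spark
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  def onStop(): Unit = ()     // nothing to clean up in this sketch
}

val customStream = ssc.receiverStream(new ConstantReceiver)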
transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T]): DStream[T]
remember(duration: Duration): Unit
remember
method sets the remember interval (for the graph of output dstreams). It simply calls DStreamGraph.remember method and exits.
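For example, to keep the generated RDDs around for 10 minutes:

import org.apache.spark.streaming.Minutes

ssc.remember(Minutes(10))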
Caution
|
FIXME figure |
The checkpoint interval is an internal property of StreamingContext
and corresponds to the batch interval or, when the StreamingContext was recreated from a checkpoint, to the checkpoint interval of that checkpoint.
Note
|
The checkpoint interval property is also called graph checkpointing interval. |
The checkpoint interval is mandatory when the checkpoint directory is defined (i.e. not null).
A checkpoint directory is an HDFS-compatible directory to which checkpoints are written.
Note
|
"A HDFS-compatible directory" means that it is Hadoop’s Path class to handle all file system-related operations. |
Its initial value depends on whether the StreamingContext was recreated from a checkpoint: if so, it is the checkpoint directory; otherwise, it is not set (i.e. null).
You can set the checkpoint directory when a StreamingContext is created or later using checkpoint method.
Internally, a checkpoint directory is tracked as checkpointDir
.
Tip
|
Refer to Checkpointing for more detailed coverage. |
The initial checkpoint is the checkpoint (file) this StreamingContext has been recreated from. It is specified when a StreamingContext is created.
val ssc = new StreamingContext("_checkpoint")
The isCheckpointPresent
internal method behaves like a flag that remembers whether the StreamingContext
instance was created from a checkpoint or not, so that other internal parts of a streaming application can decide how to initialize themselves (or just be initialized).
isCheckpointPresent
checks the existence of the initial checkpoint that gave birth to the StreamingContext.
checkpoint(directory: String): Unit
You use checkpoint
method to set directory
as the current checkpoint directory.
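For example (the HDFS path below is illustrative):

ssc.checkpoint("hdfs://namenode:8020/checkpoints/streaming-app")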
Note
|
Spark creates the directory unless it exists already. |
checkpoint
uses SparkContext.hadoopConfiguration to get the file system on which to create directory
. The full path of the directory is then passed on to the SparkContext.setCheckpointDir method.
Note
|
Calling checkpoint with null as directory clears the checkpoint directory, which effectively disables checkpointing.
|
Note
|
When StreamingContext is created and spark.streaming.checkpoint.directory setting is set, the value gets passed on to checkpoint method.
|
start(): Unit
start()
starts stream processing. It acts differently depending on the state of StreamingContext, and only the INITIALIZED state makes for a proper startup.
Note
|
Consult States section in this document to learn about the states of StreamingContext. |
Right after StreamingContext has been instantiated, it enters INITIALIZED
state, in which start
first checks whether another StreamingContext
instance has already been started in the JVM. If so, it throws an IllegalStateException
and exits.
java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started at [startSite]
If no other StreamingContext exists, it performs setup validation and starts JobScheduler
(in a separate dedicated daemon thread called streaming-start).
It enters ACTIVE state.
It then registers the shutdown hook stopOnShutdown and the streaming metrics source. If web UI is enabled, it attaches the Streaming tab.
Given all the above has finished properly, it is assumed that the StreamingContext started fine, so you should see the following INFO message in the logs:
INFO StreamingContext: StreamingContext started
When in ACTIVE
state, i.e. after it has been started, executing start
merely leads to the following WARN message in the logs:
WARN StreamingContext: StreamingContext has already been started
Attempting to start StreamingContext
in STOPPED state, i.e. after it has been stopped, leads to an IllegalStateException:
java.lang.IllegalStateException: StreamingContext has already been stopped
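Putting it together, a typical way to start a streaming application, once the pipeline has been defined on ssc, is:

ssc.start()
// block the current thread until the computation stops or fails
ssc.awaitTermination()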
You stop StreamingContext
using one of the two variants of the stop
method:
- stop(stopSparkContext: Boolean = true)
- stop(stopSparkContext: Boolean, stopGracefully: Boolean)
Note
|
The first stop method uses spark.streaming.stopSparkContextByDefault configuration setting that controls stopSparkContext input parameter.
|
stop
methods stop the execution of the streams immediately (stopGracefully
is false
) or wait for the processing of all received data to be completed (stopGracefully
is true
).
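For example, to wait for all received data to be processed while keeping the underlying SparkContext alive:

ssc.stop(stopSparkContext = false, stopGracefully = true)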
stop
reacts appropriately to the state of StreamingContext
, but the end state is always STOPPED, with the shutdown hook removed.
If a user requested to stop the underlying SparkContext (when the stopSparkContext
flag is enabled, i.e. true
), it is stopped at this point.
It is only in ACTIVE state that stop
does more than print WARN messages to the logs.
It does the following (in order):
- StreamingSource is removed from MetricsSystem (using MetricsSystem.removeSource).
- Streaming tab is detached (using StreamingTab.detach).
- ContextWaiter is notified to stop (using notifyStop).
- shutdownHookRef is cleared.
At that point, you should see the following INFO message in the logs:
INFO StreamingContext: StreamingContext stopped successfully
StreamingContext
enters STOPPED state.
When in INITIALIZED state, you should see the following WARN message in the logs:
WARN StreamingContext: StreamingContext has not been started yet
StreamingContext
enters STOPPED state.
stopOnShutdown
is a JVM shutdown hook to clean up after StreamingContext
when the JVM shuts down, e.g. all non-daemon threads have exited, System.exit
was called, or ^C
was typed.
Note
|
It is registered to ShutdownHookManager when StreamingContext starts. |
Note
|
ShutdownHookManager uses org.apache.hadoop.util.ShutdownHookManager for its work.
|
When executed, it first reads the spark.streaming.stopGracefullyOnShutdown setting, which controls whether to stop StreamingContext gracefully or not. You should see the following INFO message in the logs:
INFO Invoking stop(stopGracefully=[stopGracefully]) from shutdown hook
With the setting, it stops StreamingContext without stopping the accompanying SparkContext
(i.e. the stopSparkContext
parameter is disabled).
validate(): Unit
The validate()
method validates the configuration of StreamingContext
.
Note
|
The method is executed when StreamingContext is started.
|
It first asserts that DStreamGraph
has been assigned (i.e. graph
field is not null
) and triggers validation of DStreamGraph.
Caution
|
It appears that graph could never be null, though.
|
If checkpointing is enabled, it ensures that the checkpoint interval is set and checks whether the current streaming runtime environment can be safely serialized by serializing a checkpoint for fictitious batch time 0 (not zero time).
If dynamic allocation is enabled, it prints the following WARN message to the logs:
WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log
StreamingContext
can be in three states:
- INITIALIZED, i.e. after it was instantiated.
- ACTIVE, i.e. after it was started.
- STOPPED, i.e. after it has been stopped.