In the example above, we first filter out null references for invalid employee ids and then apply a second filter to keep only employees with salaries over a certain threshold. Here, the first employee with a salary greater than the threshold is returned. If no such employee exists, null is returned. We saw how we used collect to get data out of the stream. If we need to get an array out of the stream, we can simply use toArray. The syntax Employee[]::new creates an empty array of Employee, which is then filled with elements from the stream.
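A minimal sketch of the pipeline just described; the Employee type, the sample data, and the 200000 threshold are illustrative assumptions, not taken from the original example:

```java
import java.util.Arrays;
import java.util.List;

public class FindFirstDemo {

    // Minimal stand-in for the article's Employee type.
    static class Employee {
        final int id; final String name; final double salary;
        Employee(int id, String name, double salary) {
            this.id = id; this.name = name; this.salary = salary;
        }
    }

    static final List<Employee> EMPLOYEES = Arrays.asList(
            new Employee(1, "Jeff", 100000),
            new Employee(2, "Bill", 200001),
            new Employee(3, "Mark", 300000));

    // Filter out nulls, keep only salaries above the threshold, take the first match.
    public static Employee firstAbove(double threshold) {
        return EMPLOYEES.stream()
                .filter(e -> e != null)
                .filter(e -> e.salary > threshold)
                .findFirst()
                .orElse(null); // null if no such employee exists
    }

    // toArray with an array-constructor reference producing an Employee[].
    public static Employee[] asArray() {
        return EMPLOYEES.stream().toArray(Employee[]::new);
    }

    public static void main(String[] args) {
        System.out.println(firstAbove(200000).name); // Bill
        System.out.println(asArray().length);        // 3
        System.out.println(firstAbove(999999));      // null
    }
}
```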
In cases like this, flatMap helps us flatten the data structure to simplify further operations. We saw forEach earlier in this section, which is a terminal operation. However, sometimes we need to perform multiple operations on each element of the stream before any terminal operation is applied, and peek is useful here. Simply put, it performs the specified operation on each element of the stream and returns a new stream that can be used further.
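Sketches of both operations just described, with a minimal Employee stand-in and an assumed salaryIncrement method (both are illustrative, not from the original):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapPeekDemo {

    // Minimal mutable stand-in for the article's Employee type.
    static class Employee {
        final int id; double salary;
        Employee(int id, double salary) { this.id = id; this.salary = salary; }
        void salaryIncrement(double percentage) { salary += salary * percentage / 100; }
        @Override public String toString() { return id + ":" + salary; }
    }

    // flatMap turns a Stream<List<String>> into a flat Stream<String>.
    public static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // peek performs an action on each element and passes the stream along.
    public static List<Employee> raiseAndPrint(List<Employee> empList) {
        return empList.stream()
                .peek(e -> e.salaryIncrement(10.0)) // first peek: increment each salary
                .peek(System.out::println)          // second peek: print each employee
                .collect(Collectors.toList());      // terminal operation
    }

    public static void main(String[] args) {
        System.out.println(flatten(Arrays.asList(
                Arrays.asList("Jeff", "Bezos"),
                Arrays.asList("Bill", "Gates")))); // [Jeff, Bezos, Bill, Gates]
        List<Employee> raised = raiseAndPrint(Arrays.asList(new Employee(1, 100.0)));
        System.out.println(raised.get(0).salary);  // 110.0
    }
}
```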
Here, the first peek is used to increment the salary of each employee, and the second peek is used to print the employees. Finally, collect is used as the terminal operation. Intermediate operations such as filter return a new stream on which further processing can be done. Terminal operations, such as forEach, mark the stream as consumed, after which it can no longer be used. A stream pipeline consists of a stream source, followed by zero or more intermediate operations, and a terminal operation.
Some operations are deemed short-circuiting operations. Short-circuiting operations allow computations on infinite streams to complete in finite time. Here, we use two short-circuiting operations: skip to skip the first 3 elements, and limit to take only 5 elements from the infinite stream generated using iterate. One of the most important characteristics of streams is that they allow for significant optimizations through lazy evaluation. Computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
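The short-circuiting pipeline just described might look like this (the doubling generator is an illustrative choice):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitDemo {

    // skip and limit make it safe to work with Stream.iterate's infinite stream.
    public static List<Integer> firstFiveAfterSkippingThree() {
        return Stream.iterate(2, i -> i * 2) // infinite: 2, 4, 8, 16, ...
                .skip(3)                     // drop 2, 4, 8
                .limit(5)                    // take 16, 32, 64, 128, 256
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFiveAfterSkippingThree()); // [16, 32, 64, 128, 256]
    }
}
```

Without limit, the pipeline would never terminate; without the laziness discussed next, skip could not even start on an infinite source.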
For example, consider the findFirst example we saw earlier. How many times is the map operation performed here? The stream first performs all the operations on id 1.
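A sketch that makes the laziness observable by counting map invocations; the id-to-salary mapping and the 100000 threshold are illustrative assumptions:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyEvaluationDemo {

    // Counts how many times map is invoked before findFirst short-circuits.
    public static int mapInvocations() {
        AtomicInteger calls = new AtomicInteger();
        Arrays.asList(1, 2, 3, 4).stream()
                .map(id -> {
                    calls.incrementAndGet();        // record each map invocation
                    return id * 100_000;            // hypothetical salary lookup
                })
                .filter(salary -> salary != null)
                .filter(salary -> salary > 100_000) // id 1 fails, id 2 passes
                .findFirst();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapInvocations()); // 2: ids 3 and 4 are never mapped
    }
}
```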
Since the salary of id 1 is not greater than the threshold, the processing moves on to the next element. Id 2 satisfies both of the filter predicates, so the stream evaluates the terminal operation findFirst and returns the result. This behavior becomes even more important when the input stream is infinite and not just very large. That said, in the example above, even if we had used findFirst after sorted, all the elements would still be sorted before findFirst is applied.
This happens because the operation cannot know what the first element is until the entire stream is sorted. As the names suggest, min and max return the minimum and maximum element in the stream respectively, based on a comparator. They return an Optional since a result may or may not exist (due to, say, filtering). The distinct operation uses the equals method of the elements to decide whether two elements are equal, eliminating duplicates. The match operations, allMatch, anyMatch, and noneMatch, all take a predicate and return a boolean. Short-circuiting is applied and processing stops as soon as the answer is determined:
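A sketch consistent with the behavior described next; the list 2, 4, 5, 6, 8 is chosen so each operation short-circuits exactly as the text explains:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MatchOpsDemo {

    static final List<Integer> INTS = Arrays.asList(2, 4, 5, 6, 8);

    public static int maxValue() {
        // max takes a comparator and returns an Optional
        return INTS.stream().max(Integer::compare).orElse(-1);
    }

    public static List<Integer> distinctValues() {
        // distinct relies on equals to drop duplicates
        return Arrays.asList(2, 2, 4, 4).stream().distinct().collect(Collectors.toList());
    }

    public static boolean allEven() {
        return INTS.stream().allMatch(i -> i % 2 == 0);        // false at 5
    }

    public static boolean oneEven() {
        return INTS.stream().anyMatch(i -> i % 2 == 0);        // true at 2
    }

    public static boolean noneMultipleOfThree() {
        return INTS.stream().noneMatch(i -> i % 3 == 0);       // false at 6
    }

    public static void main(String[] args) {
        System.out.println(maxValue());            // 8
        System.out.println(distinctValues());      // [2, 4]
        System.out.println(allEven());             // false
        System.out.println(oneEven());             // true
        System.out.println(noneMultipleOfThree()); // false
    }
}
```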
Here, allMatch returns false as soon as it encounters 5, which is not divisible by 2. anyMatch also applies short-circuiting, and true is returned immediately after the first element. noneMatch simply returns false as soon as it encounters 6, which is divisible by 3. fromEvent tells RxJS that we want to listen to click events from the document. If we pass a jQuery object instead of document, then RxJS knows that it has to call on instead. This example using fromEventPattern basically does the same as fromEvent: RxJS itself creates the actual listener (the handler), and your job is to add and remove it.
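A simplified, dependency-free sketch of that idea (not RxJS's actual implementation): you supply functions that add and remove a handler, and the stream creates and manages the handler itself. The event source object here is hypothetical:

```javascript
// Sketch of the fromEventPattern idea: we supply addHandler/removeHandler,
// and the "stream" creates and manages the actual listener.
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe(next) {
      const handler = (value) => next(value); // the listener the library creates
      addHandler(handler);                    // registered on subscribe
      return { unsubscribe: () => removeHandler(handler) };
    },
  };
}

// Hypothetical event source with add/remove-listener methods.
const listeners = new Set();
const source = {
  addListener: (fn) => listeners.add(fn),
  removeListener: (fn) => listeners.delete(fn),
  emit: (value) => listeners.forEach((fn) => fn(value)),
};

const received = [];
const subscription = fromEventPatternSketch(source.addListener, source.removeListener)
  .subscribe((v) => received.push(v));

source.emit('click');      // delivered to the subscriber
subscription.unsubscribe();
source.emit('ignored');    // listener was removed, so this is dropped
console.log(received);     // [ 'click' ]
```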
The purpose of fromEventPattern is basically to tell RxJS how to register and remove event listeners. Now imagine you use a library where you have to call a method named registerListener to start receiving values. You could also use it to listen to more than one event type, or to connect with any API that communicates via callbacks; in that case, the stream completes after the callback has been invoked. Yes, you can actually create a websocket connection and expose it as a stream:
That means you can both subscribe to it in order to receive messages and send messages through it by calling next. You probably use a library or framework with built-in AJAX support anyway. Sometimes the functions presented so far are not flexible enough, or you need more control over subscriptions. A subject is a special object that allows you to emit data to the stream and control it.
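A tiny sketch of the subject idea (producers call next, consumers subscribe); this is an illustration, not the real RxJS Subject:

```javascript
// Minimal subject sketch: consumers subscribe, producers call next.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
  }
  next(value) {
    this.observers.forEach((fn) => fn(value)); // push the value to every subscriber
  }
}

const subject = new MiniSubject();
const seen = [];
subject.subscribe((v) => seen.push(v));
subject.next(1);
subject.next(2);
console.log(seen); // [ 1, 2 ]

// A BehaviorSubject-like variant would additionally remember the last value
// and hand it to late subscribers immediately on subscribe.
```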
That way you cannot accidentally call the source methods. In addition to the regular subject, RxJS provides three specialized versions. The AsyncSubject emits only the last value, and only after completion. The BehaviorSubject allows you to provide a default value that is emitted to every subscriber if no other value has been emitted so far; otherwise subscribers receive the last emitted value. The ReplaySubject stores all emitted values up to a certain number, time, or infinitely.
All new subscribers will then get all stored values. You can find more information on subjects in the ReactiveX documentation, which also offers additional links. You can create an observable by simply using the new operator. With the function you pass in, you can control the stream.
That function is called whenever someone subscribes, and it receives an observer that you can use like a subject, i.e. call next, error, and complete on it. The function can return an unsubscribe function that is called when the subscriber cancels the subscription. You can use it to clean up or execute some finishing action. Before the advent of lettable operators, this was a way to implement custom operators. RxJS itself extends Observable internally. One example is Subject; another is the publish operator.
It returns a ConnectableObservable that provides the additional method connect.
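The create-with-a-function pattern described above can be sketched without RxJS: the function passed to the constructor receives an observer and may return a teardown function (all names here are illustrative):

```javascript
// Sketch of "new Observable(fn)": fn drives the observer and may
// return a cleanup function invoked on unsubscribe.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    const teardown = this.subscribeFn(observer) || (() => {});
    return { unsubscribe: teardown };
  }
}

const events = [];
let cleanedUp = false;

const stream = new MiniObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => { cleanedUp = true; }; // finishing action on unsubscribe
});

const sub = stream.subscribe({
  next: (v) => events.push(v),
  complete: () => events.push('done'),
});
sub.unsubscribe();
console.log(events, cleanedUp); // [ 1, 2, 'done' ] true
```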
Sometimes you already have an object that holds state and can emit values. You can turn it into an observable if you implement the Subscribable interface that consists of only a subscribe method. Knowing how to create individual streams is not enough. Sometimes you are confronted with several streams but you only need one. I can recommend an article from Max NgWizard K that even contains some fancy animations.
One more recommendation: you can interactively play with combination operators on RxMarbles by dragging around elements. Operators and functions that expect a stream (or an array of streams) usually do not only work with observables. Instead, they actually expect the argument to be of the type ObservableInput, which is defined as follows. That means you can, e.g., pass in a promise or an array without converting it to an observable first.
The main purpose is to defer the creation of an observable to the time when someone wants to subscribe. This is useful in a number of situations. One not so obvious use case is promises: defer can also return a promise. Take this example using the fetch API. Promises are executed immediately, whereas streams are executed when you subscribe.
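The eager-versus-deferred difference can be sketched without RxJS; plain functions stand in for the promise and for defer here, and fakeFetch is a hypothetical stand-in for a network call:

```javascript
// Sketch of eager vs deferred execution. A real promise runs its executor
// immediately; a defer-style wrapper delays the work until subscription.
let requestsSent = 0;
const fakeFetch = () => { requestsSent += 1; return 'response'; };

// Promise-like: the work happens the moment we create it.
const eager = fakeFetch();                // requestsSent is already 1

// defer-like: the factory runs once per subscription, not before.
const deferred = {
  subscribe(next) { next(fakeFetch()); }, // the work happens here
};

const before = requestsSent;              // still 1: nothing sent by deferred yet
const results = [];
deferred.subscribe((v) => results.push(v));
console.log(before, requestsSent, results); // 1 2 [ 'response' ]
```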
In Akka Streams this is called a Flow. This ensures that functionality specific to input streams can be added to these classes in the future without breaking binary compatibility. Otherwise the system will receive data, but not be able to process it. There are two approaches. To start the processing after all the transformations have been set up, we finally call streamingContext.start().
The very moment we call getUser, a request is sent, even if we did not want that at that point. Actually, iif can be easily implemented with defer and exists only for convenience and readability reasons. Another operator starts the first stream and, if it fails, continues with the next stream. This achieves the most efficient sending of data to external systems. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action in them, then nothing will get executed.
The system will simply receive the data and discard it. By default, output operations are executed one-at-a-time, and they are executed in the order they are defined in the application. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext.
The STREAMS Programming Guide describes how to use STREAMS in designing and implementing applications, STREAMS modules, and drivers. STREAMS is a general, flexible programming model for UNIX system communication services. STREAMS defines standard interfaces for character input/output.
This is shown in the following example. See the full source code. You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete.
For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)). You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g., streaming linear regression, streaming k-means) that can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a model offline (i.e., using historical data) and then apply the model online on streaming data. See the MLlib guide for more details.
This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist. For input streams that receive data over the network (such as Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance. Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide. For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures.
There are two types of data that are checkpointed: metadata checkpointing and data (RDD) checkpointing.
To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used. Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. The recovery from driver failures will also be partial in that case (some received but unprocessed data may be lost).
This is often acceptable and many run Spark Streaming applications in this way. Support for non-Hadoop environments is expected to improve in the future. Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved. This is done by using streamingContext.checkpoint(checkpointDirectory). This will allow you to use the aforementioned stateful transformations.
Additionally, if you want to make the application recover from driver failures, you should rewrite your streaming application to have the following behavior. This behavior is made simple by using StreamingContext.getOrCreate. This is used as follows. If the checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., the application is running for the first time), then the supplied function will be called to create a new context. This example appends the word counts of network data into a file. This behavior is made simple by using JavaStreamingContext.getOrCreate in the Java API. You can also explicitly create a StreamingContext from the checkpoint data and start the computation by using StreamingContext.getOrCreate.
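A sketch of that pattern, loosely following the shape used in the Spark Streaming guide; the checkpoint directory, host, and port are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDirectory = "hdfs://namenode:8020/checkpoints" // placeholder path

// Called only when no checkpoint data exists (i.e., on the first run).
def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableNetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
  ssc.checkpoint(checkpointDirectory) // enable checkpointing
  ssc
}

// Recreate the context from checkpoint data if it exists, else create a new one.
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()
```

The important design point is that all DStream setup happens inside the creation function: on recovery, the pipeline is rebuilt from the checkpoint, not from this function.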
In addition to using getOrCreate, one also needs to ensure that the driver process gets restarted automatically on failure. This can only be done by the deployment infrastructure that is used to run the application. This is further discussed in the Deployment section. Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say, 1 second), checkpointing every batch may significantly reduce operation throughput.
Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which may have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
Cluster with a cluster manager - This is the general requirement of any Spark application, and is discussed in detail in the deployment guide. If you are using spark-submit to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, if your application uses advanced sources (e.g., Kafka, Flume, Twitter), then you will have to package the extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application. Configuring sufficient memory for the executors - Since the received data must be stored in memory, the executors must be configured with sufficient memory to hold the received data.
Note that if you are doing 10 minute window operations, the system has to keep at least the last 10 minutes of data in memory. So the memory requirements for the application depend on the operations used in it. Configuring checkpointing - If the stream application requires it, then a directory in Hadoop API compatible fault-tolerant storage (e.g., HDFS, S3, etc.) must be set as the checkpoint directory. See the checkpointing section for more details. Configuring write ahead logs - Since Spark 1.2, we have introduced write ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write ahead log in the configured checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss (discussed in detail in the Fault-tolerance Semantics section).
This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLog.enable to true. However, these stronger semantics may come at the cost of the receiving throughput of individual receivers. This can be corrected by running more receivers in parallel to increase aggregate throughput. Additionally, it is recommended that the replication of the received data within Spark be disabled when the write ahead log is enabled, as the log is already stored in a replicated storage system.
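A configuration sketch combining those points (enable the write ahead log, and use a non-replicated serialized storage level for the input stream); the configuration key follows the Spark configuration, while the application name, host, and port are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableReceiverApp")
  // Enable the receiver write ahead log for zero data loss on driver recovery.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))

// The log already lives in replicated storage, so a single-replica
// serialized storage level is enough for the input stream itself.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
```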
This can be done by setting the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER. If a running Spark Streaming application needs to be upgraded with new application code, then there are two possible mechanisms. The upgraded Spark Streaming application is started and run in parallel to the existing application.
Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one can be brought down. Note that this can be done only for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications). The existing application is shut down gracefully (see StreamingContext.stop for graceful shutdown options), which ensures that received data is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka and Flume), as data needs to be buffered while the previous application is down and the upgraded application is not yet up.
Also, restarting from earlier checkpoint information of pre-upgrade code cannot be done. In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory. If the data is being received by the receivers faster than it can be processed, you can limit the rate by setting the configuration parameter spark.streaming.receiver.maxRate. When a StreamingContext is used, the Spark web UI shows an additional Streaming tab with statistics about running receivers (whether receivers are active, number of records received, receiver errors, etc.).
This can be used to monitor the progress of the streaming application. If the batch processing time is consistently more than the batch interval, consider reducing the batch processing time. The progress of a Spark Streaming program can also be monitored using the StreamingListener interface, which allows you to get receiver status and processing times.
Note that this is a developer API and it is likely to be improved upon (i.e., more information reported) in the future. Getting the best performance out of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
Reducing the processing time of each batch of data by efficiently using cluster resources, and setting the right batch size such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion). There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in the Tuning Guide. This section highlights some of the most important ones. Receiving data over the network (like Kafka, Flume, sockets, etc.) requires the data to be deserialized and stored in Spark.
If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving.
Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers, allowing data to be received in parallel, thus increasing overall throughput. These multiple DStreams can be unioned together to create a single DStream.
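A sketch of that parallel-receiving setup, following the shape shown in the Spark Streaming guide; the ZooKeeper quorum, consumer group, topic name, and stream count are placeholders:

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholders for the Kafka connection details.
val zkQuorum = "zk-host:2181"
val consumerGroup = "my-consumer-group"
val numStreams = 5

// One receiver per input DStream; five DStreams means five parallel receivers.
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(streamingContext, zkQuorum, consumerGroup, Map("topic" -> 1))
}

// Union them into a single DStream and apply the transformations once.
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
```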
Then the transformations that were being applied on a single input DStream can be applied on the unified stream. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient because all available cores will not be used to process the data.
To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem. An alternative is to explicitly repartition the input data stream; this distributes the received batches of data across the specified number of machines in the cluster before further processing. Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like reduceByKey and reduceByKeyAndWindow, the default number of parallel tasks is controlled by the spark.default.parallelism configuration property.
You can pass the level of parallelism as an argument (see the PairDStreamFunctions documentation), or set the spark.default.parallelism configuration property to change the default. The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data being serialized: input data received through receivers, and persisted RDDs generated by the streaming computation. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation.
For example, window operations persist data in memory as they would be processed multiple times.
However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted serialized by default to minimize GC overheads. In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the Spark Tuning Guide for more details. For Kryo, consider registering custom classes, and disabling object reference tracking (see Kryo-related configurations in the Configuration Guide). In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads.
For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overhead. If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:
Task Serialization: Using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves. This is controlled by the spark.closure.serializer property. However, at this time, Kryo serialization cannot be enabled for closure serialization. This may be resolved in a future release. Execution mode: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the Running on Mesos guide for more details.