Understanding Streams: Everything you need to know
What is Stream? Types of Streams? How to use and manipulate the Streams? How to create your own Stream? What is StreamBuilder and how to use it?
Introduction
- Streams are very complex and hard to understand. In this blog, I'll try to explain this concept with some examples.
- For beginners, this concept should be clear in their mind, because most of the time our application is dealing with asynchronous data, For example: Fetch data from API, DB, etc.
- Before start coding and implementing Stream let's first understand what actually a Stream is.
What are Streams?
- A streams are nothing but a continuous flow of Data.
- It provides an asynchronous sequence of Data. And because of the asynchronous type, we have to use
async
andawait
. - The
async
andawait
are keywords that are used to define an asynchronous function. someAsyncFunction () async{ await getData(); }
- There are two main ends associated with Steams: 1.Sink and 2.Source.
- The Sink is the end from where we can add data inside the Streams and
- The Source is the end from where we can get the data from the Streams.
- If you've not understood then, you can think of a Streams like a water pipe, where the flow of water (Data) is continuously flowing from one end (Sink) and coming out from the second end (Source).
- There are basically two types of Streams:
- 1. Single Subscription Stream and
- 2. Broadcast Stream
- The Single Subscription Streams allow only one
listener
to listen to the stream. If you try to listen to the same stream again, you'll get an exception. The Broadcast Stream allows more than onelistener
to listen to the same stream. - A
listener
listens to the particular event of the stream. Whenever data flows in the Streams thelistener
will recognize that event and listens to it. - If there are no data flowing inside the Streams then it gets destroyed.
- Now let's write some code to understand how actually we can use the Streams.
How to use Streams :
- To use Streams we have to make the function async generator (async* ).
- The
async*
is used when a function returns multiple future values one at a time. - When this function is called the Stream is created. Now to return the data from this function we have to use
yield
oryield*
. - yield is like return but the difference is
yield
doesn't terminate the function immediately. - Let's take one simple example :
Stream<int> numberGenerator() async*{ for(int i=0; i<10; i++){ await Future.delayed(Duration(milliseconds: 1000)); yield i; } }
void main() { final myStream = numberGenerator(); final subscription= myStream.listen( (data)=>{ print("Number: $data") } ); }
- The
subscription
is a type ofStreamSubscription
. When you listen on aStream
usingStream.listen
, aStreamSubscription
object is returned. - The
subscription
provides events to thelistener
, and holds the callbacks used to handle the events. Thesubscription
can also be used to unsubscribe from the events, or to temporarily pause the events from the stream. - Here when
subscription
starts listening to themyStream
, one by one all the integer values start flowing into the stream. - And then it will
yield
thatint
values one by one asynchronously. listen()
properties:onData():
- On each data event from this stream, onData handler is called. If onData is
null
, nothing happens. - Example :
void main() { final myStream = numberGenerator(); final subscription = myStream.listen( (data)=>{ // data handler function print("Number: $data") } ); }
onDone() :
- Called when stream completes all its events.
void main() { final myStream = timedCounter(); final subscription = myStream.listen( (data){ print("Number: $data"); }, onDone: () { print("You've reached at the end"); }, ); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
- As you can see as soon as the stream completes its listening,
onDone
function is executed. onError(error):
- Subscription is automatically canceled when an error occurred.
void main() { final myStream = timedCounter(); final subscription = myStream .listen( (data){ print("Number: $data"); }, onDone: () { print("You've reached at the end"); }, onError: (e){ print("Error: $e"); }, ); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); if(i ==2){ throw Exception("Error Occurred"); } yield i++; } }
- As soon as number
2
is encountered we've thrown anerror
andonError
function executed. cancelOnError :
- If
cancelOnError
istrue
, the subscription is automatically canceled when the firsterror
event is delivered. The default isfalse
. - We can see in the above example that even if the
error
is encountered, the stream has not stopped its execution. It executedonDone
too. To close the stream subscription as soon as theerror
is encountered we have to givetrue
value tocancelOnError
. void main() { final myStream = timedCounter(); final subscription = myStream.listen( (data){ print("Number: $data"); }, onDone: () { print("You've reached at the end"); }, onError: (e){ print("Error: $e"); }, cancelOnError: true ); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); if(i ==2){ throw Exception("Error Occurred"); } yield i++; } }
Manipulating Streams :
- We can manipulate the stream data on the fly.
- We can chain up methods such as
map
,where
,take
andexpand
. - We can chain up all the methods that are available in the
iterable
(like,elementAt
,cast
,contains
,any
, etc) to manipulate the data of the stream map() :
void main() { final myStream = timedCounter() .map((data)=> 'Number : ${data*2}') .listen(print); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
where() :
void main() { final myStream = timedCounter() .where((data) => data % 2 == 0) .map((data)=> 'Number : $data') .listen(print); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
Types of Stream :
Single Subscription Stream :
- The Single Subscription Stream as the name suggests, only allows one listener to listen to the stream.
- This type of stream is basically used in reading a file, receive a web request, etc. It's because we want a continuous flow of data in the correct order and without any errors.
- You can listen to this stream by the
listen()
method as we've seen in the above examples. - This stream can listen only once. Listening again to this stream will cause an error.
- Example :
void main() { final myStream = timedCounter(); final subscription = myStream.listen( (data){ print("Number: $data"); }, ); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
- Listening with two subscribers will cause an error
void main() { final myStream = timedCounter(); final subscriber1 = myStream.listen( (data){ print("Sub 1 : Number: $data"); }, ); final subscriber2= myStream.listen( (data){ print("Sub 2 : Number: $data"); }, ); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
Broadcast Stream :
- When more than one subscribers want to listen to one particular stream, we have to make that stream broadcast stream.
asBroadcastStream()
is used to make any stream a Broadcast Stream.- Any subscriber can start listening to events as soon as they subscribe to it.
- Example :
void main() { final myStream = timedCounter().asBroadcastStream(); final subscriber = myStream.listen( (data){ print("Sub 1 : Number: $data"); }, ); final subscriber2 = myStream.map( (data)=> 'Sub 2 : Number : ${data*2}' ).listen(print); }
Stream<int> timedCounter() async* { int i = 0; while (i <= 5) { await Future.delayed(Duration(milliseconds: 1000)); yield i++; } }
Creating your Own Stream
- Most of the time we use the stream provided by the network libraries, file libraries, state management, etc.
- But we can create our own stream by using StreamController.
- StreamController gives you the ability to add your own events from anywhere.
- To use StreamController you have to import
dart:io
package inside your application. import 'dart:async';
void main() { final _streamController = StreamController<int>(); // initialization of StreamController int _number = 1; addData() { Timer.periodic(Duration(seconds: 1),(_) { _streamController.sink.add(_number); // adding data to stream _number++; }); } addData(); Stream<int> myStream = _streamController.stream; //creating a stream final subscription = myStream.listen( // listening to the stream (data) => { print(data) }, ); }
- Above stream will add data infinitely.
- If you want to close the stream at a certain point, you can do something like this :
import 'dart:async';
void main() { final _streamController = StreamController<int>(); int _number = 1; addData() { Timer.periodic(Duration(seconds: 1),(_) { if(_number == 5){ _streamController.close(); // closing a stream return; } _streamController.sink.add(_number); _number++; }); } addData(); Stream<int> myStream = _streamController.stream; final subscription = myStream.listen( (data) => { print(data) }, onDone: (){ print("All data received"); // called when stream is closed } ); }
- You can also listen to different events like
onListen
,onPause
,onCancel
,onResume
using StreamController. StreamController<int>( onListen: ..., onPause: ..., onResume: ..., onCancel: ... );
StreamBuilder in Flutter
- We are often dealing with asynchronous data in our app.
- Flutter provides StreamBuilder which listens to the event flowing from the stream.
- For every new event it rebuilt the widget giving them the latest event to work with.
- You can give your stream to StreamBuilder's
stream
property. StreamBuilder( stream: myStream //... )
- The
builder
property is used to return a Widget that we want to display on the screen. - It has
context
and asnapshot
as a parameter StreamBuilder( stream: myStream, builder: (context, snapshot) { return Container() } )
- The
initialValue
property of the StreamBuilder is used to give the initial data to the widget while it's waiting for the first event. - You can check if the
snapshot
has data or not, has any error or not and also the connection state : StreamBuilder( stream: myStream, builder: (context, snapshot) { if(!snapshot.hasData) return CircularProgressIndicator() //checking is there any data if(snapshot.hasError) return Text("Something went wrong") //cheking for the error if(snapshot.connectionState == ConnectionState.done){} // or `waiting`,`none`, `active` return Container() } )
- Let's take one simple example :
- In the below example, there is one container and a button underneath it.
- When the button is pressed we are adding a new data(Color)to the stream.
class FlutterStreamBuilder extends StatefulWidget { @override _FlutterStreamBuilderState createState() => _FlutterStreamBuilderState(); }
class _FlutterStreamBuilderState extends State<FlutterStreamBuilder> { final colorStream = StreamController<Color>(); // generate new Color randomly Color generateColor() { final random = Random(); return Color.fromARGB( 255, random.nextInt(255), random.nextInt(255), random.nextInt(255), ); } // add Color to `colorStream` addData(){ colorStream.sink.add(generateColor()); } @override void initState() { addData(); super.initState(); } @override void dispose() { colorStream.close(); // To prevent memory leak, Make Sure you close the Stream. super.dispose(); } @override Widget build(BuildContext context) { return Scaffold( body: SizedBox.expand( child: Column( mainAxisAlignment: MainAxisAlignment.center, crossAxisAlignment: CrossAxisAlignment.center, children: [ StreamBuilder( stream: colorStream.stream, builder: (BuildContext context, AsyncSnapshot<dynamic> snapshot) { if (!snapshot.hasData) { return Center(child:CircularProgressIndicator()); } if (snapshot.connectionState == ConnectionState.done) {} return Container( height: 220, width: 220, color: snapshot.data, ); }), SizedBox(height:30), ElevatedButton(onPressed: addData, child: Text("Click")) ], ), ), ); } }
Wrapping Up
- Thanks for reading. Hope you liked it.
- Make sure you leave feedback in the comments ๐.
ย