Understanding Streams: Everything you need to know
What is a Stream? What types of Streams are there? How do you use and manipulate Streams? How do you create your own Stream? What is StreamBuilder and how do you use it?

I'm a mobile/web developer who loves to build projects and share valuable tips for programmers
Follow me for Flutter, React/Next.js, and other awesome tech-related stuff
Introduction
- Streams can seem complex and hard to understand at first. In this blog, I'll try to explain the concept with some examples.
- Beginners should get this concept clear early on, because most of the time our applications deal with asynchronous data, for example data fetched from an API or a database.
- Before we start coding and implementing a Stream, let's first understand what a Stream actually is.

What are Streams?
- A Stream is nothing but a continuous flow of data.
- It provides an asynchronous sequence of data. Because it is asynchronous, we usually work with it using `async` and `await`.
- `async` and `await` are the keywords used to define an asynchronous function and to wait for its results.

```dart
someAsyncFunction() async {
  await getData();
}
```

- There are two main ends associated with Streams: 1. Sink and 2. Source.
- The Sink is the end where we add data to the Stream, and
- the Source is the end where we get data out of the Stream.

- If that is still not clear, think of a Stream as a water pipe: water (data) continuously flows in at one end (the Sink) and comes out at the other end (the Source).
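To make the pipe picture concrete, here is a minimal sketch. It assumes a `StreamController` from `dart:async` plays the role of the pipe, and the helper name `pipeDemo` is made up for illustration. Values go in at the sink end and come out at the stream end, where they are read with `await for`.

```dart
import 'dart:async';

// Hypothetical helper: pushes three values in at the sink end and
// collects whatever comes out at the source (stream) end.
Future<List<int>> pipeDemo() async {
  final controller = StreamController<int>();

  // Sink end: add data, then close to signal that no more data will come.
  // The controller buffers these events until someone listens.
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.close();

  // Source end: `await for` waits for each value as it arrives and
  // finishes once the stream is closed.
  final received = <int>[];
  await for (final value in controller.stream) {
    received.add(value);
  }
  return received;
}

Future<void> main() async {
  print(await pipeDemo()); // [1, 2, 3]
}
```

`await for` is only one way to read from the source end; it can be used inside any function marked `async`.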

- There are basically two types of Streams:
- 1. Single Subscription Stream and
- 2. Broadcast Stream
- A Single Subscription Stream allows only one `listener` to listen to the stream. If you try to listen to the same stream again, you'll get an exception. A Broadcast Stream allows more than one `listener` to listen to the same stream.
- A `listener` listens to the events of the stream. Whenever data flows through the Stream, the `listener` is notified of that event and handles it.
- When the Stream has no more data to deliver, it sends a done event and closes.
- Now let's write some code to understand how we can actually use Streams.
How to use Streams :
- One way to create a Stream is to make a function an async generator (`async*`).
- `async*` is used when a function returns multiple future values one at a time.
- When this function is called, the Stream is created. To return data from this function we use `yield` or `yield*`.
- `yield` is like `return`, but the difference is that `yield` doesn't terminate the function: it emits a value and the function keeps running.
- Let's take one simple example :

```dart
Stream<int> numberGenerator() async* {
  for (int i = 0; i < 10; i++) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i; // emit the next number and keep looping
  }
}

void main() {
  final myStream = numberGenerator();
  final subscription = myStream.listen(
    (data) {
      print("Number: $data");
    },
  );
}
```
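The example above only uses `yield`. As a small sketch of the difference from `yield*` (the function names `countTo` and `countTwice` are made up for illustration): `yield` emits one value, while `yield*` forwards every value of another stream before the function moves on.

```dart
// Emits 1..n, one value at a time.
Stream<int> countTo(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield i;
  }
}

// `yield` emits a single value; `yield*` hands over to another stream
// and resumes here only after that stream has finished.
Stream<int> countTwice(int n) async* {
  yield 0;
  yield* countTo(n);
  yield* countTo(n);
}

Future<void> main() async {
  print(await countTwice(2).toList()); // [0, 1, 2, 1, 2]
}
```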
- `subscription` is of type `StreamSubscription`. When you listen to a `Stream` using `Stream.listen`, a `StreamSubscription` object is returned.
- The `subscription` provides events to the `listener` and holds the callbacks used to handle the events. The `subscription` can also be used to unsubscribe from the events, or to temporarily pause the events from the stream.
- Here, when `subscription` starts listening to `myStream`, the integer values start flowing into the stream one by one.
- The generator `yield`s those `int` values one by one, asynchronously.
listen() properties:
onData():
- On each data event from this stream, the onData handler is called. If onData is `null`, nothing happens.
- Example :

```dart
void main() {
  final myStream = numberGenerator();
  final subscription = myStream.listen(
    (data) {
      // data handler function
      print("Number: $data");
    },
  );
}
```

onDone() :
- Called when the stream has delivered all its events and closes.

```dart
void main() {
  final myStream = timedCounter();
  final subscription = myStream.listen(
    (data) {
      print("Number: $data");
    },
    onDone: () {
      print("You've reached at the end");
    },
  );
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```

- As you can see, as soon as the stream completes, the `onDone` function is executed.
onError(error):
- Called when the stream delivers an error event. By default the subscription is not canceled by an error; see `cancelOnError` below.

```dart
void main() {
  final myStream = timedCounter();
  final subscription = myStream.listen(
    (data) {
      print("Number: $data");
    },
    onDone: () {
      print("You've reached at the end");
    },
    onError: (e) {
      print("Error: $e");
    },
  );
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    if (i == 2) {
      throw Exception("Error Occurred");
    }
    yield i++;
  }
}
```

- As soon as the number `2` is encountered, we throw an `error` and the `onError` function is executed. Because an exception thrown inside an `async*` function also ends the generator, the stream then closes and `onDone` runs as well.
cancelOnError :
- If `cancelOnError` is `true`, the subscription is automatically canceled when the first `error` event is delivered. The default is `false`.
- We can see in the above example that even after the `error` is encountered, the subscription stays active and `onDone` is still executed. To cancel the stream subscription as soon as the `error` is encountered, we have to pass `true` to `cancelOnError`.

```dart
void main() {
  final myStream = timedCounter();
  final subscription = myStream.listen(
    (data) {
      print("Number: $data");
    },
    onDone: () {
      print("You've reached at the end"); // not reached when cancelOnError is true
    },
    onError: (e) {
      print("Error: $e");
    },
    cancelOnError: true,
  );
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    if (i == 2) {
      throw Exception("Error Occurred");
    }
    yield i++;
  }
}
```
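In the generator examples the stream ends right after the error, which makes the effect of `cancelOnError` hard to see. The sketch below is an illustration under my own assumptions, not the author's code: it uses a `StreamController` so that more data can follow the error, and the helper name `eventsSeen` is hypothetical. It records what a listener sees with `cancelOnError` set to `false` and to `true`.

```dart
import 'dart:async';

// Hypothetical helper: feeds data, an error, then more data into a stream
// and records every event the listener receives.
Future<List<String>> eventsSeen({required bool cancelOnError}) async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => log.add('data $data'),
    onError: (Object error) => log.add('error $error'),
    onDone: () => log.add('done'),
    cancelOnError: cancelOnError,
  );

  controller.add(1);
  controller.addError('boom'); // an error event; the controller stays open
  controller.add(2);
  controller.close();

  // Give the queued events a chance to reach the listener.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await eventsSeen(cancelOnError: false));
  // [data 1, error boom, data 2, done]
  print(await eventsSeen(cancelOnError: true));
  // [data 1, error boom]
}
```

With `cancelOnError: true` the listener still gets the error itself, but nothing after it, including the done event.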
Manipulating Streams :
- We can manipulate the stream data on the fly.
- We can chain up methods such as `map`, `where`, `take` and `expand`.
- Many of the methods available on `Iterable` (like `elementAt`, `cast`, `contains`, `any`, etc.) are also available on streams. Note that the ones producing a single result, such as `elementAt`, `contains` and `any`, return a `Future`.
map() :
```dart
void main() {
  // listen() returns a StreamSubscription, so name the variable accordingly
  final subscription = timedCounter()
      .map((data) => 'Number : ${data * 2}')
      .listen(print);
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```
where() :
```dart
void main() {
  // keep only the even numbers, then format them
  final subscription = timedCounter()
      .where((data) => data % 2 == 0)
      .map((data) => 'Number : $data')
      .listen(print);
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```
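`take` and `expand` are mentioned above without an example, so here is a short sketch of both. It uses `Stream.fromIterable` as a stand-in source so the result does not depend on timing, and the function name `firstThreeWithTens` is made up for illustration.

```dart
// take(3) keeps only the first three events; expand() turns each event
// into zero or more events (here: the number and the number times ten).
Future<List<int>> firstThreeWithTens() {
  return Stream.fromIterable([1, 2, 3, 4, 5])
      .take(3)
      .expand((n) => [n, n * 10])
      .toList();
}

Future<void> main() async {
  print(await firstThreeWithTens()); // [1, 10, 2, 20, 3, 30]
}
```

`toList()` is one of the methods that returns a `Future`: it completes with all events once the stream is done.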
Types of Stream :
Single Subscription Stream :
- The Single Subscription Stream, as the name suggests, allows only one listener to listen to the stream.
- This type of stream is typically used for reading a file, receiving a web request, etc., because we want the data to arrive as one continuous flow, in the correct order and without missing any of it.
- You can listen to this stream with the `listen()` method, as we've seen in the examples above.
- This stream can be listened to only once. Listening to it again will cause an error.
- Example :
```dart
void main() {
  final myStream = timedCounter();
  final subscription = myStream.listen(
    (data) {
      print("Number: $data");
    },
  );
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```
- Listening with two subscribers will cause an error
```dart
void main() {
  final myStream = timedCounter();
  final subscriber1 = myStream.listen(
    (data) {
      print("Sub 1 : Number: $data");
    },
  );
  // This second listen() throws a StateError:
  // "Stream has already been listened to."
  final subscriber2 = myStream.listen(
    (data) {
      print("Sub 2 : Number: $data");
    },
  );
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```
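If you want to see the failure without crashing the program, the second `listen()` can be wrapped in a `try`/`catch`. This is a minimal sketch that assumes a plain `StreamController` as the single-subscription source; the helper name `secondListenThrows` is hypothetical.

```dart
import 'dart:async';

// Returns true if listening a second time to a single-subscription
// stream throws a StateError.
bool secondListenThrows() {
  final controller = StreamController<int>(); // single-subscription by default
  final first = controller.stream.listen((_) {});

  var threw = false;
  try {
    controller.stream.listen((_) {});
  } on StateError catch (e) {
    print('Second listen failed: ${e.message}');
    threw = true;
  }

  // Clean up the first subscription and the controller.
  first.cancel();
  controller.close();
  return threw;
}

void main() {
  print(secondListenThrows()); // true
}
```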
Broadcast Stream :
- When more than one subscriber wants to listen to a particular stream, we have to make that stream a broadcast stream.
- `asBroadcastStream()` is used to turn any stream into a Broadcast Stream.
- Each subscriber starts receiving events from the moment it subscribes; events emitted before that are not replayed to it.
- Example :

```dart
void main() {
  final myStream = timedCounter().asBroadcastStream();
  final subscriber = myStream.listen(
    (data) {
      print("Sub 1 : Number: $data");
    },
  );
  final subscriber2 = myStream
      .map((data) => 'Sub 2 : Number : ${data * 2}')
      .listen(print);
}

Stream<int> timedCounter() async* {
  int i = 0;
  while (i <= 5) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield i++;
  }
}
```
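One consequence of broadcasting is that a subscriber only sees events sent after it subscribed. The sketch below illustrates this under an assumption of mine: it uses `StreamController<int>.broadcast()` rather than `asBroadcastStream()`, because adding events by hand makes the timing easy to control. The helper name `lateSubscriberDemo` is made up.

```dart
import 'dart:async';

// Hypothetical helper: one listener subscribes before the first event,
// another subscribes after it.
Future<Map<String, List<int>>> lateSubscriberDemo() async {
  final controller = StreamController<int>.broadcast();
  final earlyEvents = <int>[];
  final lateEvents = <int>[];

  controller.stream.listen(earlyEvents.add);
  controller.add(1); // only the early listener is subscribed at this point

  controller.stream.listen(lateEvents.add);
  controller.add(2); // both listeners receive this one

  // Give the queued events a chance to reach the listeners.
  await Future<void>.delayed(Duration.zero);
  controller.close();
  return {'early': earlyEvents, 'late': lateEvents};
}

Future<void> main() async {
  print(await lateSubscriberDemo()); // {early: [1, 2], late: [2]}
}
```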
Creating your Own Stream
- Most of the time we use the stream provided by the network libraries, file libraries, state management, etc.
- But we can create our own stream by using StreamController.
- StreamController gives you the ability to add your own events from anywhere.
- To use StreamController you have to import the `dart:async` library inside your application.

```dart
import 'dart:async';

void main() {
  final _streamController = StreamController<int>(); // initialization of StreamController
  int _number = 1;

  addData() {
    Timer.periodic(Duration(seconds: 1), (_) {
      _streamController.sink.add(_number); // adding data to stream
      _number++;
    });
  }

  addData();

  Stream<int> myStream = _streamController.stream; // getting the stream

  final subscription = myStream.listen(
    // listening to the stream
    (data) {
      print(data);
    },
  );
}
```
- The stream above will keep adding data forever.
- If you want to close the stream at a certain point, you can do something like this :

```dart
import 'dart:async';

void main() {
  final _streamController = StreamController<int>();
  int _number = 1;

  addData() {
    Timer.periodic(Duration(seconds: 1), (timer) {
      if (_number == 5) {
        timer.cancel(); // stop the timer so it does not keep firing
        _streamController.close(); // closing a stream
        return;
      }
      _streamController.sink.add(_number);
      _number++;
    });
  }

  addData();

  Stream<int> myStream = _streamController.stream;

  final subscription = myStream.listen(
    (data) {
      print(data);
    },
    onDone: () {
      print("All data received"); // called when stream is closed
    },
  );
}
```
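Closing the controller stops the stream from the producer's side. The consumer can also stop on its own by cancelling its `StreamSubscription`. Here is a small sketch of that, assuming `Stream.periodic` as a stand-in for an endless source; the helper name `takeThreeThenCancel` is hypothetical.

```dart
import 'dart:async';

// Listens to an endless periodic stream and unsubscribes after three events.
Future<List<int>> takeThreeThenCancel() {
  final received = <int>[];
  final finished = Completer<List<int>>();
  late final StreamSubscription<int> subscription;

  subscription = Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count + 1, // count starts at 0, so this emits 1, 2, 3, ...
  ).listen((value) {
    received.add(value);
    if (received.length == 3) {
      subscription.cancel(); // unsubscribe; no further events are delivered
      finished.complete(received);
    }
  });

  return finished.future;
}

Future<void> main() async {
  print(await takeThreeThenCancel()); // [1, 2, 3]
}
```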
- You can also react to lifecycle events by passing the `onListen`, `onPause`, `onResume` and `onCancel` callbacks to the StreamController.

```dart
StreamController<int>(
  onListen: ...,
  onPause: ...,
  onResume: ...,
  onCancel: ...
);
```
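As one possible use of these callbacks, the sketch below starts producing data only when someone listens and stops when the listener goes away. The function name `ticker` and its parameters `interval` and `maxCount` are made up for illustration; treat it as a sketch, not a recommended design.

```dart
import 'dart:async';

// Emits 1..maxCount, one value per interval, but only while listened to.
Stream<int> ticker(Duration interval, int maxCount) {
  late final StreamController<int> controller;
  Timer? timer;
  var count = 0;

  void tick(Timer _) {
    count++;
    controller.add(count);
    if (count == maxCount) {
      timer?.cancel();
      controller.close(); // sends the done event to the listener
    }
  }

  controller = StreamController<int>(
    // Start the timer only once a listener subscribes.
    onListen: () {
      timer = Timer.periodic(interval, tick);
    },
    // Stop the timer when the listener cancels its subscription.
    onCancel: () {
      timer?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  final interval = const Duration(milliseconds: 10);
  print(await ticker(interval, 3).toList()); // [1, 2, 3]
  // take(2) cancels after two events, which triggers onCancel.
  print(await ticker(interval, 5).take(2).toList()); // [1, 2]
}
```

Without `onCancel`, the timer would keep firing after the listener has unsubscribed.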
StreamBuilder in Flutter
- We often deal with asynchronous data in our apps.
- Flutter provides StreamBuilder, a widget that listens to the events flowing from a stream.
- For every new event it rebuilds the widget returned by its builder, giving it the latest event to work with.
- You can pass your stream to the StreamBuilder's `stream` property.

```dart
StreamBuilder(
  stream: myStream
  //...
)
```

- The `builder` property is used to return the Widget that we want to display on the screen.
- It takes `context` and a `snapshot` as parameters.

```dart
StreamBuilder(
  stream: myStream,
  builder: (context, snapshot) {
    return Container();
  }
)
```

- The `initialData` property of the StreamBuilder is used to give initial data to the widget while it's waiting for the first event.
- You can check whether the `snapshot` has data, whether it has an error, and also the connection state :

```dart
StreamBuilder(
  stream: myStream,
  builder: (context, snapshot) {
    // check for an error first, so it is not hidden by the loading indicator
    if (snapshot.hasError) return Text("Something went wrong");
    // check whether there is any data yet
    if (!snapshot.hasData) return CircularProgressIndicator();
    if (snapshot.connectionState == ConnectionState.done) {} // or `waiting`, `none`, `active`
    return Container();
  }
)
```

- Let's take one simple example :
- In the example below, there is a container with a button underneath it.
- When the button is pressed, we add a new piece of data (a Color) to the stream.

```dart
import 'dart:async';
import 'dart:math';

import 'package:flutter/material.dart';

class FlutterStreamBuilder extends StatefulWidget {
  @override
  _FlutterStreamBuilderState createState() => _FlutterStreamBuilderState();
}

class _FlutterStreamBuilderState extends State<FlutterStreamBuilder> {
  final colorStream = StreamController<Color>();

  // generate a new Color randomly
  Color generateColor() {
    final random = Random();
    return Color.fromARGB(
      255,
      random.nextInt(256), // nextInt(256) covers the full 0..255 range
      random.nextInt(256),
      random.nextInt(256),
    );
  }

  // add a Color to `colorStream`
  void addData() {
    colorStream.sink.add(generateColor());
  }

  @override
  void initState() {
    super.initState();
    addData();
  }

  @override
  void dispose() {
    colorStream.close(); // To prevent a memory leak, make sure you close the Stream.
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: SizedBox.expand(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          crossAxisAlignment: CrossAxisAlignment.center,
          children: [
            StreamBuilder<Color>(
              stream: colorStream.stream,
              builder: (BuildContext context, AsyncSnapshot<Color> snapshot) {
                if (!snapshot.hasData) {
                  return Center(child: CircularProgressIndicator());
                }
                if (snapshot.connectionState == ConnectionState.done) {}
                return Container(
                  height: 220,
                  width: 220,
                  color: snapshot.data,
                );
              },
            ),
            SizedBox(height: 30),
            ElevatedButton(onPressed: addData, child: Text("Click")),
          ],
        ),
      ),
    );
  }
}
```
Wrapping Up
- Thanks for reading. Hope you liked it.
- Make sure you leave feedback in the comments.






