ในการเขียนโปรแกรมทั่วๆ ไปเรามักจะส่งข้อมูลกันในรูปแบบของ sync
เช่น
int getNumber() {
return 1;
}
การเรียกใช้งานก็ง่ายๆ แบบนี้
print(getNumber());
แต่ในบทก่อนๆ เราเคยสอนเรื่องของ Generator ไปแล้ว นั่นคือการที่ส่งข้อมูลแบบหลายๆ ตัวกลับมาในรูปแบบของ sync*
ทบทวนแบบเร็วๆ! คือแทนที่จะตอบข้อมูลกลับแค่ครั้งเดียวด้วย return
ก็เปลี่ยนเป็น yield
แทน (yield ตอบได้หลายครั้งมากกว่า return)
Iterable<int> getNumbers() sync* {
yield 1;
yield 2;
yield 3;
}
เวลารับข้อมูลจากฟังก์ชันพวกนี้ไปใช้ ก็เลยต้องใช้การวนลูปนั่นเอง
for(var num in getNumbers()) {
print(num);
}
ถ้าสรุปเป็นรูป ก็จะได้แบบข้างล่างนี่
แต่ทั้ง 2 วิธีนี้มันเป็นการส่งข้อมูลแบบ Synchronous คือโค้ดยังทำงานเรียงๆ กันตามลำดับ และโค้ดทั้งหมดจะไม่ถูกแตกเป็นส่วนๆ เข้า Event Loop แบบ Asynchronous
ดังนั้นต่อไป... เรามาดู 2 กรณีนี้ แต่อยู่ในรูปของ Asynchronous กันบ้างดีกว่า!
ในบทที่แล้วเราพูดถึง Future
กันไปแล้ว ซึ่งมันคือการเปลี่ยนจาก sync
ให้กลายเป็น async
นั่นเอง
แปลว่าเรายังขาดอยู่หนึ่งตัว คือ async*
นั่นเอง
Single Value | Zero or more Values | |
---|---|---|
Sync | int |
Iterable |
Async | Future |
Stream |
สรุปง่ายๆ คือ
Stream
ก็คือIterable
ที่อยู่ในรูปของ Asynchronous นั่นเอง
วิธีการดึงค่าออกมาจาก Stream
การใช้งาน Stream ก็คล้ายๆ กับ Future นั่นแหละ แต่มีรายละเอียดเยอะกว่า เพราะมันตอบได้หลายค่ามากกว่า Future ยังไงล่ะ
ขอสมมุติว่าเรามีฟังก์ชันหนึ่ง ที่ตอบค่าเป็น Stream
ชื่อ getNumberStream()
นะ
รอฟังค่าเรื่อยๆ ด้วย listen
สำหรับ Future เราจะใช้คำสั่ง then()
ในการรอค่า แต่ Stream จะใช้ listen()
แทน
Stream<int> numberStream = getNumberStream();
var subscription = numberStream.listen((num){
print('Receive: $num');
});
listen()
จะตอบกลับเป็น subscription ซึ่งเดี๋ยวจะพูดถึงอีกที
แน่นอนว่าเวลาเอาไปรัน ผลที่ได้อาจจะเป็นแบบนี้
output:
Receive: 1
Receive: 2
Receive: 3
Receive: ...
ก็คือเราได้รับตัวเลขหลายๆ ตัวนั่นเอง แต่อาจจะได้ตอนไหนก็ไม่รู้ แบบนี้
time │ │ │
0| 1sec. │ │
1| Receive: 1 ─┘ 2sec. │
2| Receive: 2 ───────┘ │
3| 4sec.
4| Receive: 3 ────────────┘
5|
สำหรับตัว listen()
นั้นมี options เสริมให้เราใช้งานได้หลายตัว คือ
onError
: เมื่อเกิด error ให้ทำอะไรonDone
: เมื่อได้รับข้อมูลครบทุกตัวแล้วให้ทำอะไร (ทำงานเมื่อ Stream ตอบข้อมูลครบหมดทุกตัวแล้ว มันจะเรียก onDone ให้ทำงาน)cancelOnError
: เป็นตัวกำหนดว่าถ้าเกิด error ขึ้นแล้ว (ในกรณีที่ข้อมูลยังส่งกลับมาไม่ครบทุกตัว) จะเลือกที่จะหยุดการทำงานของ Stream ไปเลย หรือจะยังให้ทำงานต่อก็ได้
var subscription = numberStream.listen(
(num){
print('ได้รับข้อมูล $num');
},
onError: (err){
print('มีข้อผิดพลาดเกิดขึ้นล่ะ $err');
},
onDone: (){
print('ได้รับข้อมูลครบแล้วจ้า');
},
cancelOnError: false,
);
ลดรูป Stream ด้วย await
เช่นเดียวกับตอน Future นะ คือเราสามารถเปลี่ยน listen()
ให้ไปอยู่ในรูปแบบการเขียนแบบ Synchronous ได้ แต่ก็จะไม่ได้ตรงๆ แบบ Future นะเพราะเราต้องรับข้อมูลด้วยลูป
Stream<int> numberStream = getNumberStream();
await for(var number in numberStream) {
print('Receive: $number');
}
Broadcast Stream
ตามปกติแล้วการใช้งาน Stream จะมีการ listen()
ได้แค่ครั้งเดียวเท่านั้น
เมื่อเรา subscription ไปครั้งหนึ่งแล้ว ถ้าจะ subscription ซึ่งกับ Stream ตัวเดิมมันจะเกิด Exception ขึ้นนะ!!
Stream<int> numberStream = getNumberStream();
var subscription1 = numberStream.listen((num){
print('Receive: $num');
});
//second subscribe -> Exception!!
var subscription2 = numberStream.listen((num){
print('Receive: $num');
});
วิธีการแก้ (ถ้าอยาก subscription หลายครั้งจริงๆ) ให้เปลี่ยน Stream ตัวนั้นให้เป็นสิ่งที่เรียกว่า "Broadcast Stream" แทน
วิธีเปลี่ยนก็ง่ายๆ คือใช้ get property ที่ชื่อว่า asBroadcastStream
แบบนี้
Stream<int> numberStream = getNumberStream().asBroadcastStream;
var subscription1 = numberStream.listen((num){
print('First receive: $num');
});
var subscription2 = numberStream.listen((num){
print('Second receive: $num');
});
ทีนี้ ถ้า Stream ของเรามีการส่งตัวเลข [1 เมื่อผ่านไป1วินาที] และ [3 เมื่อผ่านไป3วินาที] ก็จะได้ผลลัพธ์แบบนี้ (สังเกตดูว่าเราได้ข้อมูลตัวละ 2 ครั้ง เพราะมี subscription 2 ตัวนั่นเอง)
time │ │
0| 1sec. │
1| First receive: 1 ─┤ │
| First receive: 1 ─┘ 3sec.
2| │
3| Second receive: 3 ───────┤
| Second receive: 3 ───────┘
4|
Subscription
กลับมาที่สิ่งที่เราพูดค้างไว้ นั่นคือตอบที่เราสั่ง listen()
เราจะได้สิ่งที่เรียกว่า subscription
กลับมา
เพราะ Stream คือ "กระแสข้อมูล" ที่ไหลมาเรื่อยๆ มาตอนไหนก็ไม่รู้ เลยมี subscription เอาไว้ควบคุมกระแสนั้นอีกที
//หยุดการรับข้อมูล
subscription.pause();
//กลับรับข้อมูลใหม่
subscription.resume();
//แคนเซิล Stream ตัวนั้น
subscription.cancel();
แต่มีข้อควรระวังอย่างนึง คือการ pause()
ไม่ใช้การไม่รับข้อมูลในจังหวะนั้น แต่เป็นการหยุดรับข้อมูลชั่วคราวเท่านั้น แปลว่า "ข้อมูลไม่ได้หายไปนะ มันแค่เข้าคิวรอเรา resume()
อีกทีนั่นเอง!"
ลองดูตัวอย่างข้างล่างประกอบ ขอสมมุติว่าเรามี stream อยู่หนึ่งตัวที่จะส่งตัวเลขกลับมาเรื่อยๆ ทุกๆ 1 วินาที [1, 2, 3, 4, 5, ...] แบบนี้นะ
var subscription = stream.listen((x){
print('Receive: $x');
});
Future.delayed(Duration(seconds: 3), (){
subscription.pause();
});
Future.delayed(Duration(seconds: 6), (){
subscription.resume();
});
Future.delayed(Duration(seconds: 8), (){
subscription.cancel();
});
แล้วเราก็ตั้งค่า (โดยใช้ Future.delayed) ให้มัน..
- pause: เมื่อผ่านไป 3 วินาที
- resume: เมื่อผ่านไป 6 วินาที (จากตอนเริ่มโปรแกรม)
- cancel: เมื่อผ่านไป 8 วินาที (จากตอนเริ่มโปรแกรม)
เราอาจจะคิดว่าระหว่างวินาทีที่ 3-6 เราจะไม่ได้รับข้อมูลช่วงนั้น (คือข้อมูล [4,5] หายไปเลย)
แต่ไม่ใช่นะ! เพราะเลข [4,5] นั้นยังอยู่นะ แค่มันเข้าคิวรอที่จะออกมาอยู่
เมื่อเรา resume ในวินาทีที่ 6 มันก็จะออกมาทีเดียวหมดเลย [4,5,6]
ดูอธิบายด้วย timeline ข้างล่างนี่น่าจะเข้าใจมากกว่า
time
1| Receive: 1
2| Receive: 2
3| Receive: 3
4| ├─ waiting until (6)
5| │
6| Receive: 4 ┐
| Receive: 5 ├─ 3 values in 1 sec.
| Receive: 6 ┘
7| Receive: 7
8| Receive: 8
9|
มาลองสร้าง Stream กันบ้าง
การสร้าง Stream ทำได้หลายวิธีมากๆ แต่แบบง่ายที่สุดคือใช้วิธีแบบ Generator
แปล Iterable เป็น Stream
เราเคยสอนการสร้าง Generator ไปแล้วในบท Dart 105: มันวนซ้ำได้นะ! มาสร้าง Generator และใช้งาน Iterable กันเถอะ แต่ตอนนั้นเราเขียนมันในรูปแบบของ sync
อย่างที่พูดไปตอนต้นว่า Stream
ก็คือ Iterable
ที่อยู่ในรูปของ Asynchronous เราจะมาแสดงให้เห็นกัน
ขอเปิดด้วยโค้ดแบบ Iterable ในรูปของ Synchronous
โจทย์คือ... เราต้องการสร้างฟังก์ชันสำหรับดึง Data ออกมาจำนวนหนึ่งตั้งแต่ from ถึง to
Data fetchOneData(int id){
...
}
Iterable<int> getData(int from, int to) sync* {
var dataList = [];
for(var i=from; i<=to; i++){
dataList.add(fetchOneData(i))
}
return dataList;
}
(ฟังก์ชัน fetchOneData()
ทำอะไรไม่ต้องสนใจนะ ไม่ใช่ประเด็นของเรื่อนี้)
แต่เพื่อ performance ที่ดี เลยเราไม่โหลดข้อมูลในครั้งเดียว แต่เลือกที่จะเขียนมันในรูปแบบ Generator แทน ก็จะได้โค้ดแบบนี้..
Data fetchOneData(int id){
...
}
Iterable<int> getData(int from, int to) sync* {
for(var i=from; i<=to; i++){
yield fetchOneData(i);
}
}
เวลาเราจะใช้งาน ก็เอาไปวนลูปได้ธรรมดาๆ แบบนี้
var dataList = getData(1, 10);
for(var data in dataList) {
print(data);
}
ขึ้นต่อไปคือ แล้วถ้าข้อมูลของเราไม่ได้มาแบบ sync ล่ะ?
ถ้าฟังก์ชัน fetchOneData()
ใช้เวลาโหลดข้อมูลนานขึ้น เราคงต้องเปลี่ยนมันเป็น Future แบบนี้
Future<Data> fetchData(int id) {
...
}
เมื่อเป็นแบบนี้ เราก็มีปัญหาซะแล้ว!
เพราะฟังก์ชัน getData()
ที่เป็น sync ไม่สามารถเรียก fetchOneData()
ที่เป็น async ได้ล่ะ!!
วิธีการแก้ก็คือเปลี่ยนฟังก์ชัน fetchOneData()
ให้หลายเป็น async ยังไงล่ะ
"ด้วยการเปลี่ยน Iterable ให้กลายเป็น Stream"
สิ่งที่เราต้องทำ มีอยู่ 3 อย่าง
- จากเดิมที่รีเทิร์นค่าเป็น
Iterable
เราต้องเปลี่ยนมันเป็นStream
แทน - จากเดิมที่ฟังก์ชันเป็นชนิด
sync*
เราต้องเปลี่ยนมันเป็นasync*
- จากเดิมที่เรา
yield
ค่าทันทีได้เลย แต่เมื่อมันเป็น Future แล้วเราก็ต้องสั่งให้มันรอด้วยawait
Stream<Data> getData(int from, int to) async* {
for(var id=from; id<=to; id++) {
yield await fetchOneData(id);
}
}
void main() {
fetchAllData(1, 10).listen(print);
}
จะเห็นว่า Iterable นั้นแปลงเป็น Stream ได้ง่ายๆ เลย
ลองมาดูกันอีกตัวอย่างกัน
int sumIt(List<int> items) {
int sum = 0;
for(var item in items) {
sum += item;
}
return sum;
}
เรามีฟังก์ชัน sumIt()
สำหรับหาผลรวมทั้งหมดใน List แต่ถ้าลิสต์ตัวนี้ไม่ได้ค่ามาแบบ sync ล่ะ? จะทำยังไง?
คราวนี้ให้สังเกตว่า parameter นั้นรับ List (หรือก็คือ Iterable นั่นแหละนะ) เข้ามา แล้วมันรีเทิร์นค่าแบบสเกล่าธรรมดา (คือ int
ธรรมดาๆ นี่แหละ)
สำหรับเคสนี้ สิ่งที่เราต้องทำก็คือ
- เปลี่ยนค่าแบบสเกล่าธรรมดาให้กลายเป็น Future
- เปลี่ยนให้ฟังก์ชันเป็น
async
(ระวัง! ไม่ได้เปลี่ยนเป็นasync*
นะ เพราะมันไม่ได้รีเทิร์น Stream) - จุดที่เป็นปัญหาคือ for loop ที่วนค่าตัว Stream .. ให้เราเติม
await
ลงไปข้างหน้าเป็นอันจบ
Future<int> sumIt(Stream<int> items) async {
int sum = 0;
await for(var item in items) {
sum += item;
}
return sum;
}
เรื่องสุดท้ายคือในบางกรณี เราอาจจะมีฟังก์ชัน Stream ที่เรียกใช้งาน Stream อีกตัวก็เป็นได้นะ
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
yield getFirstStream(); //Not Work!!
}
วิธีการนี้โค้ดอาจคอมไพล์ได้ แต่เราจะไม่ได้ข้อมูลอะไรเลย (ถ้าใครยังไม่เข้าใจว่าทำไมไม่เวิร์ค ลองกลับไปอ่านบทที่เราพูดถึง Generator อีกทีนะ)
วิธีการแก้ก็เหมือนกับฟังก์ชัน sync*
นั่นแหละ คือเราจะต้องวนลูปทีละตัว
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
await for(var s in getFirstStream()){
yield s;
}
}
หรือแบบง่ายกว่าคือการใช้ yield*
ก็ได้
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
yield* getFirstStream(); //Ok!!
}
นอกจากวิธีสร้างแบบ Generator ที่เราใช้แนวคิดเดียวกับ Generator ใน sync แล้วยังมีอีกวิธีหนึ่งที่ให้เราคุมการไหลของข้อมูลใน Stream ได้คล่องตัวขึ้น นั่นคือใช้ StreamController
ซึ่งเราจะพูดถึงกันในบทต่อไปนะ (พร้อมกับเก็บตกเนื้อหาเกี่ยวกับ Stream กันอีกนิดหน่อยด้วย)