Java 9 รองรับการสร้าง Reactive Streams โดยแนะนำอินเทอร์เฟซบางส่วน:ผู้เผยแพร่ , สมาชิก , สมัครสมาชิก และ SubmissionPublisher คลาสที่ใช้ ผู้เผยแพร่ อินเตอร์เฟซ. แต่ละอินเทอร์เฟซสามารถมีบทบาทที่แตกต่างกันตามหลักการของ ปฏิกิริยา สตรีม .
เราสามารถใช้ สมาชิก อินเทอร์เฟซสำหรับสมัครรับข้อมูลที่เผยแพร่โดยผู้เผยแพร่ . เราจำเป็นต้องใช้ สมาชิก ส่วนต่อประสานและจัดเตรียมการใช้งานสำหรับวิธีนามธรรม
วิธีการติดต่อ Flow.Subscriber:
- onComplete(): เมธอดนี้ถูกเรียกเมื่อออบเจ็กต์ Publisher เสร็จสิ้นบทบาท
- onError(): วิธีการนี้ถูกเรียกเมื่อมีบางอย่างผิดพลาดใน Publisher และได้รับแจ้งไปยังสมาชิก
- onNext(): วิธีการนี้ถูกเรียกเมื่อใดก็ตามที่ผู้เผยแพร่มีข้อมูลใหม่ที่จะแจ้งให้สมาชิกทุกคนทราบ
- onSubscribe(): วิธีการนี้ถูกเรียกเมื่อผู้เผยแพร่เพิ่มสมาชิก
ตัวอย่าง
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;
public class SubscriberImplTest {
public static class Subscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
private boolean isDone;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Processing " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Processing done");
isDone = true;
}
}
public static void main(String args[]) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber subscriber = new Subscriber();
publisher.subscribe(subscriber);
IntStream intData = IntStream.rangeClosed(1, 10);
intData.forEach(publisher::submit);
publisher.close();
while(!subscriber.isDone) {
Thread.sleep(10);
}
System.out.println("Done");
}
} ผลลัพธ์
Subscribed Processing 1 Processing 2 Processing 3 Processing 4 Processing 5 Processing 6 Processing 7 Processing 8 Processing 9 Processing 10 Processing done Done