Skip to content

Commit

Permalink
Reactive Programming, Flow API, Reactive Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiPiatakha committed Dec 15, 2023
1 parent 040f224 commit 8516e1e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.itbulls.learnit.javacore.javaupdates.java9.flowwapi;

import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowApiExample {
public static void main(String[] args) throws InterruptedException {
// Create a SubmissionPublisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

// Create a custom Processor
MySubscriber subscriber = new MySubscriber();

// Subscribe the Subscriber to the Publisher
publisher.subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
// Publish some items to the Publisher
for (int i = 1; i <= 5; i++) {
System.out.println("Publisher: Emitting item " + i);
publisher.submit(i);
}

// Close the publisher to signal completion
publisher.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.itbulls.learnit.javacore.javaupdates.java9.flowwapi;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

//Custom Processor implementing the Subscriber and Publisher interfaces
class MySubscriber implements Subscriber<Integer> {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscriber: Subscribed");
System.out.println(subscription);
this.subscription = subscription;
subscription.request(1); // Request the first item when subscribed
}

@Override
public void onNext(Integer item) {
System.out.println("Subscriber: Received item " + item);
// Transform the integer into a string and pass it downstream
submit(String.valueOf(item * 2));
subscription.request(1); // Request the next item
}

@Override
public void onError(Throwable throwable) {
System.err.println("Subscriber: Error - " + throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("Subscriber: Completed");
subscription.cancel();
}

// Helper method to submit an item downstream
private void submit(String item) {
System.out.println("Subscriber: Emitting item " + item);
}

}

0 comments on commit 8516e1e

Please sign in to comment.