reactor demo
立即下载
资源介绍:
reactor demo
package com.atguigu.reactor;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* @author lfy
* @Description
* @create 2023-11-30 20:07
*/
public class APITest {
@Test
void next(){
Integer block = Flux.just(1, 2, 3)
.next()
.block();
System.out.println(block);
}
//Context-API: https://projectreactor.io/docs/core/release/reference/#context
@Test //ThreadLocal在响应式编程中无法使用。
//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;
void threadlocal(){
//支持Context的中间操作
Flux.just(1,2,3)
.transformDeferredContextual((flux,context)->{
System.out.println("flux = " + flux);
System.out.println("context = " + context);
return flux.map(i->i+"==>"+context.get("prefix"));
})
//上游能拿到下游的最近一次数据
.contextWrite(Context.of("prefix","哈哈"))
//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
.subscribe(v-> System.out.println("v = " + v));
//以前 命令式编程
// controller -- service -- dao
//响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播
}
@Test
void paralleFlux() throws IOException {
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
Flux.range(1,1000000)
.buffer(100)
.parallel(8)
.runOn(Schedulers.newParallel("yy"))
.log()
.flatMap(list->Flux.fromIterable(list))
.collectSortedList(Integer::compareTo)
.subscribe(v-> System.out.println("v = " + v));
System.in.read();
}
@Test
void block(){
//
// Integer integer = Flux.just(1, 2, 4)
// .map(i -> i + 10)
// .blockLast();
// System.out.println(integer);
List integers = Flux.just(1, 2, 4)
.map(i -> i + 10)
.collectList()
.block(); // 也是一种订阅者; BlockingMonoSubscriber
System.out.println("integers = " + integers);
}
@Test
void sinks() throws InterruptedException, IOException {
// Flux.create(fluxSink -> {
// fluxSink.next("111")
// })
// Sinks.many(); //发送Flux数据。
// Sinks.one(); //发送Mono数据
// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的
//Sinks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者)
//Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者
//Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;
// 从头消费还是从订阅的那一刻消费;
// Sinks.Many
资源文件列表:
reactor-programming-master.zip 大约有118个文件