2018-03-31

Java Lambda 初體驗,就遇到 Project Reactor 的 GroupedFlux

在練習書上的(不負責任的)範例時,遇到了「流中流」的問題,就是 Stream of Stream,一般都是用 flatMap 去打平,但這個 GroupedFlux 不太一樣。

先來說說需求,就是輸入一堆單字,然後統計每個字母(不分大小寫)的出現次數。

舉例來說,輸入「java」、「"hibernate」和「spring」三個單字,要得到以下的結果:
A => 3
B => 1
E => 2
G => 1
H => 1
I => 2
J => 1
N => 2
P => 1
R => 2
S => 1
T => 1
V => 1
第一次嘗試,試了好久才免強得到結果,但不符合我目前理解的標準。
Flux.just("java", "hibernate", "spring")// 1
        .map(String::toUpperCase)// 2
        .flatMap(s -> Flux.fromArray(s.split("")))// 3
        .groupBy(String::toString)// 4
        .sort((o1, o2) -> o1.key().compareTo(o2.key()))// 5
        .map(t -> t.key() + " => " + t.count().block())// 6
        .subscribe(System.out::println);// 7
  1. 先用 Flux.just() 這個 static 方法建立 String 流(在 Java 8 叫做 Stream,但在 Project Reactor 叫做 Flux,本質雖然類似,但實際上有很大的差異,就是非同步和非阻塞)。
  2. 為了不分大小寫,先用 Flux.map() 轉成大寫。
  3. 為了統計每個字母的數量,先用 String.split() 將字串拆成字母陣列,再用 Flux.fromArray() 將字母陣列轉換成 Flux<string>,這時候就是簡單的流中流(Flux of Flux),最後用 Flux.flapMap() 打平成單一個 Flux<string>,此時的 String 是單一字母。
  4. 使用 Flux.groupBy() 將 Flux<string>依照字母做統計,最終得到 GroupedFlux<string, String>,這時就是最棘手的流中流,Flux of GroupedFlux。
  5. 使用 Flux.sort() 將字母排序。
  6. 使用 Flux.map() 將 GroupedFlux 轉成目標字串,就是「字母 => 數量」,但問題來了,GroupedFlux.count() 回傳的是 Mono<long>,可以說是另一種流,Flux  可以看作是容納 0 到 無限多物件的容器,而 Mono 是容納 0 到 1 個物件的容器,有點像是 Java 8 的 Optional,目前找不到方法可以得到 Mono<long> 裡的 Long,最終用了我覺得不該用的 Mono.block(),雖然可以得到數量,但 Mono.block() 就像是 Flux.subscribe(),是流的終點,是不是不應該在流的中間呼叫這樣的 API 呢?
  7. 最後呼叫 Flux.subscribe() 將內容輸出。
第二次嘗試,將 Mono.block() 的呼叫放到最後的 Flux.subscribe(),不過好像還是怪怪的。
Flux.just("java", "hibernate", "spring")// 1
        .map(String::toUpperCase)// 2
        .flatMap(s -> Flux.fromArray(s.split("")))// 3
        .groupBy(String::toString)// 4
        .sort((o1, o2) -> o1.key().compareTo(o2.key()))// 5
        .subscribe(g -> System.out.println(g.key() + " => " + g.count().block()));// 6
最後終於試出一種比較像樣的結果。
Flux.just("java", "hibernate", "spring")// 1
        .map(String::toUpperCase)// 2
        .flatMap(s -> Flux.fromArray(s.split("")))// 3
        .groupBy(String::toString)// 4
        .sort((o1, o2) -> o1.key().compareTo(o2.key()))// 5
        .flatMap(g -> g.count().map(count -> g.key() + " => " + count))// 6
        .subscribe(System.out::println); // 7
差別在第 6 步驟用 Flux.flatMap() 去打平 Flux of GroupedFlux,但特別的地方在對 GroupedFlux.count()  回傳的 Mono<Long> 去呼叫它的 map(),因為 Mono<Long> 也是流,所以可以呼叫 Mono.map() 來得到內容(也就是數量)再組成字串,不過因為是流的關係,所以最後要用 flatMap 去打平。
---
---
---

沒有留言:

張貼留言