2014-12-20 24 views
8

Mój dokładny scenariusz polega na wstawianiu danych do bazy danych w partiach, więc chcę gromadzić obiekty DOM następnie co 1000, spłukiwać je.Czy istnieje elegancki sposób przetwarzania strumienia w porcjach?

Wdrożyłem go, umieszczając kod w akumulatorze, aby wykryć pełnię, a następnie przepłukać, ale to wydaje się błędne - kontrola spłukiwania powinna pochodzić od osoby dzwoniącej.

Mogę przekonwertować strumień na listę, a następnie użyć podelistycznie w sposób powtarzalny, ale to też wydaje się niezgrabne.

Czy jest to dobry sposób na podejmowanie działań co n elementów, a następnie kontynuowanie strumienia przy jednoczesnym przetwarzaniu strumienia?

+2

Na podobny przypadek użycia Zrobiłem to: https://bitbucket.org/assylias/bigblue-utils/src/3f56d19777a0ebc5dc3b53d3c2ec8dc64fd2b28e/src/main/java/com/assylias/bigblue/utils/SplitProcessing.java?at= master - nie dokładnie to, o co prosisz. – assylias

Odpowiedz

4

Elegancja jest w oku patrzącego. Jeśli nie masz nic przeciwko użyciu funkcji stanowej w groupingBy, można to zrobić:

AtomicInteger counter = new AtomicInteger(); 

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize)) 
    .values() 
    .forEach(database::flushChunk); 

nie wygra żadnych wydajności lub użycia pamięci punktów nad oryginalnym rozwiązaniem, ponieważ będzie jeszcze zmaterializować cały strumień przed wykonaniem byle co.

Jeśli chcesz uniknąć materializacji listy, strumień API nie pomoże. Będziesz musiał dostać iterator strumienia lub spliterator i zrobić coś takiego:

Spliterator<Integer> split = stream.spliterator(); 
int chunkSize = 1000; 

while(true) { 
    List<Integer> chunk = new ArrayList<>(size); 
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; 
    if (chunk.isEmpty()) break; 
    database.flushChunk(chunk); 
} 
4

wykorzystaniem biblioteki StreamEx rozwiązaniem będzie wyglądać

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15); 
AtomicInteger counter = new AtomicInteger(0); 
int chunkSize = 4; 

StreamEx.of(stream) 
     .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0) 
     .forEach(chunk -> System.out.println(chunk)); 

wyjściowa:

[0, 1, 2, 3] 
[4, 5, 6, 7] 
[8, 9, 10, 11] 
[12, 13, 14] 

groupRuns przyjmuje orzeczenie, że decyduje, czy 2 elementy powinny należeć do tej samej grupy.

Tworzy grupę, gdy tylko znajdzie pierwszy element, który do niej nie należy.

+0

To nie działa dla pojedynczego rekordu. Na przykład całkowity strumień po prostu [1] zakończyłby się niepowodzeniem. –

+0

Strumień pojedynczego przedmiotu działa dla mnie. Jakiego rodzaju błąd widzisz? Czy mógłbyś opublikować kod, który wypróbowałeś? –

+0

Licznik zwraca niepoprawną wartość w przypadku, gdy jest jeden rekord. –