2015-10-28 24 views
14

Piszę moduł, który jest zapisywalnym strumieniem. Chcę wdrożyć interfejs rur dla moich użytkowników.Jaki jest prawidłowy sposób wstrzymania odczytu strumieniowego z pliku do zapisu w nodejs?

Jeśli wystąpi jakikolwiek błąd, muszę wstrzymać czytelny strumień i wyemitować zdarzenie błędu. Następnie użytkownik zdecyduje - jeśli jest w porządku z błędem, powinien być w stanie wznowić przetwarzanie danych.

var writeable = new BackPressureStream(); 
writeable.on('error', function(error){ 
    console.log(error); 
    writeable.resume(); 
}); 

var readable = require('fs').createReadStream('somefile.txt'); 
readable.pipe.(writeable); 

widzę, że węzeł dostarcza nam readable.pause() metody, które można wykorzystać, aby wstrzymać czytelny strumień. Ale nie mogę sobie wyobrazić, jak mogę to nazwać z mojego modułu zapisu do zapisu:

var Writable = require('stream').Writable; 

function BackPressureStream(options) { 
    Writable.call(this, options); 
} 
require('util').inherits(BackPressureStream, Writable); 

BackPressureStream.prototype._write = function(chunk, encoding, done) { 
    done(); 
}; 

BackPressureStream.prototype.resume = function() { 
    this.emit('drain'); 
} 

W jaki sposób można wprowadzić ciśnienie wsteczne w zapisywalnym strumieniu?

P.S. Możliwe jest użycie zdarzeń pipe/unpipe, które zapewniają czytelny strumień jako parametr. Ale mówi się również, że dla strumieni potokowych jedyną szansą na zatrzymanie jest wypakowanie czytelnego strumienia z zapisu.

Czy mam to prawo? Muszę odpakować mój zapis do strumienia, dopóki użytkownik nie wznowi połączenia? A po wznowieniu wywołań użytkownika, powinienem odejść czytelny strumień z powrotem?

+1

zainteresowany rozpoczęciem bounty na ten jeden? –

+1

hej, znalazłeś odpowiedź na swoje pytanie? –

Odpowiedz

0

Zasadniczo, jak rozumiem, szukasz przeciwciśnienia w strumieniu w przypadku zdarzenia błędu. Masz kilka opcji.

Po pierwsze, jak już wspomniano, należy użyć pipe, aby pobrać instancję strumienia odczytu i wykonać wymyślne kroki.

Inną opcją jest stworzenie zawijania zapisu strumienia, który daje taką funkcjonalność (czyli zajmuje WritableStream jako wejście, a przy realizacji funkcji strumieniowych, przekazuje dane wraz z dostarczanym strumienia.

zasadzie skończyć z czymś

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream zajmuje się wdrożenie zapisu strumienia.

klucz fo r jest tak, że jeśli wystąpi błąd w zapisywalnym dysku, ustawisz flagę w strumieniu, a następnie zadzwonisz na write, aby zdezaktywować porcję, zapisać wywołanie zwrotne i tylko wywołać. Coś jak

// ... 
constructor(wrappedWritableStream) { 
    wrappedWritableStream.on('error', this.errorHandler); 
    this.wrappedWritableStream = wrappedWritableStream; 
} 
// ... 
write(chunk, encoding, callback) { 
    if (this.hadError) { 
     // Note: until callback is called, this function won't be called again, so we will have maximum one stored 
     // chunk. 
     this.bufferedChunk = [chunk, encoding, callback]; 
    } else { 
     wrappedWritableStream.write(chunk, encoding, callback); 
    } 
} 
// ... 
errorHandler(err) { 
    console.error(err); 
    this.hadError = err; 
    this.emit(err); 
} 
// ... 
recoverFromError() { 
    if (this.bufferedChunk) { 
     wrappedWritableStream.write(...this.bufferedChunk); 
     this.bufferedChunk = undefined; 
    } 
    this.hadError = false; 
} 

Uwaga: należy tylko trzeba zaimplementować funkcję write, ale ja zachęcam do kopania wokół i bawić się z innymi funkcjami realizacji.

Warto również zauważyć, że możesz mieć problemy z zapisem w strumieniach, które emitowały zdarzenie błędu, ale pozostawię to jako oddzielny problem do rozwiązania.

Oto kolejny dobry zasobów na backpressuring https://nodejs.org/en/docs/guides/backpressuring-in-streams/