Używam Spark Streaming z dwoma różnymi oknami (w oknie do szkolenia modelu z SKLearn i drugim do przewidywania wartości na podstawie tego modelu) i zastanawiam się, jak mogę uniknąć jednego okna ("powolnego" okna treningowego) do trenowania modelu, bez "blokowania" okna "szybkiego" przewidywania.
Moja uproszczony kod wygląda następująco:Jak uniknąć jednego okna Spark Streaming blokującego inne okno z uruchomionym rodzimym kodem Pythona
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(Uwaga: Custom_ModelContainer jest klasa napisałem, aby zapisać i odzyskać wyszkolony modelu)
Moja konfiguracja zazwyczaj działa dobrze, z wyjątkiem że za każdym razem nowy model jest wyszkolony w drugim oknie (co zajmuje około minuty), pierwsze okna nie obliczają prognoz, dopóki szkolenie modelowe nie zostanie zakończone. Właściwie, wydaje mi się, że to ma sens, ponieważ dopasowanie modelu i prognozy są obliczane na węźle głównym (w ustawieniu niepodzielonym - z powodu SKLearn).
Moje pytanie brzmi: czy możliwe byłoby szkolenie modelu na pojedynczym węźle roboczym (zamiast w węźle głównym)? Jeśli tak, to w jaki sposób mogę osiągnąć ten drugi cel i czy to rzeczywiście rozwiązałoby mój problem?
Jeśli nie, to wszelkie inne sugestie dotyczące tego, jak mogę dokonać takiej konfiguracji bez opóźniania obliczeń w oknie 1?
Każda pomoc jest bardzo doceniana.
EDYCJA: Domyślam się, że bardziej ogólne pytanie brzmi: Jak mogę uruchomić dwa różne zadania na dwóch różnych pracowników równolegle?