Próbuję napisać topologii, który wykonuje następujące czynności:Grupowanie w topologii burzy prosty agregacji
- dziobek, który subskrybuje kanał twitter (na podstawie słów kluczowych)
- śrubą agregacji że agreguje pewną liczbę tweetów (np. N) w kolekcji i wysyła im śrubę drukarki
- Prosta śruba, która drukuje kolekcję na konsoli jednocześnie.
W rzeczywistości chcę zrobić więcej przetwarzania na kolekcji.
Testowałem to lokalnie i wygląda na to, że działa. Nie jestem jednak pewien, czy poprawnie ustawiłem zgrupowania na śrubach i czy działałoby to prawidłowo, gdy rozmieszczono je na prawdziwym klastrze burzowym. Byłbym wdzięczny, gdyby ktoś mógł pomóc w ocenie tej topologii i zasugerować wszelkie błędy, zmiany lub ulepszenia.
Dzięki.
Tak wygląda moja topologia.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
Agregacja Bolt
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
Bolt drukarki
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
Podałem kod dla śruby agregatora powyżej (patrz metoda wykonywania). Na razie czeka, aż zgromadzi N (10 w powyższym przykładzie) wiadomości i podzieli je, gdy tylko będzie mieć 10 wiadomości. BTW Właśnie znalazłem błąd, który naprawię. Po wyemitowaniu wartości muszę wyczyścić pamięć podręczną. Jakie zmiany powinny być konieczne, jeśli muszę użyć więcej niż jednego agregatora. –