Mam nadzieję, że nigdy nie jest zbyt późno, aby odpowiedzieć, przynajmniej ktoś inny może znaleźć moją odpowiedź użyteczne :)
Więc topology.newStaticState()
jest abstrakcją trójząb jest o przechowywaniu danych queryable. Parametrem dla newStaticState()
powinna być implementacja - oparta na metodzie umowy - z storm.trident.state.StateFactory
. Fabryka z kolei powinna wdrożyć metodę zwracającą instancję o numerze storm.trident.state.State
. Jeśli jednak planujesz wysłać zapytanie do swojego stanu, powinieneś zwrócić istote z storm.trident.state.map.ReadOnlyMapState
, ponieważ zwykły storm.trident.state.State
nie ma metod sprawdzania prawdziwego źródła danych (otrzymasz wyjątek od klasy, jeśli spróbujesz użyć czegokolwiek poza ReadOnlyMapState
).
Więc, spróbujmy!
Manekin realizacja stan:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
Fabryka:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
Prosty psvm
(aka public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
I wreszcie, wyjście:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
Widzisz, stateQuery() pobiera wartości z wsadu wejściowego i odwzorowuje je na wartości znalezione w "przechowywaniu danych".
nurkowanie nieco głębiej, można spojrzeć na źródła MapGet
klasy (facet, którego wystąpienie jest używany do odpytywania wewnątrz topologii) i znaleźć tam następujące:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
to pod maską po prostu wywołuje metodę multiGet()
implementacji, a następnie emituje wartości znalezione w pamięci danych, dodając je do już istniejącej krotki.Możesz (chociaż nie jest to najlepsze, co możesz zrobić) stworzyć własną implementację BaseQueryFunction<ReadOnlyMapState, Object>
, wykonując coś bardziej skomplikowanego.
Czy możesz zdefiniować "Trident" w tym kontekście? Istnieje wiele rzeczy zwanych Trident. – Charles
Kontekst to "Burza": https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan