2013-07-13 8 views
7

Jestem nowy w Trident in Storm. Łamie mi głowę nad TridentState. O ile mój trójząb rozumiejący utrzymuje stan (tj. Metadane) dla każdej partii (czy wszystkie krotki w partii są w pełni przetwarzane przez utrzymywanie identyfikatora transakcji w bazie danych) i nie jestem całkowicie pewien, co to jest następująca instrukcja:Co to jest Trident State in Storm?

TridentState urlToTweeters = 
    topology.newStaticState(getUrlToTweetersState()); 

Czy ktoś może wyjaśnić, co faktycznie się dzieje, gdy zdefiniujemy powyższy kod?

+0

Czy możesz zdefiniować "Trident" w tym kontekście? Istnieje wiele rzeczy zwanych Trident. – Charles

+1

Kontekst to "Burza": https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan

Odpowiedz

0

Istnieje dobra dokumentacja dotycząca stanu Trident on the storm wiki. Prosta odpowiedź na twoje pytanie jest taka, że ​​urlToTweeters jest obiektem stanu, z którego można uzyskać zapytanie. Zakładam, że powyższe oświadczenie jest z trident tutorial, zamieszczonym poniżej:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); 
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); 
topology.newDRPCStream("reach") 
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) 
    /* At this point we have the tweeters for each url passed in args */ 
    .shuffle()   
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
    .parallelismHint(200) 
    .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
    .groupBy(new Fields("follower")) 
    .aggregate(new One(), new Fields("one")) 
    .parallelismHint(20) 
    .aggregate(new Count(), new Fields("reach")); 

W tym przykładzie urlToTweeters będzie przechowywać mapowanie adresów URL do Tweeterów, a zapytanie DRPC reach zdefiniowany w następnym wierszu (które odbywa się w Adresy URL jako argumenty) ostatecznie doprowadziłyby do zasięgu. Ale po drodze (oznaczonej inline) zobaczysz strumień głośników wysokotonowych każdego adresu URL, tj. Wynik zapytania na urlToTweeters.

+0

możesz pomóc w tym http://stackoverflow.com/questions/35445165/total-number-of-non -requated-words-in-each-tweet – user1

9

Mam nadzieję, że nigdy nie jest zbyt późno, aby odpowiedzieć, przynajmniej ktoś inny może znaleźć moją odpowiedź użyteczne :)

Więc topology.newStaticState() jest abstrakcją trójząb jest o przechowywaniu danych queryable. Parametrem dla newStaticState() powinna być implementacja - oparta na metodzie umowy - z storm.trident.state.StateFactory. Fabryka z kolei powinna wdrożyć metodę zwracającą instancję o numerze storm.trident.state.State. Jeśli jednak planujesz wysłać zapytanie do swojego stanu, powinieneś zwrócić istote z storm.trident.state.map.ReadOnlyMapState, ponieważ zwykły storm.trident.state.State nie ma metod sprawdzania prawdziwego źródła danych (otrzymasz wyjątek od klasy, jeśli spróbujesz użyć czegokolwiek poza ReadOnlyMapState).

Więc, spróbujmy!

Manekin realizacja stan:

public static class ExampleStaticState implements ReadOnlyMapState<String> { 

    private final Map<String, String> dataSourceStub; 

    public ExampleStaticState() { 
     dataSourceStub = new HashMap<>(); 
     dataSourceStub.put("tuple-00", "Trident"); 
     dataSourceStub.put("tuple-01", "definitely"); 
     dataSourceStub.put("tuple-02", "lacks"); 
     dataSourceStub.put("tuple-03", "documentation"); 
    } 

    @Override 
    public List<String> multiGet(List<List<Object>> keys) { 

     System.out.println("DEBUG: MultiGet, keys is " + keys); 

     List<String> result = new ArrayList<>(); 

     for (List<Object> inputTuple : keys) { 
      result.add(dataSourceStub.get(inputTuple.get(0))); 
     } 

     return result; 
    } 

    @Override 
    public void beginCommit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Begin commit, txid=" + txid); 
    } 

    @Override 
    public void commit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Commit, txid=" + txid); 
    } 
} 

Fabryka:

public static class ExampleStaticStateFactory implements StateFactory { 
    @Override 
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 
     return new ExampleStaticState(); 
    } 
} 

Prosty psvm (aka public static void main):

public static void main(String... args) { 
    TridentTopology tridentTopology = new TridentTopology(); 
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{ 
      "foo" 
    })); 
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory()); 
    tridentTopology 
      .newStream("spout", spout) 
      .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) 
      .each(new Fields("foo", "bar"), new Debug()) 
      ; 

    Config conf = new Config(); 
    conf.setNumWorkers(6); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build()); 

    spout.feed(Arrays.asList(new Values[]{ 
      new Values("tuple-00"), 
      new Values("tuple-01"), 
      new Values("tuple-02"), 
      new Values("tuple-03") 
    })); 

    localCluster.shutdown(); 
} 

I wreszcie, wyjście:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]] 
DEBUG: [tuple-00, Trident] 
DEBUG: [tuple-01, definitely] 
DEBUG: [tuple-02, lacks] 
DEBUG: [tuple-03, documentation] 

Widzisz, stateQuery() pobiera wartości z wsadu wejściowego i odwzorowuje je na wartości znalezione w "przechowywaniu danych".

nurkowanie nieco głębiej, można spojrzeć na źródła MapGet klasy (facet, którego wystąpienie jest używany do odpytywania wewnątrz topologii) i znaleźć tam następujące:

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> { 
    @Override 
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) { 
     return map.multiGet((List) keys); 
    }  

    @Override 
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) { 
     collector.emit(new Values(result)); 
    }  
} 

to pod maską po prostu wywołuje metodę multiGet() implementacji, a następnie emituje wartości znalezione w pamięci danych, dodając je do już istniejącej krotki.Możesz (chociaż nie jest to najlepsze, co możesz zrobić) stworzyć własną implementację BaseQueryFunction<ReadOnlyMapState, Object>, wykonując coś bardziej skomplikowanego.

+1

Dzięki ... nigdy nie jest za późno, jeśli chodzi o naukę ... – Ezhil

+0

możesz pomóc w tym http://stackoverflow.com/questions/35445165/total-number-of -nie powtarzające się słowa w każdym tweecie – user1