2016-08-03 45 views
8

Robię PoC używając apache ignite. Oto scenariusz, który testuję:Problem z włączeniem Apache Ignite w/Custom CacheStoreAdapter

  1. Uruchom klaster 3 węzłów i klienta.
  2. Zadzwoń po klucz. Loguję węzeł, który przechowuje ten klucz .
  3. Zadzwoń po klucz. Weryfikuję, że dostaje się wartość przechowywaną.
  4. Wykonaj loadCache(). Wszystkie węzły raportują pomyślnie Ładowanie pamięci podręcznej.
  5. węzeł Zabij że pierwotnie załadowane kluczową
  6. węzeł Restart, że po prostu zabić.
  7. Zapytanie o klucz ponownie.

Kroki 6 i 7 mają pewne problemy. Jeśli poczekam na wystarczająco długo między tymi dwoma elementami wszystko działa tak, jak powinno. Jeśli jednak spróbujesz zrobić 6 i 7 zbyt blisko siebie, otrzymam this error on the client i this error on the node.

Widzę błąd IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected. Czy istnieje sposób na uniknięcie tego problemu? Ustawienie dłuższego czasu oczekiwania na aktualizację topologii nie jest tak naprawdę opcją, ponieważ klient może próbować się połączyć w dowolnym momencie. Czy ma to związek z moją konfiguracją klastra? Widziałem this documentation, co sugeruje nieskończoną próbę nawiązania połączenia, co wydaje się po prostu nie mieć końca.

Musimy również mieć możliwość dynamicznego powiększania/zmniejszania klastra. czy to możliwe? Czy posiadanie w kopiach zapasowych pamięci poprawi funkcjonalność?

Uwaga, jeśli pominę krok 6, nie widziałem, żeby to się nie udawało.

Cluster Node Config

<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd"> 
    <!--<import resource="./cache.xml"/>--> 
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> 
     <property name="peerClassLoadingEnabled" value="true"/> 

     <property name="cacheConfiguration"> 
      <bean class="org.apache.ignite.configuration.CacheConfiguration"> 
       <!-- Set a cache name. --> 
       <property name="name" value="recordData"/> 
       <!--<property name="rebalanceMode" value="SYNC"/>--> 
       <!-- Set cache mode. --> 
       <property name="cacheMode" value="PARTITIONED"/> 

       <property name="cacheStoreFactory"> 
        <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf"> 
         <constructor-arg value="Application.RecordDataStore"/> 
        </bean> 
       </property> 
       <property name="readThrough" value="true"/> 
       <property name="writeThrough" value="true"/> 

      </bean> 
     </property> 

     <property name="discoverySpi"> 
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> 
       <!-- Override local port. --> 
       <property name="localPort" value="8000"/> 
      </bean> 
     </property> 

     <property name="communicationSpi"> 
      <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"> 
       <!-- Override local port. --> 
       <property name="localPort" value="8100"/> 
      </bean> 
     </property> 
    </bean> 
</beans> 

Client Config

<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:util="http://www.springframework.org/schema/util" 
     xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/util 
     http://www.springframework.org/schema/util/spring-util.xsd"> 
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> 
     <!-- Set to true to enable distributed class loading for examples, default is false. --> 
     <property name="peerClassLoadingEnabled" value="true"/> 
     <property name="clientMode" value="true"/> 

     <property name="cacheConfiguration"> 
      <bean class="org.apache.ignite.configuration.CacheConfiguration"> 
       <!-- Set a cache name. --> 
       <property name="name" value="recordData"/> 
       <!--<property name="rebalanceMode" value="SYNC"/>--> 

       <!-- Set cache mode. --> 
       <property name="cacheMode" value="PARTITIONED"/> 

       <property name="cacheStoreFactory"> 
        <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf"> 
         <constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/> 
        </bean> 
       </property> 
       <property name="readThrough" value="true"/> 
       <property name="writeThrough" value="true"/> 

      </bean> 
     </property> 

     <!-- Enable task execution events for examples. --> 
     <property name="includeEventTypes"> 
      <list> 
       <!--Task execution events--> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> 
       <!--Cache events--> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> 
       <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> 
      </list> 
     </property> 

     <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> 
     <property name="discoverySpi"> 
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> 
       <property name="ipFinder"> 
        <!-- 
         Ignite provides several options for automatic discovery that can be used 
         instead os static IP based discovery. For information on all options refer 
         to our documentation: http://apacheignite.readme.io/docs/cluster-config 
        --> 
        <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> 
        <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> 
        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> 
         <property name="addresses"> 
          <list> 
           <!-- In distributed environment, replace with actual host IP address. --> 
           <value>localhost:8000..8099</value> 
           <!--<value>127.0.0.1:47500..47509</value>--> 
          </list> 
         </property> 
        </bean> 
       </property> 
      </bean> 
     </property> 
    </bean> 
</beans> 

Zastosowane metody CacheStoreAdaptor

public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> { 

    // This method is called whenever "get(...)" methods are called on IgniteCache. 
    @Override public List<Record> load(Long key) { 
     System.out.println("Load data for pel: " + key); 
     try { 
      CouchDbConnector db = RecordDataStore.getDb(); 
      ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all"); 
      List<Record> list = db.queryView(viewQuery,Record.class); 
      HashMultimap<Long,Record> multimap = HashMultimap.create(); 

      list.forEach(r -> { 
       multimap.put(r.getId(),r); 
      }); 
      return new LinkedList<>(multimap.get(key)); 
     } catch (MalformedURLException e) { 
      throw new CacheLoaderException("Failed to load values from cache store.", e); 
     } 
    } 
    .... 
    @Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) { 
     if (args == null || args.length == 0 || args[0] == null) { 
      throw new CacheLoaderException("Expected entry count parameter is not provided."); 
     } 

     System.out.println("Loading Cache..."); 
     final long entryCnt = (Long)args[0]; 

     try{ 
      CouchDbConnector db = RecordDataStore.getDb(); 
      ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all"); 
      List<Record> list = db.queryView(viewQuery,Record.class); 
      HashMultimap<Long,Record> multimap = HashMultimap.create(); 

      long count = 0; 
      for(Record r : list) { 
       multimap.put(r.getPel(),r); 
       count++; 
       if(count == entryCnt) 
        break; 
      } 

      multimap.keySet().forEach(key -> { 
       clo.apply(key,new LinkedList<>(multimap.get(key))); 
      }); 
     } 
     catch (MalformedURLException e) { 
      throw new CacheLoaderException("Failed to load values from cache store.", e); 
     } 

     System.out.println("Loaded Cache"); 
    } 

    public static CouchDbConnector getDb() throws MalformedURLException { 
     HttpClient httpClient = new StdHttpClient.Builder() 
      .url("server:1111/") 
      .build(); 

     CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient); 
     CouchDbConnector db = new StdCouchDbConnector("ignite", dbInstance); 

     return db; 
    } 
} 
+0

Czy jesteś pewien, że węzły serwerowe rzeczywiście odkryć siebie? Czy możesz gdzieś przesłać dzienniki ze wszystkich węzłów? –

+0

Zdarza się, zanim węzły odkryją się nawzajem i uruchomią. Przynajmniej dzieje się to, zanim pojawi się aktualizacja topologii w konsoli. Dziś dostanę dla nich dzienniki. –

Odpowiedz

1

http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-recovery-after-network-partition-td2775.html Podkreśla, że ​​IgniteClientDisconnectedException zapewnia IgniteFuture że można uzyskać dzwoniąc pod numer

IgniteFuture f = myException.reconnectFuture(); 

że przyszłość ma get() -method, która czeka na węzeł ponownie połączyć:

Synchronicznie czeka na zakończeniu obliczeń i zwraca wynik obliczeń.

Zatem następujące powinny zakończyć, gdy klient został ponownie podłączony:

f.get(); 
+0

Przepraszam, że przegapiłem odpowiedź na twoją odpowiedź PRZED wydaniem nagrody. Dzięki! –

+0

Nie ma za co. –