Robię PoC używając apache ignite. Oto scenariusz, który testuję:Problem z włączeniem Apache Ignite w/Custom CacheStoreAdapter
- Uruchom klaster 3 węzłów i klienta.
- Zadzwoń po klucz. Loguję węzeł, który przechowuje ten klucz .
- Zadzwoń po klucz. Weryfikuję, że dostaje się wartość przechowywaną.
- Wykonaj loadCache(). Wszystkie węzły raportują pomyślnie Ładowanie pamięci podręcznej.
- węzeł Zabij że pierwotnie załadowane kluczową
- węzeł Restart, że po prostu zabić.
- Zapytanie o klucz ponownie.
Kroki 6 i 7 mają pewne problemy. Jeśli poczekam na wystarczająco długo między tymi dwoma elementami wszystko działa tak, jak powinno. Jeśli jednak spróbujesz zrobić 6 i 7 zbyt blisko siebie, otrzymam this error on the client i this error on the node.
Widzę błąd IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected.
Czy istnieje sposób na uniknięcie tego problemu? Ustawienie dłuższego czasu oczekiwania na aktualizację topologii nie jest tak naprawdę opcją, ponieważ klient może próbować się połączyć w dowolnym momencie. Czy ma to związek z moją konfiguracją klastra? Widziałem this documentation, co sugeruje nieskończoną próbę nawiązania połączenia, co wydaje się po prostu nie mieć końca.
Musimy również mieć możliwość dynamicznego powiększania/zmniejszania klastra. czy to możliwe? Czy posiadanie w kopiach zapasowych pamięci poprawi funkcjonalność?
Uwaga, jeśli pominę krok 6, nie widziałem, żeby to się nie udawało.
Cluster Node Config
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--<import resource="./cache.xml"/>-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="recordData"/>
<!--<property name="rebalanceMode" value="SYNC"/>-->
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="Application.RecordDataStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Override local port. -->
<property name="localPort" value="8000"/>
</bean>
</property>
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<!-- Override local port. -->
<property name="localPort" value="8100"/>
</bean>
</property>
</bean>
</beans>
Client Config
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>
<property name="clientMode" value="true"/>
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="recordData"/>
<!--<property name="rebalanceMode" value="SYNC"/>-->
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</property>
<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead os static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>localhost:8000..8099</value>
<!--<value>127.0.0.1:47500..47509</value>-->
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Zastosowane metody CacheStoreAdaptor
public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> {
// This method is called whenever "get(...)" methods are called on IgniteCache.
@Override public List<Record> load(Long key) {
System.out.println("Load data for pel: " + key);
try {
CouchDbConnector db = RecordDataStore.getDb();
ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
List<Record> list = db.queryView(viewQuery,Record.class);
HashMultimap<Long,Record> multimap = HashMultimap.create();
list.forEach(r -> {
multimap.put(r.getId(),r);
});
return new LinkedList<>(multimap.get(key));
} catch (MalformedURLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
....
@Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null) {
throw new CacheLoaderException("Expected entry count parameter is not provided.");
}
System.out.println("Loading Cache...");
final long entryCnt = (Long)args[0];
try{
CouchDbConnector db = RecordDataStore.getDb();
ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
List<Record> list = db.queryView(viewQuery,Record.class);
HashMultimap<Long,Record> multimap = HashMultimap.create();
long count = 0;
for(Record r : list) {
multimap.put(r.getPel(),r);
count++;
if(count == entryCnt)
break;
}
multimap.keySet().forEach(key -> {
clo.apply(key,new LinkedList<>(multimap.get(key)));
});
}
catch (MalformedURLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
System.out.println("Loaded Cache");
}
public static CouchDbConnector getDb() throws MalformedURLException {
HttpClient httpClient = new StdHttpClient.Builder()
.url("server:1111/")
.build();
CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
CouchDbConnector db = new StdCouchDbConnector("ignite", dbInstance);
return db;
}
}
Czy jesteś pewien, że węzły serwerowe rzeczywiście odkryć siebie? Czy możesz gdzieś przesłać dzienniki ze wszystkich węzłów? –
Zdarza się, zanim węzły odkryją się nawzajem i uruchomią. Przynajmniej dzieje się to, zanim pojawi się aktualizacja topologii w konsoli. Dziś dostanę dla nich dzienniki. –