2016-02-07 21 views
5

Próbuję buforować niektóre dane w ryglu burzowym, ale nie jestem pewien, czy to jest właściwy sposób, aby to zrobić, czy nie. W poniższym formularzu identyfikator pracownika i nazwa pracodawcy są przechowywane w pamięci podręcznej mapy skrótu. W tym celu wykonano wywołanie bazy danych do tabeli Employee, aby wybrać wszystkich pracowników i wypełnić mapę haszującą w metodzie przygotowania (czy jest to właściwe miejsce do zainicjowania mapy?).Buforowanie w burzach

Po pewnym zalogowaniu się okazuje (podczas pracy z topologią burzy), topologia tworzy wiele połączeń z bazą danych i inicjalizuje mapę wiele razy. Oczywiście chcę tego uniknąć, dlatego chcę buforować wynik, aby nie trafiał do bazy danych za każdym razem. Proszę pomóż?

public class TestBolt extends BaseRichBolt { 
    private static final long serialVersionUID = 2946379346389650348L; 
    private OutputCollector collector; 
    private Map<String, String> employeeIdToNameMap; 
    private static final Logger LOG = Logger.getLogger(TestBolt.class); 

    @Override 
    public void execute(Tuple tuple) {  
     String employeeId = tuple.getStringByField("employeeId"); 
     String employeeName = employeeIdToNameMap.get(employeeId); 

     collector.emit(tuple, new Values(employeeId, employeeName)); 
     collector.ack(tuple); 
    } 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     // TODO Auto-generated method stub 
     this.collector = collector; 
     try { 
      employeeIdToNameMap = createEmployeIdToNameMap(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields(/*some fields*/)); 

    } 

    private Map<String, String> createEmployeIdToNameMap() throws SQLException { 
     final Map<String, String> employeeIdToNameMap = new HashMap<>(); 
     final DatabaseManager dbm = new PostgresManager(); 
     final String query = "select id, name from employee;"; 
     final Connection conn = dbm.createDefaultConnection(); 
     final ResultSet result = dbm.executeSelectQuery(conn, query); 
     while(result.next()) { 
      String employeId = result.getString("id"); 
      String name = result.getString("name"); 
      employeeIdToNameMap.put(employeId, name); 
     } 
     conn.close(); 
     return employeeIdToNameMap; 
    }  
} 

ROZWIĄZANIE stworzyłem zsynchronizowany mapę i jego pracy w porządku dla mnie

private static Map<String, String> employeeIdToNameMap = Collections 
      .synchronizedMap(new HashMap<String, String>()); 
+0

Kod wydaje się być prawidłowy. Twoje podejście do zainicjowania HashMap w prepare() jest absolutnie poprawne. O wielu połączeniach z bazą danych: Zakładam, że masz wiele instancji twojego rygla w twojej topologii (np. Parallelim_hint> 1). W ten sposób każde wystąpienie otworzy własne połączenie i jest to uzasadnione. –

+0

Mam wiele innych śrub obok tej śruby, która ma równoległość> 1, ale dla tego śruby równoległości jest 1 – big

Odpowiedz

1

Skoro masz wiele zadań śruby, można oznaczyć employeeIdToNameMap statyczne i zmienne. Zainicjuj mapę w taki sposób -

try { 
synchronized(TestBolt.class) { 
    if (null == employeeIdToNameMap) { 
    employeeIdToNameMap = createEmployeIdToNameMap(); 
    } 
    } 
} catch (SQLException e) { 
... 
}