2017-05-08 33 views
6

Staramy się uruchomić prosty potok rozprowadzany w klastrze roota w doku. Pracownicy luigi są wdrażani jako replikowane usługi dokowania. Zaczynają się pomyślnie i po kilku sekundach proszenia o pracę do luigi-server, zaczynają umierać, ponieważ nie przydzielono im żadnej pracy, a wszystkie zadania zostają przypisane do jednego pracownika.Pracownicy umierają wcześnie z powodu nierównomiernej dystrybucji pracy w Luigi (2.6.1)

Musieliśmy ustawić keep_alive = True w luigi.cfg naszych pracowników, aby zmusić ich do nieumierania, ale utrzymywanie pracowników po zakończeniu prac wydaje się złym pomysłem. Czy istnieje sposób kontrolowania dystrybucji pracy?

Nasz test rurociąg:

class RunAllTasks(luigi.Task): 

    tasks = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    def requires(self): 
     for i in range(self.tasks): 
      yield RunExampleTask(i, self.sleep_time) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write('All done!') 

    def output(self): 
     return LocalTarget('/data/RunAllTasks.txt') 


class RunExampleTask(luigi.Task): 

    number = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    @property 
    def cmd(self): 
     return """ 
       docker run --rm --name example_{number} hello-world 
      """.format(number=self.number) 

    def run(self): 
     time.sleep(self.sleep_time) 
     logger.debug(self.cmd) 
     out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True) 
     logger.debug(out) 
     with self.output().open('w') as f: 
      f.write(str(out)) 

    def output(self): 
     return LocalTarget('/data/{number}.txt'.format(number=self.number)) 


if __name__ == "__main__": 
    luigi.run() 
+0

Czy nie wydaje się być pytaniem w docku? – johnharris85

+0

Czy uruchamiasz 'RunAllTasks' na każdym węźle? – MattMcKnight

+0

@MattMcKnight Tak, hermetyzowaliśmy potok jako usługę dokowania, więc rój rozpoczyna replikę na każdym węźle (round robin). – fcisneros

Odpowiedz

1

Twój problem jest wynikiem yield ing jeden wymóg naraz, zamiast chcesz yield ich wszystkich naraz, co następuje:

def requires(self): 
    reqs = [] 
    for i in range(self.tasks): 
     reqs.append(RunExampleTask(i, self.sleep_time)) 
    yield reqs