Staramy się uruchomić prosty potok rozprowadzany w klastrze roota w doku. Pracownicy luigi są wdrażani jako replikowane usługi dokowania. Zaczynają się pomyślnie i po kilku sekundach proszenia o pracę do luigi-server, zaczynają umierać, ponieważ nie przydzielono im żadnej pracy, a wszystkie zadania zostają przypisane do jednego pracownika.Pracownicy umierają wcześnie z powodu nierównomiernej dystrybucji pracy w Luigi (2.6.1)
Musieliśmy ustawić keep_alive = True w luigi.cfg naszych pracowników, aby zmusić ich do nieumierania, ale utrzymywanie pracowników po zakończeniu prac wydaje się złym pomysłem. Czy istnieje sposób kontrolowania dystrybucji pracy?
Nasz test rurociąg:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i, self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
Czy nie wydaje się być pytaniem w docku? – johnharris85
Czy uruchamiasz 'RunAllTasks' na każdym węźle? – MattMcKnight
@MattMcKnight Tak, hermetyzowaliśmy potok jako usługę dokowania, więc rój rozpoczyna replikę na każdym węźle (round robin). – fcisneros