Potrzebuję przekazywać wiadomości do procesów CLI PHP przez stdin z Java. Chciałbym utrzymać około 20 procesów PHP działających w puli, tak, że kiedy przekazuję wiadomość do puli, wysyła ona każdą wiadomość do osobnego wątku, zachowując kolejkę komunikatów do dostarczenia. Chciałbym, aby te procesy PHP pozostały żywe jak najdłużej, wychodząc z nowego, jeśli umrze. Spróbowałem zrobić to ze statyczną pulą wątków, ale wydaje się bardziej zaprojektowane do zadań, które wykonują i po prostu umierają. Jak mogłem to zrobić za pomocą prostego interfejsu, aby przekazać wiadomość do puli? Czy będę musiał wdrożyć własną "pulę wątków"?ThreadPool procesów CLI
Odpowiedz
Dostarczam kod z tym, ponieważ myślę, że to sprawi, że rzeczy będą wyraźniejsze. Zasadniczo musisz zachować pulę obiektów procesu wokół. Uważaj, że każdy z tych procesów ma strumień wejściowy, wyjściowy i błędu, którym musisz zarządzać w pewien sposób. W moim przykładzie przekierowuję błąd i wynik na główną konsolę procesów. Możesz skonfigurować wywołania zwrotne i procedury obsługi w celu uzyskania danych wyjściowych programu PHP, jeśli to konieczne. Jeśli tylko przetwarzasz zadania i nie obchodzi cię, co mówi PHP, pozostaw to tak, jak jest, lub przekieruj do pliku.
Używam biblioteki Apache Commons Pool dla ObjectPool. Nie trzeba wymyślać jednego.
Będziesz mieć pulę 20 procesów, które uruchamiają twój program PHP. Samo to nie zapewni ci tego, czego potrzebujesz. Możesz przetwarzać zadania przeciwko wszystkim 20 tym procesom "w tym samym czasie". Będziesz także potrzebował ThreadPool, który pobierze proces z twojego ObjectPool.
Musisz również zrozumieć, że jeśli zabijesz, lub CTRL-C proces Java, proces init
przejmie twoje procesy php i po prostu tam usiądą. Prawdopodobnie będziesz chciał zachować dziennik wszystkich pidów procesów PHP, które spawnujesz, a następnie wyczyścić je, jeśli ponownie uruchomisz program Java.
public class StackOverflow_10037379 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectError(Redirect.INHERIT);
// I am being lazy, but really the InputStream is where
// you can get any output of the PHP Process. This setting
// will make it output to the current processes console.
builder.redirectOutput(Redirect.INHERIT);
builder.redirectInput(Redirect.PIPE);
builder.command(mProcessToRun);
return builder.start();
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
// Also change mock_php.exe to /usr/bin/php or wherever.
ObjectPool<Process> pool =
new GenericObjectPool<>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
// This will only allow you to queue 100 work items at a time. I would suspect
// that if you only want 20 PHP processes running at a time and this queue
// filled up you'll need to implement some other strategy as you are doing
// more work than PHP can keep up with. You'll need to block at some point
// or throw work away.
BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<>(100, true);
ThreadPoolExecutor executor =
new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
Wyjście przebiegu programu:
12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11
Kodeks C Program ++ po prostu wyjście co było wejście:
#include <windows.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[])
{
DWORD pid = GetCurrentProcessId();
std::string line;
while (true) {
std::getline (std::cin, line);
std::cout << pid << " - " << line << std::endl;
}
return 0;
}
Aktualizacja
Przepraszamy za opóźnienie. Oto wersja JDK 6 dla wszystkich zainteresowanych. Będziesz musiał uruchomić oddzielny wątek, aby odczytać wszystkie dane wejściowe z InputStream procesu. Ustawiłem ten kod, aby odradzał się nowy wątek obok każdego nowego procesu. Ten wątek zawsze czyta się z procesu tak długo, jak długo żyje. Zamiast wysyłania bezpośrednio do pliku, ustawiłem go tak, aby korzystał ze struktury Logging. W ten sposób można skonfigurować rejestrowanie, aby przejść do pliku, przewrócić, przejść do konsoli itp. Bez konieczności zakodowania pliku.
Zauważysz, że uruchamiam tylko jednego Gobblera dla każdego procesu, nawet jeśli proces ma standardowe wyjście i stderr. Przekierowuję stderr na stdout tylko po to, aby ułatwić. Wygląda na to, że jdk6 obsługuje tylko ten typ przekierowania.
public class StackOverflow_10037379_jdk6 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());
// Shamelessy taken from Google and modified.
// I don't know who the original Author is.
public static class StreamGobbler extends Thread {
InputStream is;
Logger logger;
Level level;
StreamGobbler(String logName, Level level, InputStream is) {
this.is = is;
this.logger = Logger.getLogger(logName);
this.level = level;
}
public void run() {
try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
logger.log(level, line);
}
} catch (IOException ex) {
logger.log(Level.SEVERE, "Failed to read from Process.", ex);
}
logger.log(
Level.INFO,
String.format("Exiting Gobbler for %s.", logger.getName()));
}
}
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectErrorStream(true);
builder.command(mProcessToRun);
Process process = builder.start();
StreamGobbler loggingGobbler =
new StreamGobbler(
String.format("process.%s", process.hashCode()),
Level.INFO,
process.getInputStream());
loggingGobbler.start();
return process;
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
ObjectPool<Process> pool =
new GenericObjectPool<Process>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);
ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
Wyjście
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.
Wow, dzięki za dokładną odpowiedź! Na tej podstawie wykonuję testową implementację. Naprawdę to doceniam. – Will
Więc jestem na Java6 i nie mam przekierowania. Jak mogę zapobiec blokowaniu się stdout/stderr procesu? W moim normalnym przypadku użycia będę chciał napisać do procesu i przekierować stdout/stderr do oddzielnych logów (bez blokowania). – Will
@Will Zaktualizowany o wersję jdk6. –
Najlepiej tutaj użyć funkcji pcntl do rozwidlenia procesu, ale komunikacja między procesami jest trudna. Polecam tworzenie kolejki, z której mogą odczytać twoje procesy, zamiast próbować przekazywać wiadomości do wiersza poleceń.
Beanstalk ma kilka klientów PHP, których można użyć do obsługi wiadomości między procesami.
Przepraszamy, może moje pytanie było niejasne - zmienię. To jest pytanie Java. Chcę wątku z java z długimi procesami cli (/ usr/bin/php w tym przypadku). Muszę mieć możliwość przesłania czegoś do puli, która zostanie zapisana na stdin w jednym z procesów CLI. – Will
Bardzo podobny do tego pytania: http://stackoverflow.com/questions/2592093/php-thread-pool –
I istnieje wyjście z PHP taka, że wiedzieć, kiedy jest przetwarzane? – Clint
To nigdy nie będzie przetwarzane. Jeśli ktoś umrze, muszę odrodzić nowy, aby go zastąpić. Będę przekazywał im dane w trybie round-robin przez stdin. – Will