java ShellExecutor

package org.rx.service;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.rx.core.exception.InvalidException;
import org.rx.core.Strings;
import org.rx.core.Tasks;
import org.rx.core.ThreadPool;
import org.rx.core.util.Utils;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import static org.rx.core.Contract.quietly;
import static org.rx.core.Contract.require;

@Slf4j
public class ShellExecutor {
    @Getter
    private final String shell;
    private File workspace;
    //0 stopped, 1 started, 2 restarting
    private final AtomicInteger status = new AtomicInteger();
    private BiConsumer<Integer, String> streamHandler;
    private volatile Process process;
    private volatile CompletableFuture<Void> future;

    public ShellExecutor(String shell) {
        this(shell, null);
    }

    public ShellExecutor(String shell, String workspace) {
        require(shell);

        this.shell = shell;
        if (Strings.isNullOrEmpty(workspace)) {
            this.workspace = new File(FilenameUtils.getFullPathNoEndSeparator(shell));
            return;
        }
        this.workspace = new File(workspace);
    }

    private String getLogId() {
        return String.format("Shell pid=%s
%s", process.pid(), shell);
    }

    public boolean isRunning() {
        return process != null && process.isAlive();
    }

    public ShellExecutor start() {
        return start(null);
    }

    @SneakyThrows
    public synchronized ShellExecutor start(BiConsumer<Integer, String> outHandler) {
        if (isRunning()) {
            throw new InvalidException("already started");
        }

        streamHandler = outHandler;
        process = Runtime.getRuntime().exec(shell, null, workspace);
        future = Tasks.run(() -> {
            status.incrementAndGet();
            log.info("start {}", getLogId());
            try (LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))
//                 ; LineNumberReader error = new LineNumberReader(new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))
            ) {
                while (isRunning()) {
                    handleIn("output", reader);
//                    handleIn("error", error);
                    Thread.sleep(200);
                }
            } finally {
                log.info("self exit={} {}", status, getLogId());
//                Utils.killProcess(process.pid(),true);
            }
        });
        return this;
    }

    @SneakyThrows
    private void handleIn(String prefix, LineNumberReader reader) {
        String line = reader.readLine();
        if (Strings.isNullOrEmpty(line)) {
            return;
        }
        log.info("{} {}
	{}", prefix, getLogId(), line);
        if (streamHandler != null) {
            quietly(() -> streamHandler.accept(reader.getLineNumber(), line));
        }
    }

    @SneakyThrows
    public boolean waitExit(int timeoutSeconds) {
        return isRunning() && process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
    }

    @SneakyThrows
    public void restart() {
        try {
            Tasks.run(() -> {
                log.info("restart {}", getLogId());
                synchronized (this) {
                    stop();
                    start(streamHandler);
                }
            }, shell, ThreadPool.ExecuteFlag.Single).get();
        } catch (InterruptedException e) {
            log.warn("Ignore {}", e.getMessage());
        }
    }

    public synchronized void stop() {
        boolean ok = Utils.killProcess(process.pid(), true);
        //sleep 问题
        future.cancel(true);
        status.set(0);
        log.info("stop kill={} {}", ok, getLogId());
    }
}
原文地址:https://www.cnblogs.com/Googler/p/14172515.html