28 友盟大数据--flume源码查看分析- ExecSource-参照主机名拦截器HostInterceptor ---写限速拦截器

  1 //
  2 // Source code recreated from a .class file by IntelliJ IDEA
  3 // (powered by Fernflower decompiler)
  4 //
  5 
  6 package org.apache.flume.source;
  7 
  8 import com.google.common.base.Preconditions;
  9 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 10 import java.io.BufferedReader;
 11 import java.io.IOException;
 12 import java.io.InputStreamReader;
 13 import java.nio.charset.Charset;
 14 import java.util.ArrayList;
 15 import java.util.List;
 16 import java.util.concurrent.ExecutorService;
 17 import java.util.concurrent.Executors;
 18 import java.util.concurrent.Future;
 19 import java.util.concurrent.ScheduledExecutorService;
 20 import java.util.concurrent.ScheduledFuture;
 21 import java.util.concurrent.TimeUnit;
 22 import org.apache.flume.Context;
 23 import org.apache.flume.Event;
 24 import org.apache.flume.EventDrivenSource;
 25 import org.apache.flume.SystemClock;
 26 import org.apache.flume.channel.ChannelProcessor;
 27 import org.apache.flume.conf.Configurable;
 28 import org.apache.flume.event.EventBuilder;
 29 import org.apache.flume.instrumentation.SourceCounter;
 30 import org.slf4j.Logger;
 31 import org.slf4j.LoggerFactory;
 32 
 33 public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
 34     private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
 35     private String shell;
 36     private String command;
 37     private SourceCounter sourceCounter;
 38     private ExecutorService executor;
 39     private Future<?> runnerFuture;
 40     private long restartThrottle;
 41     private boolean restart;
 42     private boolean logStderr;
 43     private Integer bufferCount;
 44     private long batchTimeout;
 45     private ExecSource.ExecRunnable runner;
 46     private Charset charset;
 47 
 48     public ExecSource() {
 49     }
 50 
 51     public void start() {
 52         logger.info("Exec source starting with command:{}", this.command);
 53         this.executor = Executors.newSingleThreadExecutor();
 54         this.runner = new ExecSource.ExecRunnable(this.shell, this.command, this.getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.bufferCount, this.batchTimeout, this.charset);
 55         this.runnerFuture = this.executor.submit(this.runner);
 56         this.sourceCounter.start();
 57         super.start();
 58         logger.debug("Exec source started");
 59     }
 60 
 61     public void stop() {
 62         logger.info("Stopping exec source with command:{}", this.command);
 63         if (this.runner != null) {
 64             this.runner.setRestart(false);
 65             this.runner.kill();
 66         }
 67 
 68         if (this.runnerFuture != null) {
 69             logger.debug("Stopping exec runner");
 70             this.runnerFuture.cancel(true);
 71             logger.debug("Exec runner stopped");
 72         }
 73 
 74         this.executor.shutdown();
 75 
 76         while(!this.executor.isTerminated()) {
 77             logger.debug("Waiting for exec executor service to stop");
 78 
 79             try {
 80                 this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
 81             } catch (InterruptedException var2) {
 82                 logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
 83                 Thread.currentThread().interrupt();
 84             }
 85         }
 86 
 87         this.sourceCounter.stop();
 88         super.stop();
 89         logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter);
 90     }
 91 
 92     public void configure(Context context) {
 93         this.command = context.getString("command");
 94         Preconditions.checkState(this.command != null, "The parameter command must be specified");
 95         this.restartThrottle = context.getLong("restartThrottle", 10000L);
 96         this.restart = context.getBoolean("restart", false);
 97         this.logStderr = context.getBoolean("logStdErr", false);
 98         this.bufferCount = context.getInteger("batchSize", 20);
 99         this.batchTimeout = context.getLong("batchTimeout", 3000L);
100         this.charset = Charset.forName(context.getString("charset", "UTF-8"));
101         this.shell = context.getString("shell", (String)null);
102         if (this.sourceCounter == null) {
103             this.sourceCounter = new SourceCounter(this.getName());
104         }
105 
106     }
107 
108     private static class StderrReader extends Thread {
109         private BufferedReader input;
110         private boolean logStderr;
111 
112         protected StderrReader(BufferedReader input, boolean logStderr) {
113             this.input = input;
114             this.logStderr = logStderr;
115         }
116 
117         public void run() {
118             try {
119                 int i = 0;
120                 String line = null;
121 
122                 while((line = this.input.readLine()) != null) {
123                     if (this.logStderr) {
124                         Logger var10000 = ExecSource.logger;
125                         ++i;
126                         var10000.info("StderrLogger[{}] = '{}'", i, line);
127                     }
128                 }
129             } catch (IOException var11) {
130                 ExecSource.logger.info("StderrLogger exiting", var11);
131             } finally {
132                 try {
133                     if (this.input != null) {
134                         this.input.close();
135                     }
136                 } catch (IOException var10) {
137                     ExecSource.logger.error("Failed to close stderr reader for exec source", var10);
138                 }
139 
140             }
141 
142         }
143     }
144 
145     private static class ExecRunnable implements Runnable {
146         private final String shell;
147         private final String command;
148         private final ChannelProcessor channelProcessor;
149         private final SourceCounter sourceCounter;
150         private volatile boolean restart;
151         private final long restartThrottle;
152         private final int bufferCount;
153         private long batchTimeout;
154         private final boolean logStderr;
155         private final Charset charset;
156         private Process process = null;
157         private SystemClock systemClock = new SystemClock();
158         private Long lastPushToChannel;
159         ScheduledExecutorService timedFlushService;
160         ScheduledFuture<?> future;
161 
162         public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
163             this.lastPushToChannel = this.systemClock.currentTimeMillis();
164             this.command = command;
165             this.channelProcessor = channelProcessor;
166             this.sourceCounter = sourceCounter;
167             this.restartThrottle = restartThrottle;
168             this.bufferCount = bufferCount;
169             this.batchTimeout = batchTimeout;
170             this.restart = restart;
171             this.logStderr = logStderr;
172             this.charset = charset;
173             this.shell = shell;
174         }
175 
176         public void run() {
177             do {
178                 String exitCode = "unknown";
179                 BufferedReader reader = null;
180                 String line = null;
181                 final List<Event> eventList = new ArrayList();
182                 this.timedFlushService = Executors.newSingleThreadScheduledExecutor((new ThreadFactoryBuilder()).setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
183 
184                 try {
185                     String[] commandArgs;
186                     if (this.shell != null) {
187                         commandArgs = formulateShellCommand(this.shell, this.command);
188                         this.process = Runtime.getRuntime().exec(commandArgs);
189                     } else {
190                         commandArgs = this.command.split("\s+");
191                         this.process = (new ProcessBuilder(commandArgs)).start();
192                     }
193 
194                     reader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset));
195                     ExecSource.StderrReader stderrReader = new ExecSource.StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr);
196                     stderrReader.setName("StderrReader-[" + this.command + "]");
197                     stderrReader.setDaemon(true);
198                     stderrReader.start();
199                     this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() {
200                         public void run() {
201                             try {
202                                 List var1 = eventList;
203                                 synchronized(eventList) {
204                                     if (!eventList.isEmpty() && ExecRunnable.this.timeout()) {
205                                         ExecRunnable.this.flushEventBatch(eventList);
206                                     }
207                                 }
208                             } catch (Exception var4) {
209                                 ExecSource.logger.error("Exception occured when processing event batch", var4);
210                                 if (var4 instanceof InterruptedException) {
211                                     Thread.currentThread().interrupt();
212                                 }
213                             }
214 
215                         }
216                     }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS);
217 
218                     while((line = reader.readLine()) != null) {
219                         synchronized(eventList) {
220                             this.sourceCounter.incrementEventReceivedCount();
221                             eventList.add(EventBuilder.withBody(line.getBytes(this.charset)));
222                             if (eventList.size() >= this.bufferCount || this.timeout()) {
223                                 this.flushEventBatch(eventList);
224                             }
225                         }
226                     }
227 
228                     synchronized(eventList) {
229                         if (!eventList.isEmpty()) {
230                             this.flushEventBatch(eventList);
231                         }
232                     }
233                 } catch (Exception var23) {
234                     ExecSource.logger.error("Failed while running command: " + this.command, var23);
235                     if (var23 instanceof InterruptedException) {
236                         Thread.currentThread().interrupt();
237                     }
238                 } finally {
239                     if (reader != null) {
240                         try {
241                             reader.close();
242                         } catch (IOException var19) {
243                             ExecSource.logger.error("Failed to close reader for exec source", var19);
244                         }
245                     }
246 
247                     exitCode = String.valueOf(this.kill());
248                 }
249 
250                 if (this.restart) {
251                     ExecSource.logger.info("Restarting in {}ms, exit code {}", this.restartThrottle, exitCode);
252 
253                     try {
254                         Thread.sleep(this.restartThrottle);
255                     } catch (InterruptedException var20) {
256                         Thread.currentThread().interrupt();
257                     }
258                 } else {
259                     ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode);
260                 }
261             } while(this.restart);
262 
263         }
264 
265         private void flushEventBatch(List<Event> eventList) {
266             this.channelProcessor.processEventBatch(eventList);//通道处理器  详细见下面代码
267             this.sourceCounter.addToEventAcceptedCount((long)eventList.size());
268             eventList.clear();
269             this.lastPushToChannel = this.systemClock.currentTimeMillis();
270         }
271 
272         private boolean timeout() {
273             return this.systemClock.currentTimeMillis() - this.lastPushToChannel >= this.batchTimeout;
274         }
275 
276         private static String[] formulateShellCommand(String shell, String command) {
277             String[] shellArgs = shell.split("\s+");
278             String[] result = new String[shellArgs.length + 1];
279             System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
280             result[shellArgs.length] = command;
281             return result;
282         }
283 
284         public int kill() {
285             if (this.process != null) {
286                 Process var1 = this.process;
287                 synchronized(this.process) {
288                     this.process.destroy();
289 
290                     int var10000;
291                     try {
292                         int exitValue = this.process.waitFor();
293                         if (this.future != null) {
294                             this.future.cancel(true);
295                         }
296 
297                         if (this.timedFlushService != null) {
298                             this.timedFlushService.shutdown();
299 
300                             while(!this.timedFlushService.isTerminated()) {
301                                 try {
302                                     this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS);
303                                 } catch (InterruptedException var5) {
304                                     ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
305                                     Thread.currentThread().interrupt();
306                                 }
307                             }
308                         }
309 
310                         var10000 = exitValue;
311                     } catch (InterruptedException var6) {
312                         Thread.currentThread().interrupt();
313                         return -2147483648;
314                     }
315 
316                     return var10000;
317                 }
318             } else {
319                 return -1073741824;
320             }
321         }
322 
323         public void setRestart(boolean restart) {
324             this.restart = restart;
325         }
326     }
327 }

ChannelProcessor  processEventBatch()

  1  public void processEventBatch(List<Event> events) {
  2         Preconditions.checkNotNull(events, "Event list must not be null");
  3         events = this.interceptorChain.intercept(events);//拦截器链---拦截事件
  4         Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap();
  5         Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap();
  6         Iterator i$ = events.iterator();
  7 
  8         List batch;
  9         Iterator i$;
 10         while(i$.hasNext()) {
 11             Event event = (Event)i$.next();
 12             List<Channel> reqChannels = this.selector.getRequiredChannels(event);
 13 
 14             Object eventQueue;
 15             for(Iterator i$ = reqChannels.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
 16                 Channel ch = (Channel)i$.next();
 17                 eventQueue = (List)reqChannelQueue.get(ch);
 18                 if (eventQueue == null) {
 19                     eventQueue = new ArrayList();
 20                     reqChannelQueue.put(ch, eventQueue);
 21                 }
 22             }
 23 
 24             batch = this.selector.getOptionalChannels(event);
 25 
 26             Object eventQueue;
 27             for(i$ = batch.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
 28                 Channel ch = (Channel)i$.next();
 29                 eventQueue = (List)optChannelQueue.get(ch);
 30                 if (eventQueue == null) {
 31                     eventQueue = new ArrayList();
 32                     optChannelQueue.put(ch, eventQueue);
 33                 }
 34             }
 35         }
 36 
 37         i$ = reqChannelQueue.keySet().iterator();
 38 
 39         Channel optChannel;
 40         Transaction tx;
 41         Event event;
 42         while(i$.hasNext()) {
 43             optChannel = (Channel)i$.next();
 44             tx = optChannel.getTransaction();
 45             Preconditions.checkNotNull(tx, "Transaction object must not be null");
 46 
 47             try {
 48                 tx.begin();
 49                 batch = (List)reqChannelQueue.get(optChannel);
 50                 i$ = batch.iterator();
 51 
 52                 while(i$.hasNext()) {
 53                     event = (Event)i$.next();
 54                     optChannel.put(event);
 55                 }
 56 
 57                 tx.commit();
 58             } catch (Throwable var23) {
 59                 tx.rollback();
 60                 if (var23 instanceof Error) {
 61                     LOG.error("Error while writing to required channel: " + optChannel, var23);
 62                     throw (Error)var23;
 63                 }
 64 
 65                 if (var23 instanceof ChannelException) {
 66                     throw (ChannelException)var23;
 67                 }
 68 
 69                 throw new ChannelException("Unable to put batch on required channel: " + optChannel, var23);
 70             } finally {
 71                 if (tx != null) {
 72                     tx.close();
 73                 }
 74 
 75             }
 76         }
 77 
 78         i$ = optChannelQueue.keySet().iterator();
 79 
 80         while(i$.hasNext()) {
 81             optChannel = (Channel)i$.next();
 82             tx = optChannel.getTransaction();
 83             Preconditions.checkNotNull(tx, "Transaction object must not be null");
 84 
 85             try {
 86                 tx.begin();
 87                 batch = (List)optChannelQueue.get(optChannel);
 88                 i$ = batch.iterator();
 89 
 90                 while(i$.hasNext()) {
 91                     event = (Event)i$.next();
 92                     optChannel.put(event);
 93                 }
 94 
 95                 tx.commit();
 96             } catch (Throwable var21) {
 97                 tx.rollback();
 98                 LOG.error("Unable to put batch on optional channel: " + optChannel, var21);
 99                 if (var21 instanceof Error) {
100                     throw (Error)var21;
101                 }
102             } finally {
103                 if (tx != null) {
104                     tx.close();
105                 }
106 
107             }
108         }
109 
110     }

参照主机名拦截器HostInterceptor ---写限速拦截器  实现  Interceptor 

  1 //
  2 // Source code recreated from a .class file by IntelliJ IDEA
  3 // (powered by Fernflower decompiler)
  4 //
  5 
  6 package org.apache.flume.interceptor;
  7 
  8 import java.net.InetAddress;
  9 import java.net.UnknownHostException;
 10 import java.util.Iterator;
 11 import java.util.List;
 12 import java.util.Map;
 13 import org.apache.flume.Context;
 14 import org.apache.flume.Event;
 15 import org.slf4j.Logger;
 16 import org.slf4j.LoggerFactory;
 17 
 18 public class HostInterceptor implements Interceptor {
 19     private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);
 20     private final boolean preserveExisting;
 21     private final String header;
 22     private String host;
 23 
 24     private HostInterceptor(boolean preserveExisting, boolean useIP, String header) {
 25         this.host = null;
 26         this.preserveExisting = preserveExisting;
 27         this.header = header;
 28 
 29         try {
 30             InetAddress addr = InetAddress.getLocalHost();
 31             if (useIP) {
 32                 this.host = addr.getHostAddress();
 33             } else {
 34                 this.host = addr.getCanonicalHostName();
 35             }
 36         } catch (UnknownHostException var6) {
 37             logger.warn("Could not get local host address. Exception follows.", var6);
 38         }
 39 
 40     }
 41 
 42     public void initialize() {
 43     }
 44 
 45     public Event intercept(Event event) {
 46         Map<String, String> headers = event.getHeaders();
 47         if (this.preserveExisting && headers.containsKey(this.header)) {
 48             return event;
 49         } else {
 50             if (this.host != null) {
 51                 headers.put(this.header, this.host);
 52             }
 53 
 54             return event;
 55         }
 56     }
 57 
 58     public List<Event> intercept(List<Event> events) {
 59         Iterator i$ = events.iterator();
 60 
 61         while(i$.hasNext()) {
 62             Event event = (Event)i$.next();
 63             this.intercept(event);
 64         }
 65 
 66         return events;
 67     }
 68 
 69     public void close() {
 70     }
 71 
 72     public static class Constants {
 73         public static String HOST = "host";
 74         public static String PRESERVE = "preserveExisting";
 75         public static boolean PRESERVE_DFLT = false;
 76         public static String USE_IP = "useIP";
 77         public static boolean USE_IP_DFLT = true;
 78         public static String HOST_HEADER = "hostHeader";
 79 
 80         public Constants() {
 81         }
 82     }
 83 
 84     public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
 85         private boolean preserveExisting;
 86         private boolean useIP;
 87         private String header;
 88 
 89         public Builder() {
 90             this.preserveExisting = HostInterceptor.Constants.PRESERVE_DFLT;
 91             this.useIP = HostInterceptor.Constants.USE_IP_DFLT;
 92             this.header = HostInterceptor.Constants.HOST;
 93         }
 94 
 95         public Interceptor build() {
 96             return new HostInterceptor(this.preserveExisting, this.useIP, this.header);
 97         }
 98 
 99         public void configure(Context context) {
100             this.preserveExisting = context.getBoolean(HostInterceptor.Constants.PRESERVE, HostInterceptor.Constants.PRESERVE_DFLT);
101             this.useIP = context.getBoolean(HostInterceptor.Constants.USE_IP, HostInterceptor.Constants.USE_IP_DFLT);
102             this.header = context.getString(HostInterceptor.Constants.HOST_HEADER, HostInterceptor.Constants.HOST);
103         }
104     }
105 }
原文地址:https://www.cnblogs.com/star521/p/10003750.html