29 友盟大数据 —— Flume 源码分析：ExecSource 与 UmengExecSource（改造 exec 源：增加一个守护线程持续监控目录，收集新滚动出来的文件）

Flume 自定义源防丢失：解决 Flume 尚未收集完日志、日志文件就已滚动而导致的数据丢失问题

防重、防丢失

改造 exec 源：增加一个守护线程不断监控目录，发现新滚动的文件后一次性收集完整个文件

在 Redis 3 号库中维护 key（key 已存在表示该条日志已被收集过），用于防止重复收集

UmengExecSource

  1 package com.oldboy.umeng.flume;
  2 
  3 import java.io.*;
  4 import java.util.ArrayList;
  5 import java.util.List;
  6 import java.util.concurrent.ExecutorService;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.Future;
  9 import java.util.concurrent.ScheduledExecutorService;
 10 import java.util.concurrent.ScheduledFuture;
 11 import java.util.concurrent.TimeUnit;
 12 
 13 import org.apache.flume.Channel;
 14 import org.apache.flume.Context;
 15 import org.apache.flume.Event;
 16 import org.apache.flume.EventDrivenSource;
 17 import org.apache.flume.Source;
 18 import org.apache.flume.SystemClock;
 19 import org.apache.flume.channel.ChannelProcessor;
 20 import org.apache.flume.conf.Configurable;
 21 import org.apache.flume.event.EventBuilder;
 22 import org.apache.flume.instrumentation.SourceCounter;
 23 import org.apache.flume.source.AbstractSource;
 24 import org.apache.flume.source.ExecSourceConfigurationConstants;
 25 import org.slf4j.Logger;
 26 import org.slf4j.LoggerFactory;
 27 
 28 import com.google.common.base.Preconditions;
 29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 30 import redis.clients.jedis.Jedis;
 31 
 32 import java.nio.charset.Charset;
 33 
 34 public class UmengExecSource extends AbstractSource implements EventDrivenSource, Configurable {
 35 
 36     private static final Logger logger = LoggerFactory.getLogger(org.apache.flume.source.ExecSource.class);
 37 
 38     private String shell;
 39     private String command;
 40     private SourceCounter sourceCounter;
 41     private ExecutorService executor;
 42     private Future<?> runnerFuture;
 43     private long restartThrottle;
 44     private boolean restart;
 45     private boolean logStderr;
 46     private Integer bufferCount;
 47     private long batchTimeout;
 48     private UmengExecSource.ExecRunnable runner;
 49     private Charset charset;
 50     private String spooldir ;
 51     private String prefix ;
 52     private String suffix ;
 53     private String redisHost ;
 54     private int redisPort ;
 55 
 56     public void start() {
 57         logger.info("Exec source starting with command:{}", command);
 58 
 59         executor = Executors.newSingleThreadExecutor();
 60 
 61         runner = new UmengExecSource.ExecRunnable(shell, command, getChannelProcessor(),
 62                                                                             sourceCounter, restart, restartThrottle,
 63                                                                             logStderr, bufferCount, batchTimeout,
 64                                                                             charset);
 65 
 66         runnerFuture = executor.submit(runner);
 67         sourceCounter.start();
 68 
 69         new SpoolDirThread(spooldir, prefix, suffix, redisHost, redisPort, this).start();
 70         //启动守护线程
 71         super.start();
 72         logger.debug("Exec source started");
 73     }
 74 
 75     @Override
 76     public void stop() {
 77         logger.info("Stopping exec source with command:{}", command);
 78         if (runner != null) {
 79             runner.setRestart(false);
 80             runner.kill();
 81         }
 82 
 83         if (runnerFuture != null) {
 84             logger.debug("Stopping exec runner");
 85             runnerFuture.cancel(true);
 86             logger.debug("Exec runner stopped");
 87         }
 88         executor.shutdown();
 89 
 90         while (!executor.isTerminated()) {
 91             logger.debug("Waiting for exec executor service to stop");
 92             try {
 93                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
 94             } catch (InterruptedException e) {
 95                 logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting.");
 96                 Thread.currentThread().interrupt();
 97             }
 98         }
 99 
100         sourceCounter.stop();
101         super.stop();
102 
103         logger.debug("Exec source with command:{} stopped. Metrics:{}", command, sourceCounter);
104     }
105 
106     /**
107      * 获得配置属性
108      */
109     public void configure(Context context) {
110         command = context.getString("command");
111 
112         Preconditions.checkState(command != null, "The parameter command must be specified");
113 
114         restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
115                 ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);
116 
117         restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
118                 ExecSourceConfigurationConstants.DEFAULT_RESTART);
119 
120         logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
121                 ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
122 
123         bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
124                 ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
125 
126         batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
127                 ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);
128 
129         charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
130                 ExecSourceConfigurationConstants.DEFAULT_CHARSET));
131 
132         shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
133 
134         spooldir = context.getString("spooldir") ;
135         prefix = context.getString("prefix" , "access.log.") ;//判断前缀
136         suffix = context.getString("suffix" , "COMPLETED") ;//加后缀
137         redisHost = context.getString("redisHost" , "s101") ;//配置redis
138         redisPort = context.getInteger("redisPort" , 6379) ;
139         if(spooldir == null){
140             System.out.println("spooldir没有配置!");
141             System.exit(-1);
142         }
143 
144         if (sourceCounter == null) {
145             sourceCounter = new SourceCounter(getName());
146         }
147     }
148 
149     private static class ExecRunnable implements Runnable {
150 
151         public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
152             this.command = command;
153             this.channelProcessor = channelProcessor;
154             this.sourceCounter = sourceCounter;
155             this.restartThrottle = restartThrottle;
156             this.bufferCount = bufferCount;
157             this.batchTimeout = batchTimeout;
158             this.restart = restart;
159             this.logStderr = logStderr;
160             this.charset = charset;
161             this.shell = shell;
162         }
163 
164         private final String shell;
165         private final String command;
166         private final ChannelProcessor channelProcessor;
167         private final SourceCounter sourceCounter;
168         private volatile boolean restart;
169         private final long restartThrottle;
170         private final int bufferCount;
171         private long batchTimeout;
172         private final boolean logStderr;
173         private final Charset charset;
174         private Process process = null;
175         private SystemClock systemClock = new SystemClock();
176         private Long lastPushToChannel = systemClock.currentTimeMillis();
177         ScheduledExecutorService timedFlushService;
178         ScheduledFuture<?> future;
179 
180         public void run() {
181             do {
182                 String exitCode = "unknown";
183                 BufferedReader reader = null;
184                 String line = null;
185                 final List<Event> eventList = new ArrayList<Event>();
186 
187                 timedFlushService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(
188                         "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
189                 try {
190                     if (shell != null) {
191                         String[] commandArgs = formulateShellCommand(shell, command);
192                         process = Runtime.getRuntime().exec(commandArgs);
193                     } else {
194                         String[] commandArgs = command.split("\s+");
195                         process = new ProcessBuilder(commandArgs).start();
196                     }
197                     reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));
198 
199                     // StderrLogger dies as soon as the input stream is invalid
200                     UmengExecSource.StderrReader stderrReader = new UmengExecSource.StderrReader(new BufferedReader(new InputStreamReader(process.getErrorStream(),
201                                                                                                                                                                                        charset)),
202                                                                                                                                               logStderr);
203                     stderrReader.setName("StderrReader-[" + command + "]");
204                     stderrReader.setDaemon(true);
205                     stderrReader.start();
206 
207                     future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
208                         public void run() {
209                             try {
210                                 synchronized (eventList) {
211                                     if (!eventList.isEmpty() && timeout()) {
212                                         flushEventBatch(eventList);
213                                     }
214                                 }
215                             } catch (Exception e) {
216                                 logger.error("Exception occured when processing event batch", e);
217                                 if (e instanceof InterruptedException) {
218                                     Thread.currentThread().interrupt();
219                                 }
220                             }
221                         }
222                     }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
223 
224                     while ((line = reader.readLine()) != null) {
225                         synchronized (eventList) {
226                             sourceCounter.incrementEventReceivedCount();
227                             eventList.add(EventBuilder.withBody(line.getBytes(charset)));
228                             if (eventList.size() >= bufferCount || timeout()) {
229                                 flushEventBatch(eventList);
230                             }
231                         }
232                     }
233 
234                     synchronized (eventList) {
235                         if (!eventList.isEmpty()) {
236                             flushEventBatch(eventList);
237                         }
238                     }
239                 } catch (Exception e) {
240                     logger.error("Failed while running command: " + command, e);
241                     if (e instanceof InterruptedException) {
242                         Thread.currentThread().interrupt();
243                     }
244                 } finally {
245                     if (reader != null) {
246                         try {
247                             reader.close();
248                         } catch (IOException ex) {
249                             logger.error("Failed to close reader for exec source", ex);
250                         }
251                     }
252                     exitCode = String.valueOf(kill());
253                 }
254                 if (restart) {
255                     logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
256                     try {
257                         Thread.sleep(restartThrottle);
258                     } catch (InterruptedException e) {
259                         Thread.currentThread().interrupt();
260                     }
261                 } else {
262                     logger.info("Command [" + command + "] exited with " + exitCode);
263                 }
264             } while (restart);
265         }
266 
267         private void flushEventBatch(List<Event> eventList) {
268             channelProcessor.processEventBatch(eventList);
269             sourceCounter.addToEventAcceptedCount(eventList.size());
270             eventList.clear();
271             lastPushToChannel = systemClock.currentTimeMillis();
272         }
273 
274         private boolean timeout() {
275             return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
276         }
277 
278         private static String[] formulateShellCommand(String shell, String command) {
279             String[] shellArgs = shell.split("\s+");
280             String[] result = new String[shellArgs.length + 1];
281             System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
282             result[shellArgs.length] = command;
283             return result;
284         }
285 
286         public int kill() {
287             if (process != null) {
288                 synchronized (process) {
289                     process.destroy();
290 
291                     try {
292                         int exitValue = process.waitFor();
293 
294                         // Stop the Thread that flushes periodically
295                         if (future != null) {
296                             future.cancel(true);
297                         }
298 
299                         if (timedFlushService != null) {
300                             timedFlushService.shutdown();
301                             while (!timedFlushService.isTerminated()) {
302                                 try {
303                                     timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
304                                 } catch (InterruptedException e) {
305                                     logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting.");
306                                     Thread.currentThread().interrupt();
307                                 }
308                             }
309                         }
310                         return exitValue;
311                     } catch (InterruptedException ex) {
312                         Thread.currentThread().interrupt();
313                     }
314                 }
315                 return Integer.MIN_VALUE;
316             }
317             return Integer.MIN_VALUE / 2;
318         }
319 
320         public void setRestart(boolean restart) {
321             this.restart = restart;
322         }
323     }
324 
325     private static class StderrReader extends Thread {
326         private BufferedReader input;
327         private boolean logStderr;
328 
329         protected StderrReader(BufferedReader input, boolean logStderr) {
330             this.input = input;
331             this.logStderr = logStderr;
332         }
333 
334         public void run() {
335             try {
336                 int i = 0;
337                 String line = null;
338                 while ((line = input.readLine()) != null) {
339                     if (logStderr) {
340                         // There is no need to read 'line' with a charset
341                         // as we do not to propagate it.
342                         // It is in UTF-16 and would be printed in UTF-8 format.
343                         logger.info("StderrLogger[{}] = '{}'", ++i, line);
344                     }
345                 }
346             } catch (IOException e) {
347                 logger.info("StderrLogger exiting", e);
348             } finally {
349                 try {
350                     if (input != null) {
351                         input.close();
352                     }
353                 } catch (IOException ex) {
354                     logger.error("Failed to close stderr reader for exec source", ex);
355                 }
356             }
357         }
358     }
359 
360     public static class SpoolDirThread extends Thread{
361         private String spoolDir ;
362         private String prefix ;
363         private String suffix ;
364         private Jedis redis ;
365         private UmengExecSource source ;
366 
367         public SpoolDirThread(String spooldir, String prefix , String suffix ,String host , int port , UmengExecSource source){
368             //守护线程
369             this.setDaemon(true);
370             this.spoolDir = spooldir ;
371             this.prefix = prefix ;
372             this.suffix = suffix ;
373             this.source = source ;
374             redis = new Jedis(host , port ) ;//配置redis
375             redis.select(3) ;
376 
377         }
378         public void run() {
379             while(true){
380                 File dir = new File(spoolDir) ;
381                 if(dir.isDirectory()){
382                     File[] files = dir.listFiles() ;
383                     for(File f :files){
384                         String fname = f.getName() ;
385                         //处理滚动的文件 有前缀 没后缀  进行处理
386                         if(fname.startsWith(prefix) && !fname.endsWith(suffix)){
387                             doProcessLog(f) ;
388                         }
389                     }
390                     try {
391                         Thread.sleep(2000);
392                     } catch (InterruptedException e) {
393                         e.printStackTrace();
394                     }
395                 }
396             }
397         }
398 
399         /**
400          * 处理一个滚动的日志文件
401          */
402         private void doProcessLog(File f) {
403             try {
404                 BufferedReader br =new BufferedReader(new InputStreamReader(new FileInputStream(f))) ;
405                 String line = null ;
406                 while((line = br.readLine()) != null){
407                     String key = line.substring(0,line.lastIndexOf("#")) ;
408                     //已经收集过了
409                     System.out.println(" : ke防丢失源y = " + key);
410                     if(!redis.exists(key)){
411                         System.out.println(key + " : 未处理!");
412                         //交给通道处理器
413                         source.getChannelProcessor().processEvent(EventBuilder.withBody(line.getBytes()));
414                     }
415                 }
416                 //重命名文件  处理完加后缀COMPLETED
417                 f.renameTo(new File(f.getParentFile() , f.getName() + "." + suffix)) ;
418             } catch (Exception e) {
419                 e.printStackTrace();
420             }
421         }
422     }
423 }
原文地址:https://www.cnblogs.com/star521/p/10004235.html