flume自定义sink

package me;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

	// 在整个sink结束时执行一遍
	@Override
	public synchronized void stop() {
		// TODO Auto-generated method stub
		super.stop();
	}

	// 在整个sink开始时执行一遍
	@Override
	public synchronized void start() {
		// TODO Auto-generated method stub
		super.start();
	}

	// 不断循环调用
	@Override
	public Status process() throws EventDeliveryException {
		   // TODO Auto-generated method stub
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        Event event =null;
        txn.begin();
        while(true){
            event = ch.take();
            if (event!=null) {
                break;
            }
        }
        try {

            String body = new String(event.getBody());
            System.out.println("event.getBody()-----" + body);
            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();

            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            txn.close();
        }

	}

	@Override
	public void configure(Context arg0) {
		// TODO Auto-generated method stub
		System.out.println("configure-------" + arg0);
	}

}
agent.sources = s1    
agent.channels = c1  
agent.sinks = sk1  

agent.sources.s1.type = netcat  
agent.sources.s1.bind = localhost  
agent.sources.s1.port = 5678  
agent.sources.s1.channels = c1  

agent.sinks.sk1.type = me.MySink
agent.sinks.sk1.hostname=192.168.16.33
agent.sinks.sk1.port=3306
agent.sinks.sk1.databaseName=test
agent.sinks.sk1.tableName=user
agent.sinks.sk1.user=root
agent.sinks.sk1.password=WoChu@123
agent.sinks.sk1.column_name=id, username, password
agent.sinks.sk1.field_separator=\|
agent.sinks.sk1.channel = c1  

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000     
agent.channels.c1.transactionCapacity = 100
lihudeMacBook-Pro:~ SunAndLi$ telnet  localhost 5678

 

原文地址:https://www.cnblogs.com/sunyaxue/p/6644466.html