solr中通过SFTP访问文件建立索引

需求:

  根据Oracle数据库中记录的文件名filename_html(多个文件以逗号隔开)、文件路径path和备用文件路径bakpath,获取主机172.21.0.31上对应的HTML文件内容并建立索引。这些HTML文件只能通过SFTP访问。

问题:

  目前的难点有两个:一是字段filename_html中可以包含多个文件名,并且这些文件的内容需要抽取到同一个索引字段content中;二是文件只能通过SFTP方式访问,而DIH组件目前没有提供相应的SFTP数据源。

解决方法:

  引入JSch组件包,开发相应的SFTP数据源组件。

1.编写BinSFTPDataSource数据源,用于生成相应的InputStream流。编写过程中要注意及时关闭流和SFTP通道,否则容易造成“Too many open files”异常。

package org.apache.solr.handler.dataimport;

import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * DIH {@link DataSource} that fetches files from a remote host over SFTP (via JSch) and
 * returns their raw bytes as an {@link InputStream}, e.g. for {@code TikaEntityProcessor}.
 *
 * <p>Init properties (each passed through {@code Context#replaceTokens}):
 * <ul>
 *   <li>{@code host}, {@code port} (default {@value #PORT}), {@code username},
 *       {@code password}</li>
 *   <li>{@code baseUrl} - optional prefix prepended to every requested file name</li>
 *   <li>{@code connectionTimeout} / {@code readTimeout} - milliseconds
 *       (defaults {@value #CONNECTION_TIMEOUT} / {@value #READ_TIMEOUT})</li>
 *   <li>{@code strictHostKeyChecking} - JSch host-key policy, default {@code "no"}</li>
 *   <li>{@code knownHosts} - optional path to a known_hosts file for host-key checks</li>
 * </ul>
 *
 * <p>A single SSH session is shared by all {@link #getData} calls and is re-opened if it is
 * not connected. Each call opens its own SFTP channel. That channel is disconnected when the
 * returned stream is closed, when the next file is requested, or when {@link #close()} runs.
 * This keeps channels (and the sockets/file handles behind them) from piling up over a long
 * import.
 */
public class BinSFTPDataSource extends DataSource<InputStream> {
    static final Logger LOG = LoggerFactory.getLogger(BinSFTPDataSource.class);

    /** Shared SSH session; null until connected, and after {@link #close()}. */
    private Session session;
    /** SFTP channel backing the stream most recently returned by {@link #getData}. */
    private ChannelSftp channel;
    /** Stream most recently returned by {@link #getData}. */
    private InputStream is;

    private String baseUrl;

    private String username;

    private String password;

    private String host;

    private int port = PORT;

    private String strictHostKeyChecking = "no";

    private String knownHosts;

    private int connectionTimeout = CONNECTION_TIMEOUT;

    private int readTimeout = READ_TIMEOUT;

    private Context context;

    private Properties initProps;

    public BinSFTPDataSource() {
    }

    /**
     * Reads the connection settings and tries to open the SSH session.
     *
     * <p>A connection failure is logged rather than thrown. {@link #getData} retries the
     * connection, so the failure surfaces per file and the entity's {@code onError} policy
     * decides how to handle it.
     */
    @Override
    public void init(Context context, Properties initProps) {
        this.context = context;
        this.initProps = initProps;

        baseUrl = getInitPropWithReplacements(BASE_URL);
        username = getInitPropWithReplacements(USERNAME);
        password = getInitPropWithReplacements(PASSWORD);
        host = getInitPropWithReplacements(HOST);
        port = getIntInitProp(PORT_FIELD_NAME, PORT, "port");
        connectionTimeout = getIntInitProp(CONNECTION_TIMEOUT_FIELD_NAME, CONNECTION_TIMEOUT,
                "connection timeout");
        readTimeout = getIntInitProp(READ_TIMEOUT_FIELD_NAME, READ_TIMEOUT, "read timeout");
        String checking = getInitPropWithReplacements(STRICT_HOST_KEY_CHECKING);
        if (checking != null) {
            strictHostKeyChecking = checking;
        }
        knownHosts = getInitPropWithReplacements(KNOWN_HOSTS);

        try {
            openSession();
        } catch (JSchException e) {
            LOG.error("Unable to open SFTP session to " + host + ":" + port, e);
        }
    }

    /**
     * Opens {@code baseUrl + filename} on the remote host.
     *
     * @param filename remote path (relative to {@code baseUrl} when that is set)
     * @return a stream over the file's bytes, or null when {@code filename} is empty. Closing
     *         the stream also disconnects the SFTP channel behind it.
     * @throws DataImportHandlerException (SEVERE) when the session, channel or file cannot be
     *         opened
     */
    @Override
    public InputStream getData(String filename) {
        if (StringUtils.isEmpty(filename)) return null;
        if (StringUtils.isNotEmpty(baseUrl))
            filename = baseUrl + filename;
        // DIH consumes one file at a time. Releasing whatever the previous call left open
        // keeps a leaked channel per file from accumulating on the server.
        releaseChannel();
        try {
            if (session == null || !session.isConnected()) {
                openSession();
            }
            final ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
            channel = sftp; // tracked before connect() so a failure below still releases it
            sftp.connect();
            LOG.debug("SFTP channel connected, fetching {}", filename);
            is = new FilterInputStream(sftp.get(filename)) {
                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        // This channel serves only this stream; free it as soon as the caller is done.
                        sftp.disconnect();
                    }
                }
            };
            return is;
        } catch (Exception e) {
            // Only the per-file channel is released; the shared session stays usable for the
            // next file (e.g. when the entity uses onError="skip").
            releaseChannel();
            LOG.error("Exception thrown while getting data", e);
            wrapAndThrow(SEVERE, e, "Exception in invoking url " + filename);
            return null;// unreachable
        }
    }

    /** Releases the current stream and channel, then disconnects the SSH session. */
    @Override
    public void close() {
        releaseChannel();
        disconnectSession();
    }

    public String getBaseUrl() {
        return baseUrl;
    }

    /** Creates and connects a fresh SSH session, disconnecting any previous one first. */
    private void openSession() throws JSchException {
        disconnectSession();
        JSch jsch = new JSch();
        if (StringUtils.isNotEmpty(knownHosts)) {
            jsch.setKnownHosts(knownHosts);
        }
        Session newSession = jsch.getSession(username, host, port);
        if (password != null) {
            newSession.setPassword(password);
        }
        Properties config = new Properties();
        // The default "no" accepts any host key, which leaves the connection open to
        // man-in-the-middle attacks. Prefer strictHostKeyChecking="yes" together with
        // knownHosts.
        config.put("StrictHostKeyChecking", strictHostKeyChecking);
        newSession.setConfig(config);
        newSession.setTimeout(readTimeout);
        newSession.connect(connectionTimeout);
        session = newSession;
    }

    /** Closes the most recently returned stream and disconnects its SFTP channel, if any. */
    private void releaseChannel() {
        if (is != null) {
            try {
                is.close();
            } catch (IOException e) {
                LOG.warn("Failed to close SFTP input stream", e);
            }
            is = null;
        }
        if (channel != null) {
            channel.disconnect();
            channel = null;
        }
    }

    /** Disconnects the shared SSH session, if one is open. */
    private void disconnectSession() {
        if (session != null) {
            session.disconnect();
            session = null;
        }
    }

    /** Parses an integer init property; keeps {@code defaultValue} if absent or malformed. */
    private int getIntInitProp(String propertyName, int defaultValue, String label) {
        String value = getInitPropWithReplacements(propertyName);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            LOG.warn("Invalid " + label + ": " + value);
            return defaultValue;
        }
    }

    private String getInitPropWithReplacements(String propertyName) {
        final String expr = initProps.getProperty(propertyName);
        if (expr == null) {
            return null;
        }
        return context.replaceTokens(expr);
    }

    static final Pattern URIMETHOD = Pattern.compile("sftp:/",
            Pattern.CASE_INSENSITIVE);

    public static final String ENCODING = "encoding";

    public static final String BASE_URL = "baseUrl";

    public static final String UTF_8 = "UTF-8";

    public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout";

    public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout";

    public static final int CONNECTION_TIMEOUT = 5000;

    public static final int READ_TIMEOUT = 10000;

    public static final String USERNAME = "username";

    public static final String PASSWORD = "password";

    public static final String HOST = "host";

    /** Init property overriding the SSH port. */
    public static final String PORT_FIELD_NAME = "port";

    /** Init property for JSch's StrictHostKeyChecking setting. */
    public static final String STRICT_HOST_KEY_CHECKING = "strictHostKeyChecking";

    /** Init property naming a known_hosts file for host-key verification. */
    public static final String KNOWN_HOSTS = "knownHosts";

    /** Default SSH port. */
    public static final int PORT = 22;

}

2. 编写URLListEntityProcessor.java类,用于循环遍历多个文件URL。

package org.apache.solr.handler.dataimport;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Entity processor that turns a delimited list of file names into one row per file, so that
 * a child entity (e.g. {@code TikaEntityProcessor}) can read each file in turn. The files may
 * be on the local host or on a remote host, depending on the child's data source.
 *
 * <p>Entity attributes (each passed through {@code Context#replaceTokens}):
 * <ul>
 *   <li>{@code baseDir} (required) - directory prepended to every file name</li>
 *   <li>{@code fileNames} - the delimited list; when null or blank no rows are produced</li>
 *   <li>{@code regex} - delimiter regular expression, default {@value #DEFAULT_REGEX}</li>
 * </ul>
 * Each produced row holds the joined path under the {@value #FILE_NAME} key.
 */
public class URLListEntityProcessor extends EntityProcessorBase {
  /**
   * Delimited string of file names.
   */
  protected String fileNames;
  /**
   * Regular expression separating the file names.
   */
  protected String regex;

  /**
   * Base directory given in data-config.xml.
   */
  protected String baseDir;

  /**
   * The recursive given in data-config. Default value is false.
   */
  protected boolean recursive = false;

  @Override
  public void init(Context context) {
    super.init(context);
    fileNames = context.getEntityAttribute(FILE_NAMES);
    if (fileNames != null) {
        fileNames = context.replaceTokens(fileNames);
    }
    regex = context.getEntityAttribute(REGEX);
    // Without a default, split() below would throw an NPE when 'regex' is omitted.
    regex = (regex == null) ? DEFAULT_REGEX : context.replaceTokens(regex);
    baseDir = context.getEntityAttribute(BASE_DIR);
    if (baseDir == null)
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
              "'baseDir' is a required attribute");
    baseDir = context.replaceTokens(baseDir);

    String r = context.getEntityAttribute(RECURSIVE);
    if (r != null)
      recursive = Boolean.parseBoolean(r);
  }

  /**
   * Returns the next file row. The full list is built lazily on the first call after init.
   */
  @Override
  public Map<String, Object> nextRow() {
    if (rowIterator != null)
      return getNext();
    List<Map<String, Object>> fileDetails = new ArrayList<Map<String, Object>>();
    getUrls(fileDetails);
    rowIterator = fileDetails.iterator();
    return getNext();
  }

  /** Appends one {@code {fileName: baseDir/name}} row per non-blank name in {@link #fileNames}. */
  private void getUrls(final List<Map<String, Object>> fileDetails) {
    if (fileNames == null || fileNames.trim().length() == 0) {
      // A NULL column resolves to "" and must not yield a row pointing at baseDir itself.
      return;
    }
    for (String name : fileNames.split(regex)) {
      String trimmed = name.trim();
      if (trimmed.length() == 0) {
        continue; // tolerate "a.html,,b.html", ", b.html" and similar
      }
      Map<String, Object> details = new HashMap<String, Object>();
      details.put(FILE_NAME, joinPath(baseDir, trimmed));
      fileDetails.add(details);
    }
  }

  /**
   * Joins base and name. A '/' is inserted only when base is non-empty and neither side
   * already supplies a separator.
   */
  private static String joinPath(String base, String name) {
    if (base.length() == 0 || base.endsWith("/") || base.endsWith("\\")
        || name.startsWith("/") || name.startsWith("\\")) {
      return base + name;
    }
    return base + "/" + name;
  }

  public static final String DIR = "fileDir";

  public static final String ABSOLUTE_FILE = "fileAbsolutePath";

  public static final String FILE_NAME = "fileName";

  public static final String FILE_NAMES = "fileNames";

  public static final String BASE_DIR = "baseDir";

  public static final String REGEX = "regex";

  /** Delimiter used when the {@code regex} attribute is not given. */
  public static final String DEFAULT_REGEX = ",";

  public static final String RECURSIVE = "recursive";

}

3.配置data-config.xml文件:

<dataConfig>
<!-- JDBC source for the document metadata rows. -->
<dataSource name="jdbc" driver="oracle.jdbc.driver.OracleDriver"
    url="jdbc:oracle:thin:@127.0.0.1:1522:ORCLLI" user="kms_iep" password="kms_iep" batchSize="2000"/>
<!-- SFTP source used by the Tika entities below to read the HTML files. -->
<dataSource name="binSftp" type="BinSFTPDataSource" 
        username="kms" password="kms" host="127.0.0.1"
        connectionTimeout="10000" readTimeout="20000" />
    <document>
        <entity  pk="ID"  dataSource="jdbc" name="province"
            query="select (provincecode || '_' || kng_id) as id,
                   kng_id,
                   kng_type as type,
                   kng_title as title,
                   provincecode,
                   opertime,
                   modify_date,
                   url,
                   pack_month_fee,
                   pack_type,
                   pack_sen_flow,
                   filename_html,
                   ('/kmsinterface/jt/province_bak/' || provincecode || '/' ||
                   to_char(opertime, 'yyyymmdd') || substr(filepath,instr(filepath,'/',2)) || '/'
                   ) as path,
                   ('/kmsinterface/' || provincecode ||filepath) as bakpath
              from IEP_UPLOAD_DOCUMENT t
             where kng_status = 0  and provincecode='ah'  and to_char(opertime,'yyyy-mm-dd')='2014-12-24'
                   "
            deltaQuery="select (provincecode || '_' || kng_id) as id  from IEP_UPLOAD_DOCUMENT  where kng_status = 0  and  opertime &gt; to_date('${dih.last_index_time}','yyyy-mm-dd hh24:mi:ss') order by opertime asc"
            deltaImportQuery="select * from (
                            select (provincecode || '_' || kng_id) as id,
                                   kng_id,
                                   kng_type as type,
                                   kng_title as title,
                                   provincecode,
                                   opertime,
                                   modify_date,
                                   url,
                                   pack_month_fee,
                                   pack_type,
                                   pack_sen_flow,
                                   filename_html,
                                   ('/kmsinterface/jt/province_bak/' || provincecode || '/' ||
                                   to_char(opertime, 'yyyymmdd') || substr(filepath,instr(filepath,'/',2)) || '/' ) as path,
                                   ('/kmsinterface/' || provincecode ||filepath) as bakpath
                              from IEP_UPLOAD_DOCUMENT t
                             where kng_status = 0 )
                                where id = '${dih.delta.ID}'"
            deletedPkQuery="select (provincecode || '_' || kng_id) as id  from IEP_UPLOAD_DOCUMENT  where kng_status = 1   and opertime &gt; to_date('${dih.last_index_time}','yyyy-mm-dd hh24:mi:ss') order by id desc"
            transformer="DateFormatTransformer,RegexTransformer"
            onError="skip">
            <field column="ID" name="id" />
            <field column="KNG_ID" name="kng_id" />
            <field column="TYPE" name="type" />
            <field column="TITLE" name="title" />
            <field column="PROVINCECODE" name="provincecode" />
            <field column="OPERTIME" name="opertime" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/>
            <field column="MODIFY_DATE" name="modify_date" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/>
            <field column="URL" name="url"  />
            <field column="PACK_MONTH_FEE" name="pack_month_fee"  />
            <field column="PACK_TYPE" name="pack_type"  />
            <field column="PACK_SEN_FLOW" name="pack_sen_flow"  />
            <!-- One row per comma-separated name in FILENAME_HTML, resolved under PATH. -->
            <entity name="urllist1" processor="URLListEntityProcessor"  
                    baseDir="/kms/solr${province.PATH}" 
                    fileNames="${province.FILENAME_HTML}" regex=",">
            
                <!-- Parse each file fetched over SFTP. -->
                <entity name="test1" processor="TikaEntityProcessor" url="${urllist1.fileName}"
                        dataSource="binSftp"      format="text" 
                        transformer="HTMLStripTransformer,RegexTransformer" onError="skip">
                        <!-- Remove tabs, line breaks and other whitespace from the extracted text. -->
                        <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s"  replaceWith="" />
                </entity>
            </entity>
            
            <!-- Same file names resolved under the backup path BAKPATH. -->
            <entity name="urllist2" processor="URLListEntityProcessor"  
                    baseDir="/kms/solr${province.BAKPATH}" 
                    fileNames="${province.FILENAME_HTML}" regex=",">

                <entity name="test2" processor="TikaEntityProcessor" url="${urllist2.fileName}" 
                        dataSource="binSftp"  format="text" 
                        transformer="HTMLStripTransformer,RegexTransformer" onError="skip">
                        <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s"  replaceWith="" />
                </entity>
            </entity>
            
        </entity>
    </document>
</dataConfig>
原文地址:https://www.cnblogs.com/a198720/p/4269154.html