Spring Boot 程序定时删除 Elasticsearch 6 过期索引

package com.shixun.syslogsearch.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.shixun.syslogsearch.dao.SyslogRepository;
import com.shixun.syslogsearch.entity.ESLog;
import com.shixun.syslogsearch.entity.PageResult;
import com.shixun.syslogsearch.entity.SysLogLoadFile;
import com.shixun.syslogsearch.entity.SyslogQueueType;
import com.shixun.syslogsearch.param.SysLogQueryParam;
import com.shixun.syslogsearch.service.SyslogService;
import com.shixun.syslogsearch.utils.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.lucene.document.StringField;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.stereotype.Service;

import java.io.*;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
 * Service that writes syslog documents into time-stamped Elasticsearch indices
 * and removes indices whose creation date is older than the configured
 * retention period.
 */
@Service
public class SyslogServiceImpl implements SyslogService {
    private static final Logger logger = LoggerFactory.getLogger(SyslogServiceImpl.class);

    /** Alias added to every index created by {@link #createIndex(String)}. */
    private final static String aliasName = "syslog";

    /** Timestamp pattern appended to the index prefix when generating index names. */
    private static final String INDEX_TIME_PATTERN = "yyyyMMddHHmmss";

    @Value("${collection.zipFilePath}")
    private String zipFilePath;
    @Value("${collection.unzipShellPath}")
    private String unzipShellPath;
    @Value("${collection.unzipTempPath}")
    private String unzipTempPath;
    @Value("${elasticsearch.indexPrefix}")
    private String indexPrefix;
    @Value("${elasticsearch.expireDays}")
    private String expireDays;

    @Autowired
    private RedisClient redisClient;
    @Autowired
    private ElasticsearchOperations elasticsearchTemplate;
    @Autowired
    private RestHighLevelClient elasticsearchClient;
    @Autowired
    private SyslogRepository syslogRepository;

    /**
     * Indexes a single log document into the given index.
     *
     * @param indexName target index name
     * @param esLog     document to index
     */
    @Override
    public void addSyslog(String indexName, ESLog esLog) {
        IndexQuery indexQuery = new IndexQueryBuilder().withIndexName(indexName).withObject(esLog).build();
        elasticsearchTemplate.index(indexQuery);
    }

    /**
     * Creates a freshly named index (if missing) and indexes one sample
     * document with the current time and loopback addresses into it.
     */
    @Override
    public void addSyslogDemo() {
        String indexName = generateIndexName();
        // Make sure the index, its mapping and its alias exist before writing.
        createIndex(indexName);

        ESLog esLog = new ESLog();
        esLog.setTime(System.currentTimeMillis());
        esLog.setSource_ip("127.0.0.1");
        esLog.setHost_ip("127.0.0.1");
        IndexQuery indexQuery = new IndexQueryBuilder().withIndexName(indexName).withObject(esLog).build();
        elasticsearchTemplate.index(indexQuery);
    }

    /**
     * Updates the {@code index.refresh_interval} setting of an index.
     * An I/O failure is logged and not propagated.
     *
     * @param indexName index whose setting is updated
     * @param second    interval in seconds; a negative value is sent as {@code "-1"}
     */
    @Override
    public void updateRefreshInterval(String indexName, int second) {
        String value = second < 0 ? "-1" : second + "s";
        UpdateSettingsRequest request = new UpdateSettingsRequest(indexName)
                .settings(Settings.builder().put("index.refresh_interval", value));
        try {
            elasticsearchClient.indices().putSettings(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            logger.error("Failed to update refresh interval of index {} to {}", indexName, value, e);
        }
    }

    /**
     * Builds an index name from the configured prefix and the current time.
     *
     * @return {@code indexPrefix} followed by a {@code yyyyMMddHHmmss} timestamp
     */
    @Override
    public String generateIndexName() {
        return indexPrefix + currentTimestamp();
    }

    /**
     * Builds an index name from the configured prefix, the current time and a file name.
     *
     * @param fileName file name; every occurrence of {@code ".zip"} is removed from it
     * @return {@code indexPrefix + timestamp + "-" + fileName} with {@code ".zip"} stripped
     */
    @Override
    public String generateIndexName(String fileName) {
        return indexPrefix + currentTimestamp() + "-" + fileName.replace(".zip", "");
    }

    /**
     * Creates the index, puts the mapping for type {@code _doc} and adds the
     * shared alias. Does nothing when the index already exists.
     *
     * @param indexName name of the index to create
     */
    @Override
    public void createIndex(String indexName) {
        if (elasticsearchTemplate.indexExists(indexName)) {
            return;
        }
        elasticsearchTemplate.createIndex(indexName);
        // NOTE(review): the entity class itself is passed as the mapping object;
        // confirm the template overload in use accepts a Class here.
        elasticsearchTemplate.putMapping(indexName, "_doc", ESLog.class);
        // Attach the shared alias to the new index.
        AliasQuery aliasQuery = new AliasBuilder()
                .withIndexName(indexName)
                .withAliasName(aliasName).build();
        elasticsearchTemplate.addAlias(aliasQuery);
    }

    /**
     * Deletes every index that starts with the configured prefix and whose
     * creation time is more than {@code expireDays} days in the past.
     */
    @Override
    public void clearExpireIndex() {
        logger.debug("索引起始名称为{},保留周期为{}天", indexPrefix, expireDays);
        String[] allIndexes = getAllIndices();
        if (allIndexes == null || allIndexes.length == 0) {
            return;
        }
        logger.debug("当前索引总数为:{}", allIndexes.length);

        List<String> timeoutList =
                getIndicesTimeout(allIndexes, indexPrefix, Long.parseLong(expireDays), TimeUnit.DAYS);
        if (timeoutList.isEmpty()) {
            return;
        }
        logger.debug("对于前缀为 {} 的索引 过期数目为:{}", indexPrefix, timeoutList.size());

        for (String indexName : timeoutList) {
            if (elasticsearchTemplate.deleteIndex(indexName)) {
                logger.debug("成功删除 {} 索引", indexName);
            } else {
                logger.debug("删除 {} 索引 失败", indexName);
            }
        }
    }

    /**
     * Reads the {@code index.creation_date} setting of an index.
     *
     * @param indexName index name
     * @return the setting converted with {@link String#valueOf(Object)}; this is
     *         the string {@code "null"} when the setting is absent
     */
    @Override
    public String getCreateTimeForIndex(String indexName) {
        return String.valueOf(elasticsearchTemplate.getSetting(indexName).get("index.creation_date"));
    }

    /**
     * Selects the indices that start with the given prefix and whose creation
     * time (parsed as a {@code long} and compared with
     * {@link System#currentTimeMillis()}) is older than the given duration.
     * Indices whose creation time cannot be parsed are logged and skipped so
     * that one bad index does not abort the whole scan.
     *
     * @param allIndices  index names to examine
     * @param indexPrefix prefix an index must start with to be considered
     * @param time        maximum allowed age
     * @param timeUnit    unit of {@code time}
     * @return names of the expired indices, in input order
     */
    @Override
    public List<String> getIndicesTimeout(String[] allIndices, String indexPrefix, Long time, TimeUnit timeUnit) {
        List<String> expired = new ArrayList<>();
        long maxAgeMillis = timeUnit.toMillis(time);
        long now = System.currentTimeMillis();
        for (String indexName : allIndices) {
            if (!indexName.startsWith(indexPrefix)) {
                continue;
            }
            String createTime = getCreateTimeForIndex(indexName);
            long createdAt;
            try {
                createdAt = Long.parseLong(createTime);
            } catch (NumberFormatException e) {
                logger.warn("Skipping index {}: creation time '{}' is not a number", indexName, createTime);
                continue;
            }
            if (now - createdAt > maxAgeMillis) {
                logger.debug("索引 {} 已经过期,创建时间为{}", indexName, createTime);
                expired.add(indexName);
            }
        }
        return expired;
    }

    /**
     * Fetches the names of all indices matching {@code indexPrefix + "*"}.
     *
     * @return the matching index names, or an empty array when the request
     *         fails with an {@link IOException} (the failure is logged)
     */
    public String[] getAllIndices() {
        GetIndexRequest request = new GetIndexRequest(indexPrefix + "*");
        try {
            GetIndexResponse response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
            return response.getIndices();
        } catch (IOException e) {
            logger.error("Failed to list indices with prefix {}", indexPrefix, e);
            return new String[0];
        }
    }

    /** Formats the current time with a per-call formatter, because SimpleDateFormat is not thread-safe. */
    private static String currentTimestamp() {
        return new SimpleDateFormat(INDEX_TIME_PATTERN).format(new Date());
    }
}

参考文档:https://blog.csdn.net/u013084266/article/details/103341569

 

原文地址:https://www.cnblogs.com/yinliang/p/13892575.html