通过Logstash将 CSV 文档到 Elasticsearch

准备文件cars.csv

将csv文件导入elasticsearch

logstash_cars.config 文件的内容如下:

input {
    file {
        path => "/Users/liuxg/data/cars.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
 
filter {
    csv {
        separator => ","
 
        columns => [ "maker", "model", "mileage", "manufacture_year", "engine_displacement",
        "engine_power", "body_type", "color_slug", "stk_year", "transmission", "door_count",
        "seat_count", "fuel_type", "date_created", "date_last_seen", "price_eur" ]
    }
 
    mutate { convert => ["mileage", "integer"] }
    mutate { convert => ["price_eur", "float"] }
    mutate { convert => ["engine_power", "integer"] }
    mutate { convert => ["door_count", "integer"] }
    mutate { convert => ["seat_count", "integer"] }
}
 
output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "cars"
        document_type => "sold_cars"
    }
 
    stdout {}
}

这里有几点需要说明的:

  • 在 input 中,我们定义了一个文件,它的path指向我们的 csv 文件的位置。start_position 指向beginning。如果对于一个实时的数据源来说,它通常是 ending,这样表示它每次都是从最后拿到那个数据。sincedb_path 通常指向一个文件。这个文件保存上次操作的位置。针对我们的情况,我们设置为 /dev/null,表明,我们不存储这个数据
  • 在 filter 中,CSV filter 是非常直接的,不太需要很多的解释。这里的 column 都来自于我们的 csv 表格。通常 Logstash 会把每个数据都当做是字符串。针对我们的情况,我们可看到 mileage 是一个整型数,price_eur 是一个浮点数。这些我们都需要进行调整。这个调整我们可以通过 mutate 来完成
  • 在 output 之中,我们制定本地的 Elasticsearch 为我们的数据库,它的 index 是 cars,同时 document_type 为_doc。我们也同时使用 stdout,这样我们可以在terminal屏幕中看出数据在处理之中
原文地址:https://www.cnblogs.com/fat-girl-spring/p/14338943.html