ElasticSearch&Docker&Kafka

Lucene&ElasticSeach

1 什么是全文检索

1.1 数据分类

我们生活中的数据总体分为两种:结构化数据和非结构化数据。
结构化数据:指具有固定格式或有限长度的数据,如数据库,元数据等。
非结构化数据:指不定长或无固定格式的数据,如邮件,word 文档等磁盘上的文件

1.2 结构化数据搜索

常见的结构化数据也就是数据库中的数据

为什么数据库搜索很容易?

 因为数据库中的数据存储是有规律的,有行有列而且数据格式、数据长度都是固定的。

1.3 非结构化数据查询方法

(1 ) 顺序扫描法(Serial Scanning)

用户搜索----->文件

所谓顺序扫描,比如要找内容包含某一个字符串的文件,就是一个文档一个文档的看,对于每一个文档,从头看到尾,如果此文档包含此字符串,则此文档为我们要找的文件,接着看下一个文件,直到扫描完所有的文件。如利用 windows 的搜索也可以搜索文件内容,只是相当的慢。

(2 ) 全文检索(Full-text Search)

用户通过查询索引库---->生成索引----->文档

全文检索是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方法。这个过程类似于通过字典的目录查字的过程。

将非结构化数据中的一部分信息提取出来,重新组织,使其变得有一定结构,然后对此有一定结构的数据进行搜索,从而达到搜索相对较快的目的。这部分从非结构化数据中提取出的然后重新组织的信息,我们称之索引

这种先建立索引,再对索引进行搜索的过程就叫全文检索(Full-Text Search) 。虽然创建索引的过程也是非常耗时的,但是索引一旦创建就可以多次使用,全文检索主要处理的是查询,所以耗时间创建索引是值得的。

1.4 如何实现全文检索

可以使用 Lucene 实现全文检索。Lucene 是 apache 下的一个开放源代码的全文检索引擎工具包。提供了完整的查询引擎和索引引擎,部分文本分析引擎(英文与德文两种西方语言)。Lucene 的目的是为软件开发人员提供一个简单易用的工具包,以方便的在目标系统中实现全文检索的功能。

Lucene适用场景:

  • 在应用中为数据库中的数据提供全文检索实现。
  • 开发独立的搜索引擎服务、系统

Lucene的特性:

1. 稳定、索引性能高

  • 每小时能够索引150GB以上的数据
  • 对内存的要求小,只需要1MB的堆内存
  • 增量索引和批量索引一样快
  • 索引的大小约为索引文本大小的20%~30%

2.高效、准确、高性能的搜索算法

  • 良好的搜索排序
  • 强大的查询方式支持:短语查询、通配符查询、临近查询、范围查询等
  • 支持字段搜索(如标题、作者、内容)
  • 可根据任意字段排序
  • 支持多个索引查询结果合并
  • 支持更新操作和查询操作同时进行
  • 支持高亮、join、分组结果功能
  • 速度快
  • 可扩展排序模块,内置包含向量空间模型、BM25模型可选
  • 可配置存储引擎

 3.跨平台

  • 纯java编写
  • 作为Apache开源许可下的开源项目,你可以在商业或开源项目中使用
  • Lucene有多种语言实现版(如C,C++、Python等),不仅仅是JAVA

Lucene架构:

 

 1.5 全文检索的应用场景

对于数据量大、数据结构不固定的数据可采用全文检索方式搜索,

  • 单机软件的搜索:word、markdown
  • 站内搜索:京东、淘宝、拉勾,索引源是数据库
  • 搜索引擎:百度、Google,索引源是爬虫程序抓取的数据

2 Lucene 实现全文检索的流程说明

2.1 索引和搜索流程图

 

1、绿色表示索引过程,对要搜索的原始内容进行索引构建一个索引库,索引过程包括:
  确定原始内容即要搜索的内容-->采集文档-->创建文档-->分析文档-->索引文档
2、红色表示搜索过程,从索引库中搜索内容,搜索过程包括:
  用户通过搜索界面-->创建查询-->执行搜索,从索引库搜索-->渲染搜索结果

2.2 创建索引

核心概念:

Document:

用户提供的源是一条条记录,它们可以是文本文件、字符串或者数据库表的一条记录等等。一条记录经过索引之后,就是以一个Document的形式存储在索引文件中的。用户进行搜索,也是以Document列表的形式返回。

Field:

一个Document可以包含多个信息域,例如一篇文章可以包含“标题”、“正文”、“最后修改时间”等信息域,这些信息域就是通过Field在Document中存储的。

Field有两个属性可选:存储和索引。通过存储属性你可以控制是否对这个Field进行存储;通过索引属性你可以控制是否对该Field进行索引。

如果对标题和正文进行全文搜索,所以我们要把索引属性设置为真,同时我们希望能直接从搜索结果中提取文章标题,所以我们把标题域的存储属性设置为真,但是由于正文域太大了,我们为了缩小索引文件大小,将正文域的存储属性设置为假,当需要时再直接读取文件;我们只是希望能从搜索解果中提取最后修改时间,不需要对它进行搜索,所以我们把最后修改时间域的存储属性设置为真,索引属性设置为假。上面的三个域涵盖了两个属性的三种组合,还有一种全为假的没有用到,事实上Field不允许你那么设置,因为既不存储又不索引的域是没有意义的。

Term:

Term是搜索的最小单位,它表示文档的一个词语,Term由两部分组成:它表示的词语和这个词语所出现的Field的名称。

网站的索引数据需要提前创建的。以下是创建的过程:

第一步:获得原始文档:就是从mysql数据库中通过sql语句查询需要创建索引的数据

第二步:创建文档对象(Document),把查询的内容构建成lucene能识别的Document对象,获取原始内容的目的是为了索引,在索引前需要将原始内容创建成文档,文档中包括一个一个的域(Field),这个域对应就是表中的列。

注意:每个 Document 可以有多个 Field,不同的 Document 可以有不同的 Field,同一个Document可以有相同的 Field(域名和域值都相同)。每个文档都有一个唯一的编号,就是文档 id。

第三步:分析文档

将原始内容创建为包含域(Field)的文档(document),需要再对域中的内容进行分析,分析的过程是经过对原始文档提取单词、将字母转为小写、去除标点符号、去除停用词等过程生成最终的语汇单元,可以将语汇单元理解为一个一个的单词。

分好的词会组成索引库中最小的单元:term,一个term由域名和词组成

第四步:创建索引,

对所有文档分析得出的语汇单元进行索引,索引的目的是为了搜索,最终要实现只搜索被索引的语汇单元从而找到 Document(文档)。

注意:创建索引是对语汇单元索引,通过词语找文档,这种索引的结构叫 倒排索引结构。
倒排索引结构是根据内容(词语)找文档,如下图:

倒排索引结构也叫反向索引结构,包括索引和文档两部分,索引即词汇表,它的规模较小,而文档集合较大。

2.3 倒排索引

倒排索引记录每个词条出现在哪些文档,及在文档中的位置,可以根据词条快速定位到包含这个词条的文档及出现的位置。

文档:索引库中的每一条原始数据,例如一个商品信息、一个职位信息

词条:原始数据按照分词算法进行分词,得到的每一个词

创建倒排索引,分为以下几步:
1)创建文档列表:
lucene首先对原始文档数据进行编号(DocID),形成列表,就是一个文档列表
2)创建倒排索引列表
对文档中数据进行分词,得到词条(分词后的一个又一个词)。对词条进行编号,以词条创建索引。然后记录下包含该词条的所有文档编号(及其它信息)。
搜索的过程:
 

搜索的过程:
当用户输入任意的词条时,首先对用户输入的数据进行分词,得到用户要搜索的所有词条,然后拿着这些词条去倒排索引列表中进行匹配。找到这些词条就能找到包含这些词条的所有文档的编号。然后根据这些编号去文档列表中找到文档

2.4 查询索引 

 查询索引也是搜索的过程。搜索就是用户输入关键字,从索引(index)中进行搜索的过程。根据关键字搜索索引,根据索引找到对应的文档

第一步:创建用户接口:用户输入关键字的地方
第二步:创建查询 指定查询的域名和关键字 
第三步:执行查询 
第四步:渲染结果 (结果内容显示到页面上 关键字需要高亮) 

3 Lucene实战

3.1 需求说明

 生成职位信息索引库,从索引库检索数据

3.2 准备开发环境

第一步:创建一个maven工程 ,已经学过Spring Boot,我们就创建一个SpringBoot项目
第二步:导入依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.lagou</groupId>
    <artifactId>lucene</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>lucene</name>

    <dependencies>
        <!--web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--lombok工具-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!--热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!--mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!--pojo持久化使用-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>javax.persistence-api</artifactId>
            <version>2.2</version>
        </dependency>
        <!--mysql驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--引入Lucene核心包及分词器包-->
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-core</artifactId>
            <version>4.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-analyzers-common</artifactId>
            <version>4.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
        <!--IK中文分词器-->
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <!--打包插件-->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

第三步:创建引导类

@SpringBootApplication
@MapperScan("com.lagou.lucene.mapper")
public class LuceneApplication {

    public static void main(String[] args) {
        SpringApplication.run(LuceneApplication.class, args);
    }

}

第四步:配置properties文件

server:
  port: 9000
Spring:
  application:
    name: lagou-lucene
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/es?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
    username: root
    password: 123456

#开启驼峰命名匹配映射
mybatis:
  configuration:
    map-underscore-to-camel-case: true

第五步:创建实体类、mapper、service

实体类

@Data
@Table(name = "job_info")
public class JobInfo {
  @Id
  private long id;
  private String companyName;
  private String companyAddr;
  private String companyInfo;
  private String jobName;
  private String jobAddr;
  private String jobInfo;
  private int salaryMin;
  private int salaryMax;
  private String url;
  private String time;

}

mapper

public interface JobInfoMapper extends BaseMapper<JobInfo> {

}

service (只是测试,这个可以省略)

@Service
public class JobInfoServiceImpl implements JobInfoService{

    @Autowired
    private JobInfoMapper jobInfoMapper;

    @Override
    public JobInfo selectById(long id) {
        return jobInfoMapper.selectById(id);
    }

    @Override
    public List<JobInfo> selectAll() {
        QueryWrapper<JobInfo> queryWrapper = new QueryWrapper<>();
        List<JobInfo> jobInfoList = jobInfoMapper.selectList(queryWrapper);
        return jobInfoList;
    }
}

整体结构:

3.3创建索引 

在test下创建一个包com.lucene

@RunWith(SpringRunner.class)
@SpringBootTest
public class LuceneIndexTest {

    @Autowired
    private JobInfoService jobInfoService;

    /**
     * 创建索引
     */
    @Test
    public void create()throws Exception{
        //1.指定索引文件的存储位置,索引具体的表现形式就是一组有规则的文件
        Directory directory = FSDirectory.open(new File("D:/class/index"));
        //2.配置版本及其分词器
        Analyzer analyzer = new IKAnalyzer();
        IndexWriterConfig config = new IndexWriterConfig(Version.LATEST,analyzer);
        //3.创建IndexWriter对象,作用就是创建索引
        IndexWriter indexWriter = new IndexWriter(directory,config);
        //先删除已经存在的索引库
        indexWriter.deleteAll();
        //4.获得索引源/原始数据
        List<JobInfo> jobInfoList = jobInfoService.selectAll();
        //5. 遍历jobInfoList,每次遍历创建一个Document对象
        for (JobInfo jobInfo: jobInfoList) {
            //创建Document对象
            Document document = new Document();
            //创建Field对象,添加到document中
            document.add(new LongField("id",jobInfo.getId(), Field.Store.YES));
            //切分词、索引、存储
            document.add(new TextField("companyName",jobInfo.getCompanyName(), Field.Store.YES));
            document.add(new TextField("companyAddr",jobInfo.getCompanyAddr(), Field.Store.YES));
            document.add(new TextField("companyInfo",jobInfo.getCompanyInfo(), Field.Store.YES));
            document.add(new TextField("jobName",jobInfo.getJobName(), Field.Store.YES));
            document.add(new TextField("jobAddr",jobInfo.getJobAddr(), Field.Store.YES));
            document.add(new TextField("jobInfo",jobInfo.getJobInfo(), Field.Store.YES));
            document.add(new IntField("salaryMin",jobInfo.getSalaryMin(), Field.Store.YES));
            document.add(new IntField("salaryMax",jobInfo.getSalaryMax(), Field.Store.YES));
            document.add(new StringField("url",jobInfo.getUrl(), Field.Store.YES));
            //将文档追加到索引库中
            indexWriter.addDocument(document);
        }
        //关闭资源
        indexWriter.close();
        System.out.println("create index success!");
    }
}

生成的索引目录:D:classindex

索引(Index):

  • 在Lucene中一个索引是放在一个文件夹中的。
  • 如下图,同一文件夹中的所有的文件构成一个Lucene索引。

段(Segment):

  • 按层次保存了从索引,一直到词的包含关系:索引(Index) –> 段(segment) –> 文档(Document) –> 域(Field) –> 词(Term)
  • 也即此索引包含了那些段,每个段包含了那些文档,每个文档包含了那些域,每个域包含了那些词。
  • 一个索引可以包含多个段,段与段之间是独立的,添加新文档可以生成新的段,不同的段可以合并。
  • 如上图,具有相同前缀文件的属同一个段,图中共一个段 "_0" 。
  • segments.gen和segments_1是段的元数据文件,也即它们保存了段的属性信息。

Field的特性:

  • Document(文档)是Field(域)的承载体, 一个Document由多个Field组成. Field由名称和值两部分组成
  • Field的值是要索引的内容, 也是要搜索的内容.
  • 是否分词(tokenized)
    • 是: 将Field的值进行分词处理, 分词的目的是为了索引. 如: 商品名称, 商品描述. 这些内容用户会通过输入关键词进行查询, 由于内容多样, 需要进行分词处理建立索引.
    • 否: 不做分词处理. 如: 订单编号, 身份证号, 是一个整体, 分词以后就失去了意义, 故不需要分词
  • 是否索引(indexed)
    • 是: 将Field内容进行分词处理后得到的词(或整体Field内容)建立索引, 存储到索引域. 索引的目的是为了搜索. 如: 商品名称, 商品描述需要分词建立索引. 订单编号, 身份证号作为整体建立索引. 只要可能作为用户查询条件的词, 都需要索引.
    • 否: 不索引. 如: 商品图片路径, 不会作为查询条件, 不需要建立索引.
  • 是否存储(stored)
    • 是: 将Field值保存到Document中. 如: 商品名称, 商品价格. 凡是将来在搜索结果页面展现给用户的内容, 都需要存储.
    • 否: 不存储. 如: 商品描述. 内容多格式大, 不需要直接在搜索结果页面展现, 不做存储. 需要的时候可以从关系数据库取.
  • 常用的Field类型:

 3.4查询索引

@Test
    public void query()throws Exception{
        //1.指定索引文件的存储位置,索引具体的表现形式就是一组有规则的文件
        Directory directory = FSDirectory.open(new File("D:/class/index"));
        //2.IndexReader对象
        IndexReader indexReader = DirectoryReader.open(directory);
        //3.创建查询对象,IndexSearcher
        IndexSearcher indexSearcher = new IndexSearcher(indexReader);
        //使用term,查询公司名称中包含"北京"的所有的文档对象
        Query query = new TermQuery(new Term("companyName","北京"));
        TopDocs topDocs = indexSearcher.search(query, 100);
        //获得符合查询条件的文档数
        int totalHits = topDocs.totalHits;
        System.out.println("符合条件的文档数:"+totalHits);
        //获得命中的文档  ScoreDoc封装了文档id信息
        ScoreDoc[] scoreDocs = topDocs.scoreDocs;
        for(ScoreDoc scoreDoc : scoreDocs){
            //文档id
            int docId = scoreDoc.doc;
            //通过文档id获得文档对象
            Document doc = indexSearcher.doc(docId);
            System.out.println("id:"+doc.get("id"));
            System.out.println("companyName:"+doc.get("companyName"));
            System.out.println("companyAddr:"+doc.get("companyAddr"));
            System.out.println("companyInfo:"+doc.get("companyInfo"));
            System.out.println("jobName:"+doc.get("jobName"));
            System.out.println("jobInfo:"+doc.get("jobInfo"));
            System.out.println("*******************************************");
        }
        //资源释放
        indexReader.close();
    }

查看结果你会发现,居然没有数据,如果把查询的关键字“北京”那里改为“北”或“京”就可以,原因是因为中文会一个字一个字的分词,显然是不合适的,所以我们需要使用可以合理分词的分词器,其中最有名的是IKAnalyzer分词器

3.5中文分词器的使用

第一步:导依赖

<!--IK中文分词器-->
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>

第二步:可以添加配置文件,放入到resources文件夹中。


第三步 创建索引时使用IKanalyzer

把原来的索引数据删除,再重新生成索引文件,再使用关键字“北京”就可以查询到结果了
一个大型网站中的索引数据会很庞大的,所以使用lucene这种原生的写代码的方式就不合适了,所以需要借助一个成熟的项目或软件来实现,目前比较有名是solr和elasticSearch.

4.Elastic search介绍和安装

ELK技术栈说明

Elastic有一条完整的产品线:Elasticsearch、Logstash、Kibana等,前面说的三个就是大家常说的ELK技术栈(开源实时日志分析平台)。

Logstash 的作用就是一个数据收集器,将各种格式各种渠道的数据通过它收集解析之后格式化输出到Elasticsearch ,最后再由Kibana 提供的比较友好的 Web 界面进行汇总、分析、搜索。

ELK 内部实际就是个管道结构,数据从 Logstash 到 Elasticsearch 再到 Kibana 做可视化展示。这三个组件各自也可以单独使用,比如 Logstash 不仅可以将数据输出到Elasticsearch ,也可以到数据库、缓存等

4.1.简介

4.1.1.Elastic

Elastic官网:https://www.elastic.co/cn/

Elastic有一条完整的产品线:Elasticsearch、Logstash、Kibana等,前面说的三个就是大家常说的ELK技术栈。

4.1.2.Elasticsearch

Elasticsearch官网:https://www.elastic.co/cn/products/elasticsearch

功能:

  • 分布式的搜索引擎:百度、Google、站内搜索
  • 全文检索:提供模糊搜索等自动度很高的查询方式,并进行相关性排名,高亮等功能
  • 数据分析引擎(分组聚合):电商网站—一周内手机销量Top10
  • 对海量数据进行近乎实时处理:水平扩展,每秒钟可处理海量事件,同时能够自动管理索引和查询在集群中的分布方式,以实现极其流畅的操作。

Elasticsearch具备以下特点:

  • 分布式:节点对外表现对等,每个节点都可以作为入门,加入节点自动负载均衡
  • JSON:输入输出格式是JSON
  • Restful风格,一切API都遵循Rest原则,容易上手
  • 近实时搜索,数据更新在Elasticsearch中几乎是完全同步的,数据检索近乎实时
  • 安装方便:没有其它依赖,下载后安装很方便,简单修改几个参数就可以搭建集群
  • 支持超大数据:可以扩展到PB级别的结构化和非结构化数据

4.1.3.版本

目前Elasticsearch最新的版本是7.x,企业内目前用的比较多是6.x,我们以6.2.4进行讲解,需要JDK1.8及以上。

4.2.安装和配置

为了快速看到效果我们直接在本地window下安装Elasticsearch。环境要求:JDK8及以上版本

第一步:把今天资料文件夹中准备好的软件放到一个没有中文没有空格的位置,解压即可

第二步:修改配置文件

1、修改索引数据和日志数据存储的路径

第33行和37行,修改完记得把注释打开

path.data: d:classesdata
#
# Path to log files:
#
path.logs: d:classeslog

第三步:进入bin目录中直接双击 图下的命令文件。

如果启动失败(估计好多同学都会启动失败的),需要修改虚拟机内存的大小找到jvm.options文件 如图修改

Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时间变慢。

Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,就会抛出OutOfMemory异常。

4.3.访问

可以看到绑定了两个端口:

9300:集群节点间通讯接口,接收tcp协议

9200:客户端访问接口,接收Http协议

我们在浏览器中访问:http://127.0.0.1:9200

4.4.安装kibana

4.4.1.什么是Kibana

Kibana是一个基于Node.js的Elasticsearch索引库数据统计工具,可以利用Elasticsearch的聚合功能,生成各种图表,如柱形图,线状图,饼图等。

而且还提供了操作Elasticsearch索引数据的控制台,并且提供了一定的API提示,非常有利于我们学习Elasticsearch的语法。

4.4.2.安装

因为Kibana依赖于node,需要在windows下先安装Node.js

然后安装kibana,最新版本与elasticsearch保持一致,也是6.2.4

4.4.3.配置运行

进入安装目录下的config目录,修改kibana.yml文件的第21行(注释放开即可):

运行

4.4.4.控制台

访问:http://127.0.0.1:5601

选择左侧的DevTools菜单,即可进入控制台页面:

在页面右侧,我们就可以输入请求,访问Elasticsearch了。

 4.5.安装ik分词器

 Lucene的IK分词器早在2012年已经没有维护了,现在我们要使用的是在其基础上维护升级的版本,并且开发为Elasticsearch的集成插件了,与Elasticsearch一起维护升级,版本也保持一致

4.5.1.安装

https://github.com/medcl/elasticsearch-analysis-ik

1. 解压elasticsearch-analysis-ik-6.2.4.zip后,将解压后的文件夹拷贝到elasticsearch-6.2.4plugins下,并重命名文件夹为ik

2. 重新启动ElasticSearch,即可加载IK分词器

4.5.2.测试

在kibana控制台输入下面的请求:

4.6 安装Head插件

4.6.1 elasticsearch-head 简介

elasticsearch-head是一个界面化的集群操作和管理工具,可以对集群进行傻瓜式操作。你可以通过插件把它集成到es(首选方式),也可以安装成一个独立webapp。

es-head主要有三个方面的操作:

1. 显示集群的拓扑,并且能够执行索引和节点级别操作

2. 搜索接口能够查询集群中原始json或表格格式的检索数据

3. 能够快速访问并显示集群的状态

4.6.2 elasticsearch-head 安装

(1)直接下载压缩包,地址:https://files.cnblogs.com/files/sanduzxcvbnm/elasticsearch-head.7z

(2)解压

(3)在谷歌浏览器中点击“加载已解压的压缩程序”,找到elasticsearch-head文件夹,点击打开即可进行安装。

5.使用kibana对索引库操作

5.1.基本概念

5.1.1.节点、集群、分片及副本

1、节点 (node)

一个节点是一个Elasticsearch的实例。

在服务器上启动Elasticsearch之后,就拥有了一个节点。如果在另一台服务器上启动Elasticsearch,这就是另一个节点。甚至可以通过启动多个Elasticsearch进程,在同一台服务器上拥有多个节点。

2、集群(cluster)

多个协同工作的Elasticsearch节点的集合被称为集群。

在多节点的集群上,同样的数据可以在多台服务器上传播。这有助于性能。这同样有助于稳定性,如果每个分片至少有一个副本分片,那么任何一个节点宕机后,Elasticsearch依然可以进行服务,返回所有数据。

但是它也有缺点:必须确定节点之间能够足够快速地通信,并且不会产生脑裂效应(集群的2个部分不能彼此交流,都认为对方宕机了)。

3、分片 (shard)

索引可能会存储大量数据,这些数据可能超过单个节点的硬件限制。例如,十亿个文档的单个索引占用了1TB的磁盘空间,可能不适合单个节点的磁盘,或者可能太慢而无法单独满足来自单个节点的搜索请求。

为了解决此问题,Elasticsearch提供了将索引细分为多个碎片的功能。创建索引时,只需定义所需的分片数量即可。每个分片本身就是一个功能齐全且独立的“索引”,可以托管在群集中的任何节点上。

分片很重要,主要有两个原因:

  • 它允许您水平分割/缩放内容量
  • 它允许您跨碎片(可能在多个节点上)分布和并行化操作,从而提高性能/吞吐量

分片如何分布以及其文档如何聚合回到搜索请求中的机制完全由Elasticsearch管理,并且对您作为用户是透明的。

在随时可能发生故障的网络/云环境中,非常有用,强烈建议您使用故障转移机制,以防碎片/节点因某种原因脱机或消失。为此,Elasticsearch允许您将索引分片的一个或多个副本制作为所谓的副本分片(简称副本)。

4、副本(replica)

分片处理允许用户推送超过单机容量的数据至Elasticsearch集群。副本则解决了访问压力过大时单机无法处理所有请求的问题。

分片可以是主分片,也可以是副本分片,其中副本分片是主分片的完整副本。副本分片用于搜索,或者是在原有的主分片丢失后成为新的主分片。

注意:可以在任何时候改变每个分片的副本分片的数量,因为副本分片总是可以被创建和移除的。这并不适用于索引划分为主分片的数量,在创建索引之前,必须决定主分片的数量。过少的分片将限制可扩展性,但是过多的分片会影响性能。默认设置的5份是一个不错的开始。

5.1.2 文档、类型、索引及映射

1、文档 (document)

Elasticsearch是面向文档的,这意味着索引和搜索数据的最小单位是文档。

在Elasticsearch中文档有几个重要的属性。

它是自我包含的。一篇文档同时包含字段和它们的取值。

它可以是层次的。文档中还包含新的文档,字段还可以包含其他字段和取值。例如,“location”字段可以同时包含“city”和“street“两个字段

它拥有灵活的结构。文档不依赖于预先定义的模式。并非所有的文档都需要拥有相同的字段,它们不受限于同一个模式。

2、类型 (type)

类型是文档的逻辑容器,类似于表格是行的容器。在不同的类型中,最好放入不同结构的文档。例如,可以用一个类型定义聚会时的分组,而另一个类型定义人们参加的活动。

3、索引 (index)

索引是映射类型的容器。一个Elasticsearch索引是独立的大量的文档集合。 每个索引存储在磁盘上的同组文件中,索引存储了所有映射类型的字段,还有一些设置。

4、映射(mapping)

所有文档在写入索引前都将被分析,用户可以设置一些参数,决定如何将输入文本分割为词条,哪些词条应该被过滤掉,或哪些附加处理有必要被调用(比如移除HTML标签)。这就是映射扮演的角色:存储分析链所需的所有信息。

Elasticsearch也是基于Lucene的全文检索库,本质也是存储数据,很多概念与MySQL类似的。

对比关系:

详细说明:

5.2.创建索引库

5.2.1.语法

Elasticsearch采用Rest风格API,因此其API就是一次http请求,你可以用任何工具发起http请求

创建索引的请求格式:

  • 请求方式:PUT
  • 请求路径:/索引库名
  • 请求参数:json格式:
{
"settings": {
"属性名": "属性值"
}
}

settings:就是索引库设置,其中可以定义索引库的各种属性,目前我们可以不设置,都走默认。

5.2.2.使用kibana创建

kibana的控制台,可以对http请求进行简化,示例:

相当于是省去了elasticsearch的服务器地址

而且还有语法提示,非常舒服。

5.3.查看索引库

Get请求可以帮我们查看索引信息,格式:

5.4.删除索引库

删除索引使用DELETE请求


再次查看lagou:

6.使用kibana对类型及映射操作

有了 索引库 ,等于有了数据库中的 database 。接下来就需要索引库中的 类型 了,也就是数据库中的表 。创建数据库表需要设置字段约束,索引库也一样,在创建索引库的类型时,需要知道这个类型下有哪些字段,每个字段有哪些约束信息,这就叫做 字段映射(mapping)

注意:Elasticsearch7.x取消了索引type类型的设置,不允许指定类型,默认为_doc,但字段仍然是有的,我们需要设置字段的约束信息,叫做字段映射(mapping)

字段的约束我们在学习Lucene中我们都见到过,包括到不限于:

  • 字段的数据类型
  • DELETE /索引库名是否要存储
  • 是否要索引
  • 是否分词
  • 分词器是什么

6.1.创建字段映射

类型名称:就是前面讲的type的概念,类似于数据库中的表

字段名:任意填写,下面指定许多属性,例如:

  • type:类型,可以是text、keyword、long、short、date、integer、object等
  • index:是否索引,默认为true
  • store:是否存储,默认为false
  • analyzer:分词器,这里的 ik_max_word 即使用ik分词器
PUT lagou/_mapping/goods
{
    "properties": {
        "title": {
            "type": "text",
            "store": true, 
            "analyzer": "ik_max_word"
        },
        "images": {
            "type": "keyword",
            "store": true, 
            "index": "false"
        },
        "price": {
            "type": "float"
        }
    }
}

上述案例中,就给lagou这个索引库添加了一个名为 goods 的类型,并且在类型中设置了3个字段:

  • title:商品标题
  • images:商品图片
  • price:商品价格

6.2.查看映射关系

查看某个索引库中的所有类型的映射。如果要查看某个类型映射,可以再路径后面跟上类型名称。即:

6.3.映射属性详解 

1)type

Elasticsearch中支持的数据类型非常丰富:

我们说几个关键的:

String类型,又分两种:

  • text:使用文本数据类型的字段,它们会被分词,文本字段不用于排序,很少用于聚合,如文章标题、正文。
  • keyword:关键字数据类型,用于索引结构化内容的字段,不会被分词,必须完整匹配的内容,如邮箱,身份证号。支持聚合

这两种类型都是比较常用的,但有的时候,对于一个字符串字段,我们可能希望他两种都支持,此时,可以利用其多字段特性

"properties": {
    "title":{
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
            "sort":{
                "type": "keyword"
            }
        },
        "index": true
    }
}

Numerical:数值类型,分两类

  • 基本数据类型:long、interger、short、byte、double、float、half_float
    • double 双精度64位
    • float 单精度32位
    • half_float 半精度16位
  • 浮点数的高精度类型:scaled_float
    • 带有缩放因子的缩放类型浮点数,依靠一个 long 数字类型通过一个固定的( double 类型)缩放因数进行缩放.
    • 需要指定一个精度因子,比如10或100。elasticsearch会把真实值乘以这个因子后存储,取出时再还原。

Date:日期类型

  • elasticsearch可以对日期格式化为字符串存储,但是建议我们存储为毫秒值,存储为long,节省空间。

Array:数组类型

  • 进行匹配时,任意一个元素满足,都认为满足
  • 排序时,如果升序则用数组中的最小值来排序,如果降序则用数组中的最大值来排序

字符串数组:["one", "two"]
整数数组:[1,2]
数组的数组:[1, [2, 3]],等价于[1,2,3]
对象数组:[ { "name": "Mary", "age": 12 }, { "name": "John", "age": 10 }]

Object:对象

JSON文档本质上是分层的:文档包含内部对象,内部对象本身还包含内部对象。

{
    "region": "US",
    "manager.age": 30,
    "manager.name ": "John Smith"
}
索引方法如下:
{
    "mappings": {
        "properties": {
            "region": { "type": "keyword" },
            "manager": {
                "properties": {
                    "age": { "type": "integer" },
                    "name": { "type": "text" }
                }
            }
        }
    }
}

ip地址

PUT my_index
{
    "mappings": {
        "_doc": {
            "properties": {
                "ip_addr": {
                    "type": "ip"
                }
            }
        }
    }
}
PUT my_index/_doc/1
{
    "ip_addr": "192.168.1.1"
}
GET my_index/_search
{
    "query": {
        "term": {
            "ip_addr": "192.168.0.0/16"
        }
    }
}

2)index
index影响字段的索引情况。

  • true:字段会被索引,则可以用来进行搜索过滤。默认值就是true,只有当某一个字段的index值设置为true时,检索ES才可以作为条件去检索。
  • false:字段不会被索引,不能用来搜索

index的默认值就是true,也就是说你不进行任何配置,所有字段都会被索引。

但是有些字段是我们不希望被索引的,比如商品的图片信息(URL),就需要手动设置index为false。

3)store

是否将数据进行额外存储。

在学习lucene时,我们知道如果一个字段的store设置为false,那么在文档列表中就不会有这个字段的值,用户的搜索结果中不会显示出来。

但是在Elasticsearch中,即便store设置为false,也可以搜索到结果。

原因是Elasticsearch在创建文档索引时,会将文档中的原始数据备份,保存到一个叫做 _source 的属性中。而且我们可以通过过滤 _source 来选择哪些要显示,哪些不显示。

而如果设置store为true,就会在 _source 以外额外存储一份数据,多余,因此一般我们都会将store设置为false,事实上,store的默认值就是false。

在某些情况下,这对 store 某个领域可能是有意义的。例如,如果您的文档包含一个 title ,一个date 和一个非常大的 content 字段,则可能只想检索the title 和the date 而不必从一个大 _source字段中提取这些字段:

PUT my_index
{
    "mappings": {
        "_doc": {
            "properties": {
                "title": {
                    "type": "text",
                    "store": true
                },
                "date": {
                    "type": "date",
                    "store": true
                },
                "content": {
                    "type": "text"
                }
            }
        }
    }
}

4)boost
网站权重:网站权重是指搜索引擎给网站(包括网页)赋予一定的权威值,对网站(含网页)权威的评估评价。一个网站权重越高,在搜索引擎所占的份量越大,在搜索引擎排名就越好。提高网站权重,不但利于网站(包括网页)在搜索引擎的排名更靠前,还能提高整站的流量,提高网站信任度。所以提高网站的权重具有相当重要的意义。 权重即网站在SEO中的重要性,权威性。英文:Page Strength。1、权重不等于排名 2、权重对排名有着非常大的影响 3、整站权重的提高有利于内页的排名。

权重,新增数据时,可以指定该数据的权重,权重越高,得分越高,排名越靠前。

PUT my_index
{
    "mappings": {
        "_doc": {
            "properties": {
                "title": {
                    "type": "text",
                    "boost": 2
                },
                "content": {
                    "type": "text"
                }
            }
        }
    }
}

title 字段上的匹配项的权重是字段上的匹配项的权重的两倍 content ,默认 boost 值为 1.0 。

提升仅适用于Term查询(不提升prefix,range和模糊查询)。

6.4.一次创建索引库和类型

刚才 的案例中我们是把创建索引库和类型分开来做,其实也可以在创建索引库的同时,直接制定索引库中的类型,基本语法:

put /索引库名
{
    "settings":{
        "索引库属性名":"索引库属性值"
    },
    "mappings":{
        "类型名":{
            "properties":{
                "字段名":{
                    "映射属性名":"映射属性值"
                }
            }
        }
    }
}

来试一下吧:

PUT /lagou2
{
    "settings": {},
    "mappings": {
        "goods": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_max_word"
                }
            }
        }
    }
}

7.使用kibana对文档操作

文档,即索引库中某个类型下的数据,会根据规则创建索引,将来用来搜索。可以类比做数据库中的每一行数据。

7.1.新增文档

7.1.1.新增并随机生成id

通过POST请求,可以向一个已经存在的索引库中添加文档数据。


可以看到结果显示为: created ,应该是创建成功了。

另外,需要注意的是,在响应结果中有个 _id 字段,这个就是这条文档数据的 唯一标示 ,以后的增删改查都依赖这个id作为唯一标示。

可以看到id的值为: tURGznQB29tVfg_iWHfl ,这里我们新增时没有指定id,所以是ES帮我们随机生成的id。

7.2.查看文档

根据rest风格,新增是post,查询应该是get,不过查询一般都需要条件,这里我们把刚刚生成数据的id带上。

通过kibana查看数据:

GET /lagou/goods/tURGznQB29tVfg_iWHfl

_source :源文档信息,所有的数据都在里面。

_id :这条文档的唯一标识

自动生成的id,长度为20个字符,URL安全,base64编码,GUID(全局唯一标识符),分布式系统并行生成时不可能会发生冲突

在实际开发中不建议使用ES生成的ID,太长且为字符串类型,检索时效率低。建议:将数据表中唯一的ID,作为ES的文档ID

7.3.新增文档并自定义id

如果我们想要自己新增的时候指定id,可以这么做:

POST /索引库名/类型/id值
{
    ...
}

7.4.修改数据

PUT:修改文档

POST:新增文档

把刚才新增的请求方式改为PUT,就是修改了。不过修改必须指定id,

id对应文档不存在,则新增

id对应文档存在,则修改

7.5.删除数据

7.6.智能判断

刚刚我们在新增数据时,添加的字段都是提前在类型中定义过的,如果我们添加的字段并没有提前定义过,能够成功吗?

事实上Elasticsearch非常智能,你不需要给索引库设置任何mapping映射,它也可以根据你输入的数据来判断类型,动态添加数据映射。

subtitle是String类型数据,ES无法智能判断,它就会存入两个字段。例如:

  • subtitle:text类型
  • subtitle.keyword:keyword类型

这种智能映射,底层原理是动态模板映射,如果我们想修改这种智能映射的规则,其实只要修改动态模板即可!

7.7.动态映射模板

动态模板的语法:

1)模板名称,随便起

2)匹配条件,凡是符合条件的未定义字段,都会按照这个规则来映射

3)映射规则,匹配成功后的映射规则

举例,我们可以把所有未映射的string类型数据自动映射为keyword类型:

在这个案例中,我们把做了两个映射配置:

  • title字段:统一映射为text类型,并制定分词器
  • 其它字段:只要是string类型,统一都处理为keyword类型。

这样,未知的string类型数据就不会被映射为text和keyword并存,而是统一以keyword来处理!

我们试试看新增一个数据:

我们只对title做了配置,现在来看看images和price会被映射为什么类型呢:

可以看到images被映射成了keyword,而非之前的text和keyword并存,说明我们的动态模板生效了!

8.查询(重点)

8.1.基本查询:

这里的query代表一个查询对象,里面可以有不同的查询属性

查询类型:

  • 例如: match_all , match , term , range 等等
  • 查询条件:查询条件会根据类型的不同,写法也有差异,后面详细讲解

8.1.1 查询所有(match_all)

query :代表查询对象

match_all :代表查询所有

 

相关概念详解

  • took:查询花费时间,单位是毫秒
  • time_out:是否超时
  • _shards:分片信息
  • hits:搜索结果总览对象
  • total:搜索到的总条数
  • max_score:所有结果中文档得分的最高分
  • hits:搜索结果的文档对象数组,每个元素是一条搜索到的文档信息
    • _index:索引库
    • _type:文档类型
    • _id:文档id
    • _score:文档得分
    • _source:文档的源数据
  • 文档得分:使用ES时,对于查询出的文档无疑会有文档相似度之别。而理想的排序是和查询条件相关性越高排序越靠前,而这个排序的依据就是_score

8.1.2 匹配查询(match)

match 类型查询,会把查询条件进行分词,然后进行查询,多个词条之间是or的关系

 

and关系

某些情况下,我们需要更精确查找:比如在电商平台精确搜索商品时,我们希望这个关系(查询条件切分词之后的关系)变成 and (既要满足你,又要满足我),可以这样做:

8.1.3 词条匹配(term)

term 查询被用于精确值 匹配,这些精确值可能是数字、时间、布尔或者那些未分词的字符串,keyword类型的字符串

效果类似于:select * from tableName where colName='value';

GET /lagou/_search
{
    "query":{
        "term":{
            "price":2699.00
        }
    }
}

8.1.4 布尔组合(bool)

bool 把各种其它查询通过 must (与)、 must_not (非)、 should (或)的方式进行组合

8.1.5 范围查询(range)

range 查询找出那些落在指定区间内的数字或者时间

range 查询允许以下字符:

 

8.1.6 模糊查询(fuzzy)

fuzzy 查询是 term 查询的模糊等价,很少直接使用它

fuzzy 查询是 term 查询的模糊等价。它允许用户搜索词条与实际词条的拼写出现偏差,但是偏差的编辑距离不得超过2:

8.2.结果过滤

默认情况下,elasticsearch在搜索的结果中,会把文档中保存在 _source 的所有字段都返回。如果我们只想获取其中的部分字段,我们可以添加 _source 的过滤

8.2.1.直接指定字段

8.2.2.指定includes和excludes

includes:来指定想要显示的字段

excludes:来指定不想要显示的字段

 

8.3 过滤(filter)

Elasticsearch 使用的查询语言(DSL)拥有一套查询组件,这些组件可以以无限组合的方式进行搭配。这套组件可以在以下两种情况下使用:过滤情况(filtering context)和查询情况(query context)。

如何选择查询与过滤:

通常的规则是,使用查询(query)语句来进行 全文 搜索或者其它任何需要影响 相关性得分 的搜索。除此以外的情况都使用过滤(filters)。

条件查询中进行过滤

所有的查询都会影响到文档的评分及排名。如果我们需要在查询结果中进行过滤,并且不希望过滤条件影响评分,那么就不要把过滤条件作为查询条件来用。而是使用 filter 方式:

GET /lagou/_search
{
    "query":{
        "bool":{
            "must":{ "match": { "title": "小米手机" }},
            "filter":{
                "range":{"price":{"gt":2000.00,"lt":3800.00}}
            }
        }
    }
}

无查询条件,直接过滤

如果一次查询只有过滤,没有查询条件,不希望进行评分,我们可以使用 constant_score 取代只有filter 语句的 bool 查询。在性能上是完全相同的,但对于提高查询简洁性和清晰度有很大帮助。

GET /lagou/_search
{
    "query":{
        "constant_score": {
            "filter": {
                "range":{"price":{"gt":2000.00,"lt":3000.00}}
            }
        }
    }
}

8.4 排序

8.4.1 单字段排序

sort 可以让我们按照不同的字段进行排序,并且通过 order 指定排序的方式

8.4.2 多字段排序

假定我们想要结合使用 price和 _score(得分) 进行查询,并且匹配的结果首先按照价格排序,然后按照相关性得分排序:

8.5.分页

elasticsearch的分页与mysql数据库非常相似,都是指定两个值:

  • from:目标数据的偏移值(开始位置),默认from为0
  • size:每页大小

8.6.高亮

高亮原理:

  • 服务端搜索数据,得到搜索结果
  • 把搜索结果中,搜索关键字都加上约定好的标签
  • 前端页面提前写好标签的CSS样式,即可高亮

在使用match查询的同时,加上一个highlight属性:

  • pre_tags:前置标签
  • post_tags:后置标签
  • fields:需要高亮的字段

9. 聚合aggregations

聚合可以让我们极其方便的实现对数据的统计、分析。例如:

  • 什么品牌的手机最受欢迎?
  • 这些手机的平均价格、最高价格、最低价格?
  • 这些手机每月的销售情况如何

实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效果

9.1 基本概念

Elasticsearch中的聚合,包含多种类型,最常用的两种,一个叫 桶 ,一个叫 度量 :

桶(bucket) 类似于 group by

桶的作用,是按照某种方式对数据进行分组,每一组数据在ES中称为一个 桶 ,例如我们根据国籍对人划分,可以得到 中国桶 、 英国桶 , 日本桶 ……或者我们按照年龄段对人进行划分:0~10,10~20,20~30,30~40等。

Elasticsearch中提供的划分桶的方式有很多:

  • Date Histogram Aggregation:根据日期阶梯分组,例如给定阶梯为周,会自动每周分为一组
  • Histogram Aggregation:根据数值阶梯分组,与日期类似,需要知道分组的间隔(interval)
  • Terms Aggregation:根据词条内容分组,词条内容完全匹配的为一组
  • Range Aggregation:数值和日期的范围分组,指定开始和结束,然后按段分组
  • ……

综上所述,我们发现bucket aggregations 只负责对数据进行分组,并不进行计算,因此往往bucket中往往会嵌套另一种聚合:metrics aggregations即度量

度量(metrics) 相当于聚合的结果

分组完成以后,我们一般会对组中的数据进行聚合运算,例如求平均值、最大、最小、求和等,这些在ES中称为 度量

比较常用的一些度量聚合方式:

  • Avg Aggregation:求平均值
  • Max Aggregation:求最大值
  • Min Aggregation:求最小值
  • Percentiles Aggregation:求百分比
  • Stats Aggregation:同时返回avg、max、min、sum、count等
  • Sum Aggregation:求和
  • Top hits Aggregation:求前几
  • Value Count Aggregation:求总数
  • ……

测试聚合,案例演示(数据准备)

注意:在ES中,需要进行聚合、排序、过滤的字段其处理方式比较特殊,因此不能被分词,必须使用keyword 或 数值类型 。这里我们将color和make这两个文字类型的字段设置为keyword类型,这个类型不会被分词,将来就可以参与聚合

9.2 聚合为桶

首先,我们按照 汽车的颜色 color来 划分 桶 ,按照颜色分桶,最好是使用TermAggregation类型,按照颜色的名称来分桶。

size: 查询条数,这里设置为0,因为我们不关心搜索到的数据,只关心聚合结果,提高效率

aggs:声明这是一个聚合查询,是aggregations的缩写

popular_colors:给这次聚合起一个名字,可任意指定。

terms:聚合的类型,这里选择terms,是根据词条内容(这里是颜色)划分

field:划分桶时依赖的字段

结果分析

hits:查询结果为空,因为我们设置了size为0

aggregations:聚合的结果

popular_colors:我们定义的聚合名称

buckets:查找到的桶,每个不同的color字段值都会形成一个桶

key:这个桶对应的color字段的值

doc_count:这个桶中的文档数量

9.3 桶内度量

前面的例子告诉我们每个桶里面的文档数量,这很有用。 但通常,我们的应用需要提供更复杂的文档度量。 例如,每种颜色汽车的平均价格是多少?

因此,我们需要告诉Elasticsearch 使用哪个字段 , 使用何种度量方式 进行运算,这些信息要嵌套在 桶内, 度量 的运算会基于 桶 内的文档进行

现在,我们为刚刚的聚合结果添加 求价格平均值的度量:

aggs:我们在上一个aggs(popular_colors)中添加新的aggs。可见度量也是一个聚合

avg_price:聚合的名称

avg:度量的类型,这里是求平均值

field:度量运算的字段

可以看到每个桶中都有自己的 avg_price 字段,这是度量聚合的结果

10.Elasticsearch集群

10.1.单点的问题

单点的elasticsearch存在哪些可能出现的问题呢?

  • 单台机器存储容量有限,无法实现高存储
  1. 单服务器容易出现单点故障,无法实现高可用
  • 单服务的并发处理能力有限,无法实现高并发

10.2.集群的结构

10.2.1.数据分片

我们可以把数据拆分成多份,每一份存储到不同机器节点(node),从而实现减少每个节点数据量的目的。这就是数据的分布式存储,也叫做: 数据分片(Shard) 。

10.2.2.数据备份

我们可以给每个分片数据进行备份,存储到其它节点,防止数据丢失,这就是数据备份,也叫 数据副本(replica) 。

数据备份可以保证高可用,但是每个分片备份一份,所需要的节点数量就会翻一倍,成本实在是太高了!

为了在高可用和成本间寻求平衡,我们可以这样做:

这样可以大大减少所需要的服务节点数量,如图,我们以3分片,每个分片备份一份为例:

在这个集群中,如果出现单节点故障,并不会导致数据缺失,所以保证了集群的高可用,同时也减少了节点中数据存储量。并且因为是多个节点存储数据,因此用户请求也会分发到不同服务器,并发能力也得到了一定的提升。

10.3.搭建集群

集群需要多台机器,我们这里用一台机器来模拟,因此我们需要在一台虚拟机中部署多个elasticsearch节点,每个elasticsearch的端口都必须不一样。

一台机器进行模拟:将我们的ES的安装包复制三份,修改端口号,data和log存放位置的不同。

实际开发中:将每个ES节点放在不同的服务器上。

我们计划集群名称为:lagou-elastic,部署3个elasticsearch节点,分别是:

node-01:http端口9201,TCP端口9301

node-02:http端口9202,TCP端口9302

node-03:http端口9203,TCP端口9303

http:表示使用http协议进行访问时使用 端口,elasticsearch-head、kibana、postman,默认端口号是9200。

tcp:集群间的各个节点进行通讯的端口,默认9300

搭建步骤:

第一步:复制es软件粘贴3次,分别改名

第二步:修改每一个节点的配置文件 config下的elasticsearch.yml,下面已第一份配置文件为例三个节点的配置文件几乎一致,除了:node.name、path.data、path.log、http.port、transport.tcp.port

node-01:

#允许跨域名访问
http.cors.enabled: true
#当设置允许跨域,默认为*,表示支持所有域名
http.cors.allow-origin: "*"
#允许所有节点访问
network.host: 0.0.0.0
# 集群的名称,同一个集群下所有节点的集群名称应该一致
cluster.name: lagou-elastic
#当前节点名称 每个节点不一样
node.name: node-01
#数据的存放路径 每个节点不一样,不同es服务器对应的data和log存储的路径不能一样
path.data: D:classes-9201data
#日志的存放路径 每个节点不一样
path.logs: D:classes-9201logs
# http协议的对外端口 每个节点不一样,默认:9200
http.port: 9201
# TCP协议对外端口 每个节点不一样,默认:9300
transport.tcp.port: 9301
#三个节点相互发现,包含自己,使用tcp协议的端口号
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301", "127.0.0.1:9302", "127.0.0.1:9303"]
#声明大于几个的投票主节点有效,请设置为(nodes / 2) + 1
discovery.zen.minimum_master_nodes: 2
# 是否为主节点
node.master: true

node-02:

#允许跨域名访问
http.cors.enabled: true
#当设置允许跨域,默认为*,表示支持所有域名
http.cors.allow-origin: "*"
#允许所有节点访问
network.host: 0.0.0.0
# 集群的名称,同一个集群下所有节点的集群名称应该一致
cluster.name: lagou-elastic
#当前节点名称 每个节点不一样
node.name: node-02
#数据的存放路径 每个节点不一样,不同es服务器对应的data和log存储的路径不能一样
path.data: D:classes-9202data
#日志的存放路径 每个节点不一样
path.logs: D:classes-9202logs
# http协议的对外端口 每个节点不一样,默认:9200
http.port: 9202
# TCP协议对外端口 每个节点不一样,默认:9300
transport.tcp.port: 9302
#三个节点相互发现,包含自己,使用tcp协议的端口号
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301", "127.0.0.1:9302", "127.0.0.1:9303"]
#声明大于几个的投票主节点有效,请设置为(nodes / 2) + 1
discovery.zen.minimum_master_nodes: 2
# 是否为主节点
node.master: true

node-03:

#允许跨域名访问
http.cors.enabled: true
#当设置允许跨域,默认为*,表示支持所有域名
http.cors.allow-origin: "*"
#允许所有节点访问
network.host: 0.0.0.0
# 集群的名称,同一个集群下所有节点的集群名称应该一致
cluster.name: lagou-elastic
#当前节点名称 每个节点不一样
node.name: node-03
#数据的存放路径 每个节点不一样,不同es服务器对应的data和log存储的路径不能一样
path.data: D:classes-9203data
#日志的存放路径 每个节点不一样
path.logs: D:classes-9203logs
# http协议的对外端口 每个节点不一样,默认:9200
http.port: 9203
# TCP协议对外端口 每个节点不一样,默认:9300
transport.tcp.port: 9303
#三个节点相互发现,包含自己,使用tcp协议的端口号
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301", "127.0.0.1:9302", "127.0.0.1:9303"]
#声明大于几个的投票主节点有效,请设置为(nodes / 2) + 1
discovery.zen.minimum_master_nodes: 2
# 是否为主节点
node.master: true

第三步:启动集群

把三个节点分别启动,启动时不要着急,要一个一个地启动

使用head插件查看:

10.4.测试集群中创建索引库

1. 配置kibana,再重启

2. 创建索引

在这里创建索引名称为lagou,分片数为3

使用json格式对应API创建索引

PUT /lagou
{
    "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
    }
}

number_of_shards:分片数量,这里设置为3

number_of_replicas:副本数量,这里设置为1,每个分片一个备份,一个原始数据,共2份。

3. 通过chrome浏览器的head查看,我们可以查看到分片的存储结构:

可以看到,lagou这个索引库,有三个分片,分别是0、1、2,每个分片有1个副本,共6份。

  • node-01上保存了1号分片和2号分片的副本
  • node-02上保存了0号分片和2号分片的副本
  • node-03上保存了0号分片和1号分片的副本

10.5.集群工作原理

10.5.1.shad与replica机制

(1)一个index包含多个shard,也就是一个index存在多个服务器上

(2)每个shard都是一个最小工作单元,承载部分数据,比如有三台服务器,现在有三条数据,这三条数据在三台服务器上各方一条.

(3)增减节点时,shard会自动在nodes中负载均衡

(4)primary shard(主分片)和replica shard(副本分片),每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard

(5)replica shard是primary shard的副本,负责容错,以及承担读请求负载

(6)primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改

(7)primary shard的默认数量是5,replica默认是1(每个主分片一个副本分片),默认有10个shard,5个primary shard,5个replica shard

(8)primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上

10.5.2.集群写入数据

1. 客户端选择一个node发送请求过去,这个node就是coordinating node (协调节点)

2. coordinating node,对document进行路由,将请求转发给对应的node。(根据一定的算法选择对应的节点进行存储)

3. 实际上的node上的primary shard处理请求,将数据保存在本地,然后将数据同步到replica node

4. coordinating node,如果发现primary node和所有的replica node都搞定之后,就会返回请求到客户端

这个路由简单的说就是取模算法,比如说现在有3台服务器,这个时候传过来的id是5,那么5%3=2,就放在第2台服务器

10.5.3.ES查询数据

倒排序算法

查询有个算法叫倒排序:简单的说就是:通过分词把词语出现的id进行记录下来,再查询的时候先去查到哪些id包含这个数据,然后再根据id把数据查出来

查询过程

1. 客户端发送一个请求给coordinate node

2. 协调节点将搜索的请求转发给所有的shard对应的primary shard 或replica shard

3. query phase(查询阶段):每一个shard 将自己搜索的结果(其实也就是一些唯一标识),返回给协调节点,由协调节点进行数据的合并,排序,分页等操作,产出最后的结果

4. fetch phase(获取阶段) ,接着由协调节点,根据唯一标识去各个节点进行拉取数据,最终返回给客户端

11.Elasticsearch客户端

11.1.客户端介绍

在elasticsearch官网中提供了各种语言的客户端:https://www.elastic.co/guide/en/elasticsearch/client/index.html

注意点击进入后,选择版本到 6.2.4 ,因为我们之前按照的都是 6.2.4 版本:

11.2.创建Demo工程

11.2.1.初始化项目

11.2.2.pom文件

注意,这里我们直接导入了SpringBoot的启动器,方便后续讲解。不过还需要手动引入elasticsearch的High-level-Rest-Client的依赖:

<!--ES高级Rest Client-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>

11.2.3.配置文件

我们在resource下创建application.yml

11.3.索引库及映射

创建索引库的同时,我们也会创建type及其映射关系,但是这些操作不建议使用java客户端完成,原因如下:

  • 索引库和映射往往是初始化时完成,不需要频繁操作,不如提前配置好
  • 官方提供的创建索引库及映射API非常繁琐,需要通过字符串拼接json结构:

依据商品数据创建索引库

package com.lagou.es.pojo;
public class Product {
    private Long id;
    private String title; //标题
    private String category;// 分类
    private String brand; // 品牌
    private Double price; // 价格
    private String images; // 图片地址
}

分析一下数据结构:

id:可以认为是主键,将来判断数据是否重复的标示,不分词,可以使用keyword类型

title:搜索字段,需要分词,可以用text类型

category:商品分类,这个是整体,不分词,可以使用keyword类型

brand:品牌,与分类类似,不分词,可以使用keyword类型

price:价格,这个是double类型

images:图片,用来展示的字段,不搜索,index为false,不分词,可以使用keyword类型

编写配置映射,创建索引

PUT /lagou
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "item": {
            "properties": {
                "id": {
                    "type": "keyword"
                },
                "title": {
                    "type": "text",
                    "analyzer": "ik_max_word"
                },
                "category": {
                    "type": "keyword"
                },
                "brand": {
                    "type": "keyword"
                },
                "images": {
                    "type": "keyword",
                    "index": false
                },
                "price": {
                    "type": "double"
                }
            }
        }
    }
}

11.4.索引数据操作

11.4.1.初始化客户端

编写一个测试类

然后再@Before的方法中编写client初始化:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticsearchDemoApplicationTests {

    private RestHighLevelClient restHighLevelClient;
    private Gson gson = new Gson();

    /**
     * 初始化客户端
     */
    @Before
    public void init(){
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("127.0.0.1",9201,"http"),
                new HttpHost("127.0.0.1",9202,"http"),
                new HttpHost("127.0.0.1",9203,"http")
        );
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
    }
    
    /**
     * 关闭客户端
     */
    @After
    public void close() throws IOException{
        restHighLevelClient.close();
    }
}

11.4.2.新增文档

/**
 * 插入文档
 */
@Test
public void testInsert() throws IOException {
    //1.文档数据
    Product product = new Product();
    product.setBrand("华为");
    product.setCategory("手机");
    product.setId(1L);
    product.setImages("http://image.huawei.com/1.jpg");
    product.setPrice(5999.99);
    product.setTitle("华为P30");
    //2.将文档数据转换为json格式
    String source = gson.toJson(product);
    //3.创建索引请求对象 访问哪个索引库、哪个type、指定文档ID
    //public IndexRequest(String index, String type, String id)
    IndexRequest request = new IndexRequest("lagou","item",product.getId().toString());
    request.source(source, XContentType.JSON);
    //4.发出请求
    IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
    System.out.println(response);
}

相应数据:

response = IndexResponse[
    index=item,
    type=docs,
    id=1,
    version=2,
    result=created,
    seqNo=1,
    primaryTerm=1,
    shards={"total":2,"successful":2,"failed":0}
]

11.4.3.查看文档

根据rest风格,查看应该是根据id进行get查询,难点是对结果的解析:

@Test
public void testView() throws IOException {
    //初始化GetRequest对象
    GetRequest getRequest = new GetRequest("lagou","item","1");
    //执行查询
    GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
    //取出数据
    String source = getResponse.getSourceAsString();
    Product product = gson.fromJson(source, Product.class);
    System.out.println(product);
}

11.4.4.修改文档

新增时,如果传递的id是已经存在的,则会完成修改操作,如果不存在,则是新增

11.4.5.删除文档

@Test
public void testDelete() throws IOException{
    //初始化DeleteRequest对象
    DeleteRequest request = new DeleteRequest("lagou","item","1");
    //执行删除
    DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    System.out.println(response);
}

11.5.搜索数据

11.5.1.查询所有match_all

@Test
public void matchAll() throws IOException {
    //创建搜索请求对象
    SearchRequest request = new SearchRequest();
    //查询构建工具
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    //添加查询条件,执行查询类型
    sourceBuilder.query(QueryBuilders.matchAllQuery());
    request.source(sourceBuilder);
    //执行查询
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    //获得查询结果
    SearchHits hits = response.getHits();
    //获得文件数组
    SearchHit[] hitsHits = hits.getHits();
    for(SearchHit searchHit: hitsHits){
        String json = searchHit.getSourceAsString();
        //将json反序列化为Product格式
        Product product = gson.fromJson(json, Product.class);
        System.out.println(product);
    }
}

注意,上面的代码中,搜索条件是通过 sourceBuilder.query(QueryBuilders.matchAllQuery())来添加的。这个 query() 方法接受的参数是: QueryBuilder 接口类型。

这个接口提供了很多实现类,分别对应我们在之前中学习的不同类型的查询,例如:term查询、match查询、range查询、boolean查询等,如图:

因此,我们如果要使用各种不同查询,其实仅仅是传递给 sourceBuilder.query() 方法的参数不同而已。而这些实现类不需要我们去 new ,官方提供了 QueryBuilders 工厂帮我们构建各种实现类:

11.5.2.关键字搜索match

其实搜索类型的变化,仅仅是利用QueryBuilders构建的查询对象不同而已,其他代码基本一致

因此,我们可以把这段代码封装,然后把查询条件作为参数传递:

public void baseQuery(SearchSourceBuilder sourceBuilder) throws IOException {
    //创建搜索请求对象
    SearchRequest request = new SearchRequest();
    //查询构建工具
    request.source(sourceBuilder);
    //执行查询
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    //获得查询结果
    SearchHits hits = response.getHits();
    //获得文件数组
    SearchHit[] hitsHits = hits.getHits();
    for(SearchHit searchHit: hitsHits){
        String json = searchHit.getSourceAsString();
        //将json反序列化为Product格式
        Product product = gson.fromJson(json, Product.class);
        System.out.println(product);
    }
}

调用封装的方法,并传递查询条件:

@Test
public void testMatchQuery() throws IOException {
// 查询构建工具
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 添加查询条件,通过QueryBuilders获取各种查询
sourceBuilder.query(QueryBuilders.matchQuery("title", "手机"));
basicQuery(sourceBuilder);
}

11.5.3.范围查询range

示例

/**
     * price: 3600 - 4300
     */
    @Test
    public void rangeQuery()throws IOException{
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        //执行查询条件和查询类型
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price");
        rangeQueryBuilder.gte(3600);
        rangeQueryBuilder.lte(4300);
        sourceBuilder.query(rangeQueryBuilder);
        baseQuery(sourceBuilder);;
    }

11.5.4.source过滤

默认情况下,索引库中所有数据都会返回,如果我们想只返回部分字段,可以通过source filter来控制。

11.6.排序

@Test
public void testSortQuery() throws IOException {
    // 创建搜索对象
    SearchRequest request = new SearchRequest();
    // 查询构建工具
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // 添加查询条件,通过QueryBuilders获取各种查询
    sourceBuilder.query(QueryBuilders.matchAllQuery());
    // 添加排序
    sourceBuilder.sort("price", SortOrder.ASC);
    basicQuery(sourceBuilder);
}

11.7.分页

分页需要视图层传递两个参数给我们:

  • 当前页:page
  • 每页大小:size

而elasticsearch中需要的不是当前页,而是起始位置,还好有公式可以计算出:

  • from-->起始位置,0表示第一条
  • 起始位置:start = (page - 1) * size
  • 第一页:(1-1)*5 = 0
  • 第二页:(2-1)*5 = 5

代码:

@Test
    public void sortAndPage()throws IOException{
        //创建搜索请求对象
        SearchRequest request = new SearchRequest();
        //查询构建工具
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        //添加查询条件,执行查询类型
        sourceBuilder.query(QueryBuilders.matchAllQuery());
        //执行排序 价格降序排序
        sourceBuilder.sort("price", SortOrder.DESC);

        //分页信息
        int pageNumber = 1;
        int pageSize = 3;
        int from = (pageNumber-1)*pageSize;
        //设置分页
        sourceBuilder.from(from);
        sourceBuilder.size(3);

        baseQuery(sourceBuilder);
    }

12.Spring Data Elasticsearch

12.1.什么是SpringDataElasticsearch

Spring Data Elasticsearch(以后简称SDE)是Spring Data项目下的一个子模块。

Spring Data 的使命是给各种数据访问提供统一的编程接口,不管是关系型数据库(如MySQL),还是非关系数据库(如Redis),或者类似Elasticsearch这样的索引数据库。从而简化开发人员的代码,提高开发效率。

Spring Data Elasticsearch的页面:https://projects.spring.io/spring-data-elasticsearch/

特征:

支持Spring的基于 @Configuration 的java配置方式,或者XML配置方式

提供了用于操作ES的便捷工具类 ElasticsearchTemplate 。包括实现文档到POJO之间的自动智能映射。

利用Spring的数据转换服务实现的功能丰富的对象映射

基于注解的元数据映射方式,而且可扩展以支持更多不同的数据格式,可以定义JavaBean:类名、属性

根据持久层接口自动生成对应实现方法,无需人工编写基本操作代码(类似mybatis,根据接口自动得到实现)。当然,也支持人工定制查询

12.2.配置SpringDataElasticsearch

在pom文件中,引入SpringDataElasticsearch的启动器:

<!--Spring data elasticsearch-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

只需要在resources下新建application.yml文件,引入elasticsearch的host和port即可:

spring:
  data:
    elasticsearch:
      cluster-name: lagou-elastic
      cluster-nodes: 127.0.0.1:9301,127.0.0.1:9302,127.0.0.1:9303

需要注意的是,SpringDataElasticsearch底层使用的不是Elasticsearch提供的RestHighLevelClient,而是TransportClient,并不采用Http协议通信,而是访问elasticsearch对外开放的tcp端口,我们之前集群配置中,设置的分别是:9301,9302,9303

添加引导类:

@SpringBootApplication
public class ElasticsearchDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ElasticsearchDemoApplication.class, args);
    }
}

12.3.索引库操作

12.3.1.创建索引库

创建实体类,添加注解,指明实体和索引库文档的关系

@Data
@Document(indexName = "lagou",type = "product",shards = 3,replicas = 1)
public class Product {
    @Id //作为document的id
    private Long id;
    @Field(type = FieldType.Text,analyzer = "ik_max_word")
    private String title; //标题
    @Field(type = FieldType.Keyword)
    private String category;// 分类
    @Field(type = FieldType.Keyword)
    private String brand; // 品牌
    @Field(type = FieldType.Double)
    private Double price; // 价格
    @Field(type = FieldType.Keyword,index = false)
    private String images; // 图片地址

    public Product() {
    }

    public Product(Long id, String title, String category, String brand, Double price, String images) {
        this.id = id;
        this.title = title;
        this.category = category;
        this.brand = brand;
        this.price = price;
        this.images = images;
    }
}

注解说明:

@Document:声明索引库配置

  • indexName:索引库名称
  • type:类型名称,默认是“docs”
  • shards:分片数量,默认5
  • replicas:副本数量,默认1

@Id:声明实体类的id

@Field:声明字段属性

  • type:字段的数据类型
  • analyzer:指定分词器类型
  • index:是否创建索引

创建索引库API

@Test
public void createIndex() {
    //创建索引的方法
    template.createIndex(Product.class);
}

12.3.2.创建映射

刚才的注解已经把映射关系也配置上了,所以创建映射只需要这样:

@Test
public void createType() {
    //创建类型映射
    template.putMapping(Product.class);
}

12.4.索引数据CRUD

SDE的索引数据CRUD并没有封装在ElasticsearchTemplate中,而是有一个叫做ElasticsearchRepository的接口:

要自定义接口,继承ElasticsearchRespository:

package com.lagou.es.repository;
import com.lagou.es.pojo.Goods;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

public interface GoodsRepository extends ElasticsearchRepository<Goods, Long> {
}

12.4.1.创建索引数据

单个创建

@Test
public void insertDocument() {
    Product product = new Product(6L, "小米手机", "手机", "锤子", 3299.99, "http://image.chuizi.com/1.jpg");
    //新增
    productRepository.save(product);
    System.out.println("save success");
}

批量创建:

@Test
public void insertDocuments() {
    Product product1 = new Product(2L, "坚果手机", "手机", "phone", 3299.99, "http://image.chuizi.com/1.jpg");
    Product product2 = new Product(3L, "华为手机", "手机", "phone", 3299.99, "http://image.chuizi.com/1.jpg");
    Product product3 = new Product(4L, "苹果手机", "手机", "phone", 3299.99, "http://image.chuizi.com/1.jpg");
    Product product4 = new Product(5L, "索尼手机", "手机", "phone", 3299.99, "http://image.chuizi.com/1.jpg");
    List<Product> list = new ArrayList<>();
    list.add(product1);
    list.add(product2);
    list.add(product3);
    list.add(product4);
    productRepository.saveAll(list);
    System.out.println("save success");
}

12.4.2.查询索引数据

默认提供了根据id查询,查询所有两个功能:

根据id查询

@Test
public void testQueryById(){
    Optional<Goods> goodsOptional = goodsRepository.findById(3L);
    System.out.println(goodsOptional.orElse(null));
}

查询所有:

@Test
public void testQueryAll(){
    Iterable<Goods> list = goodsRepository.findAll();
    list.forEach(System.out::println);
}

12.4.3.自定义方法查询

ProductRepository提供的查询方法有限,但是它却提供了非常强大的自定义查询功能:

只要遵循SpringData提供的语法,我们可以任意定义方法声明:

/**
 * 当SDE访问索引库时,需要定义一个持久层的接口去继承ElasticsearchRepository接口即可,无需实现
 */
public interface ProductRepository extends ElasticsearchRepository<Product,Long> {

    /**
     * 查询价格范围
     * @param from
     * @param to
     * @return
     */
    List<Product> findByPriceBetween(Double from, Double to);

}

无需写实现,SDE会自动帮我们实现该方法,我们只需要用即可:

@Test
public void testQueryByPrice(){
    List<Product> list = goodsRepository.findByPriceBetween(1000d, 4000d);
    list.forEach(System.out::println);
}

支持的一些语法示例:

12.5.原生查询

如果觉得上述接口依然不符合你的需求,SDE也支持原生查询,这个时候还是使用ElasticsearchTemplate

而查询条件的构建是通过一个名为 NativeSearchQueryBuilder 的类来完成的,不过这个类的底层还是使用的原生API中的 QueryBuilders 、 AggregationBuilders 、 HighlightBuilders 等工具。

需求:

查询title中包含小米手机的商品,以价格升序排序,分页查询:每页展示2条,查询第1页。

对查询结果进行聚合分析:获取品牌及个数

示例:

@Test
public void testNativeQuery() {
    // 原生查询构建器
     NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    // 1.1 source过滤
     queryBuilder.withSourceFilter(new FetchSourceFilter(new String[0], new
                    String[0]));
    // 1.2搜索条件
    queryBuilder.withQuery(QueryBuilders.matchQuery("title", "小米手机"));
    // 1.3分页及排序条件
    queryBuilder.withPageable(
                    PageRequest.of(0, 2,
                            Sort.by(Sort.Direction.ASC, "price")));
    // 1.4高亮显示
    // queryBuilder.withHighlightBuilder(new HighlightBuilder().field("title"));
    // 1.5聚合
    queryBuilder.addAggregation(AggregationBuilders.terms("brandAgg").field("brand"
            ));
    // 构建查询条件,并且查询
    AggregatedPage<Product> result = template.queryForPage(queryBuilder.build(),
            Product.class);
    // 2、解析结果:
    // 2.1分页结果
    long total = result.getTotalElements();
    int totalPages = result.getTotalPages();
    List<Product> list = result.getContent();
    System.out.println("总条数 = " + total);
    System.out.println("总页数 = " + totalPages);
    System.out.println(list);
    // 2.2.聚合结果
    Aggregations aggregations = result.getAggregations();
    Terms terms = aggregations.get("brandAgg");
    terms.getBuckets().forEach(b -> {
        System.out.println("品牌 = " + b.getKeyAsString());
        System.out.println("count = " + b.getDocCount());
    });

}

在上述结果基础上,添加支持高亮显示

1、自定义搜索结果映射

/**
 * 自定义结果映射,处理高亮
 */
public class ESSearchResultMapper implements SearchResultMapper {
    /**
     * 完成结果映射
     * 操作的重点应该是将原有的结果:_source取出来,放入高亮的数据
     *
     * @param response
     * @param clazz
     * @param pageable
     * @param <T>
     * @return AggregatedPage需要三个参数进行构建:pageable,List<product>,总记录数
     */
    @Override
    public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
        //获得总记录数
        long totalHits = response.getHits().getTotalHits();
        //记录列表
        List<T> list = new ArrayList<>();
        //获取原始的搜索结果
        SearchHits hits = response.getHits();
        for (SearchHit hit : hits) {
            if (hits.getHits().length <= 0) {
                return null;
            }
            //获取_source属性中的所有数据
            Map<String, Object> map = hit.getSourceAsMap();
            //获得高亮的字段
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            //每个高亮字段都需要进行设置
            for(Map.Entry<String, HighlightField> highlightField : highlightFields.entrySet()){
                //获得高亮的key:高亮字段
                String key = highlightField.getKey();
                //获得value:高亮之后的效果
                HighlightField value = highlightField.getValue();
                //将高亮字段和文本效果放入到map中
                map.put(key,value.getFragments()[0].toString());
            }
            //将map转换为对象
            Gson gson = new Gson();
            //map-->jsonString-->对象
            T t = gson.fromJson(gson.toJson(map), clazz);
            list.add(t);
        }
        return new AggregatedPageImpl<>(list,pageable,totalHits);
    }
}

2、高亮实现:

@Autowired
private ElasticsearchTemplate template;

@Autowired
private ProductRepository productRepository;

/**
 * 需求:
 * 查询title中包含小米手机的商品,以价格升序排序,分页查询:每页展示2条,查询第1页。
 * 对查询结果进行聚合分析:获取品牌及个数
 */
@Test
public void nativeQuery(){
    //1.构架一个原生查询器
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    //2.source过滤
    //2.1 参数:final String[] includes, final String[] excludes
    //如果不想执行source过滤可以将该行注释
    queryBuilder.withSourceFilter(new FetchSourceFilter(new String[0],new String[0]));
    //3.查询条件
    queryBuilder.withQuery(QueryBuilders.matchQuery("title","小米手机"));
    //4.设置分页和排序规则
    queryBuilder
            .withPageable(PageRequest.of(0,10, Sort.by(Sort.Direction.DESC,"price")));
    //5.高亮
    HighlightBuilder.Field field = new HighlightBuilder.Field("title");
    field.preTags("<font style='color:red'>");
    field.postTags("</font>");
    queryBuilder.withHighlightFields(field);
    //6.聚合
    queryBuilder.addAggregation(AggregationBuilders.terms("brandAgg").field("brand"));
    //7.查询
    AggregatedPage<Product> result = template.queryForPage(queryBuilder.build(), Product.class,new ESSearchResultMapper());
    //获取结果
    long total = result.getTotalElements();
    //页码
    int totalPages = result.getTotalPages();
    //获得本页的数据集合
    List<Product> content = result.getContent();
    System.out.println(total+"  "+totalPages);
    content.stream().forEach(product -> System.out.println(product));

}

Docker

一、Docker简介

1.1 虚拟化技术

虚拟化技术是一种计算机资源管理技术,是将计算机的各种实体资源,如服务器、网络、内存及存储等,予以抽象、转换后呈现出来。虚拟化技术打破了计算机实体结构间的,不可切割的障碍。使用户可以比原本的组态更好的方式,来应用这些资源。

虚拟化技术主要作用:

  • 高性能的物理硬件产能过剩和老的旧的硬件产能过低的重组重用,透明化底层物理硬件
  • 软件跨环境迁移问题(代码的水土不服)

在一台主机上实现多个操作系统,关键技术就是硬件的虚拟化。

1.2 什么是Docker

首先,我们先来看几个问题:

1.合作开发的时候,在本机可以跑,别人的电脑跑不起来

  • 这里我们拿Java Web应用程序举例,我们一个Java Web应用程序涉及很多东西,比如JDK、tomcat、spring等等。当这些其中某一项版本不一致的时候,可能就会导致应用程序跑不起来这种情况。Docker则将程序直接打包成镜像,直接运行在容器中即可。

2.服务器自己的程序挂了,结果发现是别人程序出了问题把内存吃完了,自己程序因为内存不够就挂了

  • 这种也是一种比较常见的情况,如果你的程序重要性不是特别高的话,公司基本上不可能让你的程序独享一台服务器的,这时候你的服务器就会跟公司其他人的程序共享一台服务器,所以不可避免地就会受到其他程序的干扰,导致自己的程序出现问题。Docker就很好解决了环境隔离的问题,别人程序不会影响到自己的程序。

3.公司要弄一个活动,可能会有大量的流量进来,公司需要再多部署几十台服务器

  • 在没有Docker的情况下,要在几天内部署几十台服务器,这对运维来说是一件非常折磨人的事,而且每台服务器的环境还不一定一样,就会出现各种问题,最后部署地头皮发麻。用Docker的话,我只需要将程序打包到镜像,你要多少台服务,我就给你跑多少容器,极大地提高了部署效率。

Docker 是一个开源的应用容器引擎,诞生于 2013 年初,基于 Go 语言实现, dotCloud 公司出品(后改名为Docker Inc);

Docker 可以让开发者打包他们的应用,以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上。Docker容器是完全使用沙箱机制,相互隔离,性能开销也极低。

Docker通俗的讲是服务器中高性能的虚拟机,可以将一台物理机虚拟N多台虚拟机的机器,互相之间隔离,互不影响。

想要搞懂Docker,其实看它的两句口号就行。

  • 第一句,是“Build, Ship and Run”。
    • 也就是,“搭建、发送、运行”,三板斧。
  • 第二句口号就是:“Build once,Run anywhere(搭建一次,到处能用)”。
    • Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器或Windows 机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。

特点:

  • 标准化交付:Docker将应用打包成标准化单元,用于交付、部署;
  • 轻量级:容器及包含了软件运行所需的所有环境,而且非常轻量级
  • 高可靠:容器化的应用程序,可以在任何Linux环境中始终如一的运行
  • 隔离性:容器化的应用程序,具备隔离性,这样多团队可以共享同一Linux系统资源

1.3 容器与虚拟机比较

下面的图片比较了 Docker 和传统虚拟化方式的不同之处,可见Docker是在操作系统层面上实现虚拟化,直接复用本地主机的操作系统,而传统方式则是在硬件层面实现。

相同:容器和虚拟机都是虚拟化技术,具备资源隔离和分配优势

不同:

  • Docker虚拟化的是操作系统,虚拟机虚拟化的是硬件
  • 传统虚拟机可以运行不同的操作系统,Docker主要运行同一类操作系统(Linux)

比较上面两张图,我们发现虚拟机是携带操作系统,本身很小的应用程序却因为携带了操作系统而变得非常大,很笨重。Docker是不携带操作系统的,所以Docker的应用就非常的轻巧

1.4 Docker 基本概念

宿主机:安装Docker守护进程的Linux服务器,称之为宿主机;

镜像(Image):Docker 镜像,就相当于是一个 root 文件系统。除了提供容器运行时所需的程序、库、资源、配置等文件外,还包含了一些为运行时准备的一些配置参数。

容器(Container):镜像运行之后的实体,镜像和容器的关系,就像是面向对象程序设计中的类和对象一样,镜像是静态的定义,容器是镜像运行时的实体。容器可以被创建、启动、停止、删除、暂停等。

仓库(Repository):仓库可看成一个镜像控制中心,用来保存镜像

二、Docker安装与启动

环境准备

前提:虚拟机环境准备

  • 本地电脑安装虚拟机,虚拟机搭配Linux操作系统,在Linux操作系统上安装Docker容器。
  • Docker需要从镜像仓库下载镜像,需要联网。
  • 提供的虚拟机网卡地址:192.168.199.128(根据个人本机具体设置)
  • 账号:root
  • 密码:lagou

准备工作:确保Linux系统能够连接网络

1、调整虚拟网络编辑器:

(1)桥接模式的虚拟机,就像一个在路由器"民政局"那里"上过户口"的成年人,有自己单独的居住地址,虽然和主机住在同一个大院里,但好歹是有户口的人,可以大摇大摆地直接和外面通信。

(2)NAT模式的虚拟机,纯粹就是一个没上过户口的黑户,路由器"民政局"根本不知道有这么个人,自然也不会主动和它通信。即使虚拟机偶尔要向外面发送点的信件,都得交给主机以主机的名义转发出去,主机还专门请了一位叫做NAT的老大爷来专门负责这些虚拟机的发信、收信事宜。

(3)仅主机模式的虚拟机,纯粹是一个彻彻底底的黑奴,不仅没有户口、路由器"民政局"不知道这么号人,还被主机关在小黑屋里,连信件也不准往外发。

其中这个仅主机模式能够保障我们在拔掉网线的情况下继续连接我们的虚拟机,不依靠公网连接,而是依靠物理机和虚拟机的关系连接。在断网的情况下,利用这个模式,我们可以继续连接虚拟机,实现我们的操作。

2、VMware Network Adapter VMnet8网卡设置

3、打开虚拟机文件:

提供的虚拟机文件已经安装了CentOS7,并且在CentOS7上已经安装了Docker环境,IP地址:192.168.200.128

4、连接到虚拟机

2.1 安装

Docker官方建议在Ubuntu中安装Docker软件。因为Docker基于Ubuntu发布,而且Docker出现问题时,Ubuntu系统版本的一般是先打补丁。很多版本在CentOS中,是不支持更新最新补丁包的。没有好的解决方案。

但是,由于我们学习的环境都使用CentOS。因此,这里我们将Docker安装到CentOS上。注意,一定安装在CentOS 7.x及以上版本,CentOS6.x的版本中有Bug!

Docker命令安装

(1)查看电脑上已经已经安装Docker

yum list installed | grep docker

(2)安装docker

yum -y install docker

(3)安装后查看docker版本

docker -v

2.2 Docker守护进程相关命令

启动docker:

systemctl start docker

停止docker:

systemctl stop docker

重启docker:

systemctl restart docker

查看docker状态:

systemctl status docker

开机启动:

systemctl enable docker

查看docker概要信息

docker info

查看docker帮助文档

docker --help

2.3 镜像加速的2个方案

默认情况,将从docker hub(https://hub.docker.com/)下载docker镜像太慢,一般都会配置镜像加速器;

方案一:中科大

中国科学技术大学(ustc)是老牌的linux镜像服务提供者了,还在遥远的ubuntu 5.04版本的时候就在用。ustc的docker镜像加速器速度很快。ustc docker mirror的优势之一就是不需要注册,是真正的公共服务。

https://lug.ustc.edu.cn/wiki/mirrors/help/docker

方案二:阿里云

如果中科大镜像加载速度很慢,建议配置阿里云镜像加速,这个镜像仓库如果不好使,可以自己从阿里云上申请!速度杠杠的~

编辑该文件:

vim /etc/docker/daemon.json

中科大方案

{
    "registry-mirrors": ["https://docker.mirrors.ustc.edu.cn"]
}

阿里云方案

{
    "registry-mirrors": ["https://3ad96kxd.mirror.aliyuncs.com"]
}

三、Docker常用命令

3.1 镜像相关命令

3.1.1 查看镜像

这些镜像都是存储在Docker宿主机的/var/lib/docker目录下

3.1.2 搜索镜像

如果你需要从网络中查找需要的镜像,可以通过以下命令搜索;==注意,必须确保当前系统能联网==

docker search 镜像名称

3.1.3 拉取镜像

拉取镜像:从Docker仓库下载镜像到本地,镜像名称格式为 名称:版本号,如果版本号不指定则是最新的版本。如果不知道镜像版本,可以去docker hub 搜索对应镜像查看。

例如,我要下载centos7镜像

docker pull centos:7

3.1.4 删除镜像

按镜像ID删除镜像

docker rmi 镜像ID

删除之前要确认此镜像已经没有被容器在使用,如果存在正在运行的docker容器,删除会报错“Error: container_delete: Impossible to remove a running container, please stop it first”

删除所有镜像

docker images -q #查看所有镜像的ID

docker rmi `docker images -q` #批量删除镜像

3.2 容器相关命令

3.2.1 查看容器

查看正在运行的容器

docker ps

查看所有容器(查看正在运行的和已经停止运行的)

docker ps –a

docker ps -all

查看最后一次运行的容器

docker ps –l

查看停止的容器

docker ps -f status=exited

3.2.2 创建与启动容器

创建容器命令:

docker run 参数 镜像名称:镜像标签 /bin/bash

创建容器常用的参数说明:

## 命令参数详解
-i:表示运行容器,如果不加该参数那么只是通过镜像创建容器,而不启动。

-t:表示容器启动后会进入其命令行。加入这两个参数后,容器创建就能登录进去。即分配一个伪终端(如果只加it两个参数,创建后就会自动进去容器)。

-d:在run后面加上-d参数,则会创建一个守护式容器在后台运行(这样创建容器后不会自动登录容器)。

--name :为创建的容器命名。

-v:表示目录映射关系(前者是宿主机目录,后者是映射到宿主机上的目录),可以使用多个-v做多个目录或文件映射。注意:最好做目录映射,在宿主机上做修改,然后共享到容器上。

-p:表示端口映射,前者是宿主机端口,后者是容器内的映射端口。可以使用多个-p做多个端口映射,例如:
可以将Docker中Tomcat容器的8080端口映射到宿主机上的某一个端口8080,那么以后访问tomcat只需
要:http://宿主机的IP:8080/

进入容器之后,初始化执行的命令:/bin/bash;可写可不写

(1)交互式容器

  • 使用交互式运行容器,容器运行后直接进入到容器内部,退出容器内部后,容器直接关闭
  • 只有第一次才会这样,以后再启动容器就是一个守护式的。
  • docker run -it --name=容器名称 镜像名称:标签 /bin/bash
  • 这时我们通过ps命令查看,发现可以看到启动的容器,状态为启动状态

(2)守护式容器:

docker run -di --name=容器名称 镜像名称(或镜像ID):标签 /bin/bash

退出当前容器

exit

(3)登录容器/进入容器的目录:

docker exec -it 容器名称 (或者容器ID) /bin/bash

注意:这里的登陆容器之后执行的脚本/bin/bash必须写

3.2.3 停止与启动容器

停止容器:

docker stop 容器名称(或者容器ID)

启动容器:

docker start 容器名称(或者容器ID)

3.2.4 文件拷贝

将宿主机的文件拷贝到容器:

docker cp 需要拷贝的文件或目录 容器名称:容器目录

demo:

#新建一个空文件
touch latou.html
#拷贝到tomcat容器的webapps目录下
docker cp lagou.html 59b35c0bbe6d:/usr/local/tomcat/webapps
#切换到tomcat容器中查看
docker exec -it tomcat容器ID /bin/bash

将文件从容器内拷贝出来:

docker cp 容器名称:容器目录 需要拷贝的文件或目录

demo:

#将copy到tomcat容器的文件再copy出来
docker cp 59b35c0bbe6d:/usr/local/tomcat/webapps/lagou.html ./

3.2.5 目录挂载

我们可以在创建容器的时候,将宿主机的目录与容器内的目录进行映射,这样我们就可以通过修改宿主机某个目录的文件从而去影响容器。

创建容器 添加-v参数 后边为 宿主机目录:容器目录 例如:

docker run -di -v /usr/local/myhtml:/usr/local/myhtml --name=mycentos3 centos:7

3.2.6 查看容器IP地址

docker inspect 容器名称(容器ID)

直接执行下面的命令直接输出IP地址:

docker inspect --format='{{.NetworkSettings.IPAddress}}' 容器名称(容器ID)

3.2.7 删除容器

删除指定的容器,正在运行的容器无法删除

#删除容器
docker rm 容器名称(容器ID)
#删除镜像
docker rmi 镜像ID(镜像名称)

四、Docker数据卷(Volumes)

4.1 数据卷概述

数据卷是宿主机中的一个目录或文件,当容器目录和数据卷目录绑定后,对方的修改会立即同步。

一个数据卷可以被多个容器同时挂载,一个容器也可以被挂载多个数据卷。

简单来说数据卷本质其实是共享文件夹,是宿主机与容器间数据共享的桥梁。

数据卷作用:

  • 容器数据持久化
  • 外部机器和容器间接通信
  • 容器之间数据交换

4.2 数据卷配置方式

(1).1个容器挂载1个数据卷

创建启动容器时,使用 –v 参数 设置数据卷

docker run ... –v 宿主机目录(文件):容器内目录(文件) ...

注意事项:

1. 目录必须是绝对路径

2. 如果宿主机目录不存在,会自动创建

3. 可以挂载多个数据卷

案例:

#拉取centos镜像
docker pull centos:7
#安装启动容器并挂载
docker run -di --name=c1 -v /root/host_data1:/root/c1_data centos:7 /bin/bash

(2).查看容器已挂载的数据卷

我们可以通过以下命令,查看容器中挂载的数据卷

docker inspect 容器名称(容器ID)

(3).1个容器挂载多个数据卷

我们可以通过以下命令,挂载多个数据卷

docker run -di --name=c1 -v /root/host_data1:/root/c1_data1 -v/root/host_data2:/root/c1_data2 centos:7 /bin/bash

(4).多个容器挂载1个数据卷

多个容器挂载1个数据卷,实现数据共享

docker run -di --name=c2 -v /root/host_data_common:/root/c2_data centos:7

docker run -di --name=c3 -v /root/host_data_common:/root/c3_data centos:7

(5)多个容器挂载1个容器(这个容器挂载1个数据卷)

##创建启动c3数据卷容器,使用 –v 参数 设置数据卷
docker run -di --name=c3 -v /root/host_data_common:/root/c3_data centos:7 /bin/bash
##创建启动 c1 c2 容器,使用 –-volumes-from 参数 设置数据卷
docker run -di --name=c1 --volumes-from c3 centos:7 /bin/bash
docker run -di --name=c2 --volumes-from c3 centos:7 /bin/bash

 

五、在Docker中部署软件

5.1 MySQL部署

实现步骤:

1. 搜索MySQL镜像

2. 拉取MySQL镜像

3. 创建容器、设置端口映射、设置数据卷

4. 进入容器操作mysql

5. 使用Navicat连接MySQL

实现过程:

1. 搜索mysql镜像

docker search mysql

2. 拉取mysql镜像

docker pull mysql:5.7

3. 创建容器,设置端口映射、目录映射

参数说明:

-p 3307:3306:将容器的 3306 端口映射到宿主机的 3307 端口。

-v /root/mysql/logs:/logs:将主机目录(/root/mysql)下的 logs 目录挂载到容器中的/logs 日志目录

-v /root/mysql/data:/var/lib/mysql :将宿主机目录(/root/mysql)下的data目录挂载到容器的 /var/lib/mysql 数据目录

-e MYSQL_ROOT_PASSWORD=123456:初始化 root 用户的密码。

4. 进入容器,操作mysql

docker exec –it c_mysql /bin/bash

5. 使用Navicat连接容器中的mysq

因为我们做了端口映射,所以连接的是192.168.200.128:3307

5.2 Tomcat部署

实现步骤:

1. 搜索Tomcat镜像

2. 拉取Tomcat镜像

3. 创建容器、设置端口映射、设置数据卷

4. 向Tomcat中部署服务

5. 使用外部机器访问Tomcat,测试部署服务

实现过程:

1. 搜索tomcat镜像

docker search tomcat

2. 拉取tomcat镜像

docker pull tomcat:8-jdk8

3. 创建容器,设置端口映射、目录映射

docker run -di --name=c_tomcat -p 8080:8080 -v /root/tomcat/webapps:/usr/local/tomcat/webapps tomcat:8-jdk8

参数说明:

-p 8080:8080:将容器的8080端口映射到主机的8080端口

-v /root/tomcat/webapps:/usr/local/tomcat/webapps:将主机目录(/root/tomcat/webapps)挂载到容器的webapps

4. 向Tomcat中部署服务,使用FinalShell文件上传

5. 使用外部机器访问Tomcat,测试部署服务

5.3 Nginx部署

实现步骤:

1. 搜索Nginx镜像

docker search Nginx

2. 拉取Nginx镜像

docker run -di --name=mynginx -p 80:80 nginx

测试访问:http://192.168.200.128:80

5.4 Redis部署

实现步骤:

1. 搜索Redis镜像

2. 拉取Redis镜像

3. 创建容器、设置端口映射

4. 使用外部机器连接Redis,测试

实现过程:

1. 搜索redis镜像

docker search redis

2. 拉取redis镜像

docker pull redis

3. 创建容器,设置端口映射

docker run -id --name=c_redis -p 6379:6379 redis

4. 使用外部机器连接redis,测试

六、迁移与备份

应用场景:

开发环境Docker,在Docker中安装很多的容器,进行对应的配置,将Docker中的运行的容器持久化为镜像,将对应的镜像安装到生产环境中。

1.将开发环境下的Docker中对应的容器持久化为镜像

2.将镜像保存为一个压缩包,发送到生产环境服务器中

3.生产环境中需要将压缩包-->镜像-->容器

6.1、容器保存为镜像

令将容器保存为镜像

docker commit {正在运行容器名称/容器ID} {镜像名称}:{镜像标签}
{ImageName} : {Tag}
# 例如
docker commit dbc261edcdff redis:version_lagou_1.0.0

6.2、镜像备份

将镜像保存为tar 文件

docker save -o {压缩包存放路径} {镜像名称/镜像ID}
# 举例
docker save -o redis.tar redis:version_lagou_1.0.0 #压缩包在生产环境下会还原为一个
image,还原之后的name和tag
# -o :输出到的文件

6.3、镜像恢复与迁移

先删除掉c_tomcat_bak镜像 然后执行此命令进行恢复

docker load -i {备份的镜像文件}
# 举例
docker load -i redis.tar
# -i :指定导入的文件

执行后再次查看镜像,可以看到镜像已经恢复,可以再次运行测试

docker run -di --name=mytomcat -p 8081:8080 -v /root/tomcat/webapps/:/usr/local/tomcat/webapps redis:version_lagou_1.0.0

Kafka

1. 消息队列

1.1 什么是消息队列

队列(Queue):

Queue 是一种先进先出的数据结构,容器

消息(Message):

不同应用之间传送的数据。

消息队列:

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。比如生产者发送消息1,2,3...对于消费者就会按照1,2,3...的顺序来消费。

1.2 消息队列的应用场景

消息队列在实际应用中包括如下四个场景:

1) 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败

2) 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;

3) 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

4) 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理

1.2.1 异步处理

1.2.2 应用耦合

1.2.3 限流削峰

1.2.4 消息事件驱动的系统

1.3 消息队列的两种模式

消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)

1.3.1 点对点模式

点对点模式下包括三个角色:

  • 消息队列
  • 发送者 (生产者)
  • 接收者(消费者)

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,可以放在内存 中也可以持久化,直到他们被消费或超时。

点对点模式特点:

  • 每个消息只有一个消费者,一旦被消费,消息就不再在消息队列中;
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

1.3.2 发布/订阅模式

发布/订阅模式下包括三个角色:

  • 角色主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被多个订阅者消费。

发布/订阅模式特点:

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖性
  • 为了消费消息,订阅者必须保持在线运行。

1.4 消息队列实现机制

1.4.1 JMS

JMS(JAVA Message Service,Java消息服务)是一个Java平台中关于面向消息中间件的API

允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息

是一个消息服务的标准或者说是规范,是 Java 平台上有关面向消息中间件的技术规范

便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

JMS 消息机制主要分为两种模型:PTP 模型和 Pub/Sub 模型。

实现产品:Apache ActiveMQ

1.4.2 AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

1.4.3 JMS VS AMQP

1.5常见的消息队列产品

RabbitMQ

  • RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

ActiveMQ

  • ActiveMQ是由Apache出品,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能

RocketMQ

  • RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理等

Kafka

  • Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

综合上面的材料得出以下两点:

(1)中小型软件公司:
建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。

(2)大型软件公司:

根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。

2. kafka的基本介绍

2.1 什么是Kafka

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

支持普通服务器每秒百万级写入请求

Memory mapped Files

支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。

同时支持离线数据处理和实时数据处理。

Scale out:支持在线水平扩展

2.2 kafka的特点

(1)解耦。Kafka具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程。

(2)高吞吐量、低延迟。即使在非常廉价的机器上,Kafka也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。

(3)持久性。Kafka可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。

(4)扩展性。Kafka集群支持热扩展,Kaka集群启动运行后,用户可以直接向集群添。

(5)容错性。Kafka会将数据备份到多台服务器节点中,即使Kafka集群中的某一台加新的Kafka服务节点宕机,也不会影响整个系统的功能。

(6)支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

  (7) 支持多生产者和多消费者。

2.3 kafka的主要应用场景

消息处理(MQ)

  • KafKa可以代替传统的消息队列软件,使用KafKa来实现队列有如下优点:
  • KafKa的append来实现消息的追加,保证消息都是有序的有先来后到的顺序,
  • 稳定性强队列在使用中最怕丢失数据,KafKa能做到理论上的写成功不丢失
  • 分布式容灾好
  • 容量大相对于内存队列,KafKa的容量受硬盘影响
  • 数据量不会影响到KafKa的速度

分布式日志系统(Log)
在很多时候我们需要对一些庞大的数据进行存留,日志存储这块会遇到巨大的问题,日志不能丢,日志存文件不好找,定位一条消息成本高(遍历当天日志文件),实时显示给用户难,这几类问题KafKa都能游刃有余

  • KafKa的集群备份机制能做到n/2的可用,当n/2以下的机器宕机时存储的日志不会丢失
  • KafKa可以对消息进行分组分片
  • KafKa非常容易做到实时日志查询

流式处理

  • 流式处理就是指实时地处理一个或多个事件流。
  • 流式的处理框架(spark, storm , flink) 从主题中读取数据, 对其进行处理, 并将处理后的结果数据写入新的主题, 供用户和应用程序使用,
  • kafka的强耐久性在流处理的上下文中也非常的有用

3. kafka的架构

3.1 架构案例

Kafka Cluster:由多个服务器组成。每个服务器单独的名字broker(掮客)。

kafka broker:kafka集群中包含的服务器

Kafka Producer:消息生产者、发布消息到 kafka 集群的终端或服务。

Kafka consumer:消息消费者、负责消费数据。

Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topci下,消费数据也是消费一类数据。

注意:Kafka的元数据都是存放在zookeeper中。

3.2 架构剖析

kafka架构的内部细节剖析:

 

说明:kafka支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系由客户端负责维护,消息消费完 后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

Broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker

Topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic

Partition:分区,物理上的概念,每个topic包含一个或多个partition,一个partition对应一个文件夹,这个文件夹下存储partition的数据和索引文件,每个partition内部是有序的

3.3 关系解释

Topic & Partition

Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。

Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

一个topic为一类消息,每条消息必须指定一个topic。

对于每一个topic, Kafka集群都会维持一个分区日志。如下图

每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。

分区中的每一个记录都会分配一个id号来表示顺序,称之为offset,offset用来唯一的标识分区中每一条记录。

在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置,偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量

但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。

这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。

4. kafka集群环境搭建

ZooKeeper 作为给分布式系统提供协调服务的工具被 kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现负载均衡。

4.1 准备工作

环境准备:

准备三台服务器, 安装jdk1.8 ,其中每一台虚拟机的hosts文件中都需要配置如下的内容

192.168.200.11 node1
192.168.200.12 node2
192.168.200.13 node3

实现方式:

1.将原有学习Docker时的Centos7克隆,克隆出一台虚拟机

 

 

 

 

2、修改IP地址

修改网卡配置文件 vi /etc/sysconfig/network-scripts/ifcfg-ens33

(1)bootproto=static,表示使用静态IP

(2)onboot=yes,表示将网卡设置为开机启用

(3)将原有的原有IP修改为192.168.200.11

(4)重启网络服务

service network restart

(5)安装目录创建

(6)修改host

执行命令“cd /etc/”进入服务器etc目录;

执行命令“vi hosts”编辑hosts文件;

输入你要修改的内容:

执行命令”/etc/init.d/network restart“ 重启hosts;

执行命令”cat /etc/hosts“可以查看到hosts文件修改成功。

4.2 Zookeeper集群搭建

1. Linux安装JDK,三台Linux都安装。

查看JDK是否正确安装

2. Linux 安装Zookeeper,三台Linux都安装,以搭建Zookeeper集群

上传zookeeper-3.4.14.tar.gz

解压并配置zookeeper(配置data目录,集群节点)

node2配置

node3配置

启动zookeeper

4.3 下载安装包

中文网站: http://kafka.apachecn.org/

英文网站: http://kafka.apache.org/

由于kafka是scala语言编写的,基于scala的多个版本,kafka发布了多个版本。其中2.11是推荐版本.

4.4 上传安装包并解压

4.5 修改kafka的核心配置文件

4.6 将配置好的kafka分发到其他二台主机

cd /export/servers

scp -r kafka/ node2:$PWD

scp -r kafka/ node3:$PWD

在每一台的服务器执行创建数据文件的命令

mkdir -p /export/data/kafka

4.7 启动集群

注意事项:在kafka启动前,一定要让zookeeper启动起来

4.8 Docker环境下的Kafka集群搭建

4.8.1 准备工作

1) 克隆VM,修改IP地址为192.168.200.20

修改网络配置: vi /etc/sysconfig/network-scrpits/ifcfg-ens33

2) 安装docker - compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。

如果我们还是使用原来的方式操作docker,那么就需要下载三个镜像:Zookeeper、Kafka、Kafka-Manager,需要对Zookeeper安装三次并配置集群、需要对Kafka安装三次,修改配置文件,Kafka-Manager安装一次,但是需要配置端口映射机器Zooker、Kafka容器的信息。但是引入Compose之后可以使用yaml格式的配置文件配置好这些信息,每个image只需要编写一个yaml文件,可以在文件中定义集群信息、端口映射等信息,运行该文件即可创建完成集群。

通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

Compose 使用的两个步骤:

  • 使用 docker-compose.yml 定义构成应用程序的服务,这样它们可以在隔离环境中一起运行。
  • 执行 docker-compose up 命令来启动并运行整个应用程序。

3)拉取镜像

4) 创建集群网络

基于Linux宿主机而工作的,也是在Linux宿主机创建,创建之后Docker容器中的各个应用程序可以使用该网络。

5)网络设置

新建网段之后可能会出现:WARNING: IPv4 forwarding is disabled. Networking will not work.

解决方式:

第一步:在宿主机上执行: echo "net.ipv4.ip_forward=1" >>/usr/lib/sysctl.d/00-system.conf

第二步:重启network和docker服务

[root@localhost /]# systemctl restart network && systemctl restart docker

4.8.2 搭建过程

每个镜像一个Yml文件,Zookeeper、Kafka、Kafka-Manager一个

1) 编写yml文件

1)docker-compose-zookeeper.yml

Zookeeper各个节点的信息,端口映射,集群信息,网络配置

2)docker-compose-kafka.yml

version: '2'


services:
  kafka1:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka1
    container_name: kafka1
    privileged: true
    ports:
    - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 192.168.0.14


  kafka2:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka2
    container_name: kafka2
    privileged: true
    ports:
    - 9093:9093
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 192.168.0.15


  kafka3:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka3
    container_name: kafka3
    privileged: true
    ports:
    - 9094:9094
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 192.168.0.16


networks:
  kafka:
    external:
      name: kafka

3)docker-compose-manager.yml

version: '2'


services:
  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
     - 9000:9000
    environment:
     ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
     KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092
     APPLICATION_SECRET: letmein
     KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
     kafka:
      ipv4_address: 192.168.0.17




networks:
  kafka:
    external:
      name: kafka

2)将yaml文件上传到Docker宿主机中

安装:yum install -y lrzsz

上传到指定目录

3)开始部署

使用命令:docker-compose up -d

参数说明: up表示启动, -d表示后台运行

docker-compose -f /home/docker-compose-zookeeper.yml up -d

docker-compose -f /home/docker-compose-kafka.yml up -d

docker-compose -f /home/docker-compose-manager.yml up -d

参数说明: -f:表示加载指定位置的yaml文件

4)测试

浏览器访问宿主机:http://192.168.200.20:9000/

在这个网页控制台可以直接进行topic的创建

5. kafka的基本操作

(1) 创建topic

创建一个名字为test的主题, 有一个分区,有三个副本。一个主题下可以有多个分区,每个分区可以用对应的副本。

--create:新建命令

--zookeeper:Zookeeper节点,一个或多个

--replication-factor:指定副本,每个分区有三个副本。

 

(2) 查看主题命令

查看kafka当中存在的主题

kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181

__consumer_offsets 这个topic是由kafka自动创建的,默认50个分区,存储消费位移信息(offset),老版本架构中是存储在Zookeeper中。

(3) 生产者生产数据

Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。

默认情况下,每行将作为单独的message发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test

This is a message
This is another message

(4) 消费者消费数据

kafka-console-consumer.sh --bootstrap-server kafka1:9092, kafka2:9093, kafka3:9094 --topic test --from-beginning

在使用的时候会用到bootstrap与broker.list其实是实现一个功能,broker.list是旧版本命令。

一、确保消费者消费的消息是顺序的,需要把消息存放在同一个topic的同一个分区

二、一个主题多个分区,分区内消息有序。

(5) 运行describe的命令

运行describe查看topic的相关详细信息

#查看topic主题详情,Zookeeper节点写一个和全部写,效果一致
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic test
#结果列表
Topic: test1 PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003,1002
Topic: test1 Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
Topic: test1 Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001

结果说明:

这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。有几个分区,下面就显示几行

leader:是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。

replicas:显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活。

isr:副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟leader同步

(6) 增加topic分区数

任意kafka服务器执行以下命令可以增加topic分区数

kafka-topics.sh --zookeeper zkhost:port --alter --topic test --partitions 8

(7) 增加配置

flush.messages:此项配置指定时间间隔:强制进行fsync日志,默认值为None。

例如,如果这个选项设置为1,那么每条消息之后都需要进行fsync,如果设置为5,则每5条消息就需要进行一次fsync。

一般来说,建议你不要设置这个值。此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡。

如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)。

如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。

动态修改kakfa的配置

kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --config flush.messages=1

(8) 删除配置

kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --delete-config flush.messages

(9) 删除topic

目前删除topic在默认情况只是打上一个删除的标记,在重新启动kafka后才删除。如果需要立即删除,则需要在server.properties中配置:

delete.topic.enable=true(集群中的所有实例节点),一个主题会在不同的kafka节点中分配分组信息和副本信息

然后执行以下命令进行删除topic

kafka-topics.sh --zookeeper zoo1:2181 --delete --topic test

6. Java API操作kafka

 项目搭建准备

修改Windows的Host文件:

192.168.200.20 kafka1

192.168.200.20 kafka2

192.168.200.20 kafka3
<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

6.1 生产者代码

public class ProducerDemo {
    public static void main(String[] args) {
        // 要构造一个消息生产者对象,关于kafka集群等相关信息,恶意从properties文件中加载也可以从Properties对象中
        // KafkaProducer按照固定的key取出对应的value
        Properties properties = new Properties();
        // 指定集群节点
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.20:9092,192.168.199.20:9093,192.168.199.20:9094");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 创建消息生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        String topic = "lagou";
        for(int i= 1; i < 100; i++){
            // 设置消息内容
            String msg = "hello," + i;
            // 构建一个消息对象,主题(如果不存在,kafka会帮我们创建一个一个分区一个副本的主题),消息
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
            producer.send(record);
            System.out.println("消息发送成功,msg:" + msg);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        producer.close();
    }
}

6.2 消费者代码

public class ConsumerDemo {
    public static void main(String[] args) {
        // 属性对象
        Properties properties = new Properties();
        // 指定集群节点
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.20:9092,192.168.199.20:9093,192.168.199.20:9094");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lagou");

        // 消息消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 主题
        String topic = "lagou";
        // 订阅消息
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true){
            // 获取消息的方法是一个阻断式方法
            ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
            for (ConsumerRecord<String, String> record : records){
                System.out.println("主题:" + record.topic() + ",偏移量:" + record.offset() + ", msg:" + record.value());
            }
        }
    }
}

多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。

7. Apache kafka原理

7.1 分区副本机制

分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题 一个分片的不同副本不能放到同一个broker上。

当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片

分区对于 Kafka 集群的好处是:实现负载均衡,高存储能力、高伸缩性。分区对于消费者来说,可以提高并发度,提高效率。

当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。

kafka的副本都有哪些作用?

  在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

说说follower副本为什么不对外提供服务?

这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。

比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。

7.2 kafka保证数据不丢失机制

从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。

7.2.1. 消息生产者

消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:0,1,-1

//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
properties.put(ProducerConfig.ACKS_CONFIG,"-1");

7.2.2 消息消费者

kafka消费消息的模型:

即消费消息,设置好offset,类比一下:

什么时候消费者丢失数据呢?

由于Kafka consumer默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此时,broker并不知道。

解决方案:

  • enable.auto.commit=false 关闭自动提交位移
  • 在消息被完整处理之后再手动提交位移
  • properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

7.3 消息存储及查询机制

kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。

Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。

7.3.1 消息存储机制

log分段:

  • 每个分片目录中,kafka 通过分段的方式将 数据分为多个 LogSegment。
  • 一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index)。
  • 其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
  • 每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
  • 当log文件等于1G时,新的会写入到下一个segment中。

7.3.2 通过 offset 查找 message

存储的结构:一个主题 --> 多个分区 ----> 多个日志段(多个文件)

查询步骤

第一步:查询segment file:

  segment file命名规则跟offset有关,根据segment file可以知道它的起始偏移量,因为Segment file的命名规则是上一个segment文件最后一条消息的offset值。所以只要根据offset 二分查找文件列表,就可以快速定位到具体文件。

  比如:

    第一个segment file是00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0。

    第二个是00000000000000091932.index:代表消息量起始偏移量为91933 = 91932 + 1。那么offset=5000时应该定位00000000000000000000.index

第二步通过segment file查找message:

  通过第一步定位到segment file,当offset=5000时,依次定位到00000000000000000000.index的元数据物理位置和00000000000000000000.log的物理偏移地址,然后再通过00000000000000000000.log顺序查找直到offset=5000为止。

7.4 生产者消息分发策略

kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。

这个类中就定义数据分发的策略。

public interface Partitioner extends Configurable, Closeable {
    /**
    * Compute the partition for the given record.
    *
    * @param topic The topic name
    * @param key The key to partition on (or null if no key)
    * @param keyBytes The serialized key to partition on( or null if no key)
    * @param value The value to partition on or null
    * @param valueBytes The serialized value to partition on or null
    * @param cluster The current cluster metadata
    */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    /**
    * This is called when partitioner is closed.
    */
    public void close();
}

默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner

1) 如果是用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法

数据分发策略的时候,可以指定数据发往哪个partition。

当ProducerRecord 的构造参数中有partition的时候,就可以发送到对应partition上

/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

2) DefaultPartitioner源码

如果指定key,是取决于key的hash值

如果不指定key,轮询分发

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //获取该topic的分区列表
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    //获得分区的个数
    int numPartitions = partitions.size();
    //如果key值为null
    if (keyBytes == null) {//如果没有指定key,那么就是轮询
        //维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作
        int nextValue = nextValue(topic);
        //获取该topic的可用分区列表
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {//如果可用分区大于0
            //执行求余操作,保证消息落在可用分区上
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // 没有可用分区的话,就给出一个不可用分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {//不过指定了key,key肯定就不为null
        // 通过计算key的hash,确定消息分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

7.5 消费者负载均衡机制

同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0分区中的数据不能被Consumer Group A中C1与C2同时消费。

消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupName");如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。

如果有3个Partition, p0/p1/p2,同一个消费组有3个消费者,c0/c1/c2,则为一一对应关系;

如果有3个Partition, p0/p1/p2,同一个消费组有2个消费者,c0/c1,则其中一个消费者消费2个分区的数据,另一个消费者消费一个分区的数据;

如果有2个Partition, p0/p1,同一个消费组有3个消费者,c0/c1/c3,则其中有一个消费者空闲,另外2个消费者消费分别各自消费一个分区的数据;

8. kakfa配置文件说明

server.properties

1、broker.id=0:

kafka集群是由多个节点组成的,每个节点称为一个broker,中文翻译是代理。每个broker都有一个不同的brokerId,由broker.id指定,是一个不小于0的整数,各brokerId必须不同,但不必连续。如果我们想扩展kafka集群,只需引入新节点,分配一个不同的broker.id即可。

启动kafka集群时,每一个broker都会实例化并启动一个kafkaController,并将该broker的brokerId注册到zooKeeper的相应节点中。集群各broker会根据选举机制选出其中一个broker作为leader,即leader kafkaController。leader kafkaController负责主题的创建与删除、分区和副本的管理等。当leader kafkaController宕机后,其他broker会再次选举出新的leader kafkaController。

2、log.dir = /export/data/kafka/

broker持久化消息到哪里,数据目录

3、log.retention.hours = 168

log文件最小存活时间,默认是168h,即7天。相同作用的还有log.retention.minutes、log.retention.ms。retention是保存的意思。

数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。

log.retention.bytes和log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖。

4、log.retention.check.interval.ms

多长时间检查一次是否有log文件要删除。默认是300000ms,即5分钟。

5、log.retention.bytes

限制单个分区的log文件的最大值,超过这个值,将删除旧的log,以满足log文件不超过这个值。默认是-1,即不限制。

6、log.roll.hours

多少时间会生成一个新的log segment,默认是168h,即7天。相同作用的还有log.roll.ms、segment.ms。

7、log.segment.bytes

log segment多大之后会生成一个新的log segment,默认是1073741824,即1G。

8、log.flush.interval.messages

指定broker每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是9223372036854775807 好大。

kafka官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的topic设置不同的值,即在创建topic的时候设置值。

补充说明:

  在Linux操作系统中,当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。

  1、kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在page cache里面,出现多个副本同时挂掉的概率比1个副本挂掉,概率就小很多了

  2、操作系统有后台线程,定期刷盘。如果应用程序每写入1次数据,都调用一次fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。

9、log.flush.interval.ms

指定broker每隔多少毫秒就把消息从内存刷到硬盘。默认值同log.flush.interval.messages一样, 9223372036854775807。

同log.flush.interval.messages一样,kafka官方不建议使用这个配置。

10、delete.topic.enable=true

是否允许从物理上删除topic

9. kafka监控与运维

9.1 kafka-eagle概述

在生产环境下,在Kafka集群中,消息数据变化是我们关注的问题,当业务前提不复杂时,我们可以使用Kafka 命令提供带有Zookeeper客户端工具的工具,可以轻松完成我们的工作。随着业务的复杂性,增加Group和 Topic,那么我们使用Kafka提供命令工具,已经感到无能为力,那么Kafka监控系统目前尤为重要,我们需要观察 消费者应用的细节。

为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

9.2 搭建安装 kafka-eagle

环境要求:需要安装jdk,启动zk以及kafka的服务

# 启动Zookeeper
zkServer.sh start
#启动Kafka
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &

修改windows host文件

127.0.0.1 LagouCloudEurekaServerB
127.0.0.1 LagouCloudEurekaServerA
127.0.0.1 localhost
127.0.0.1 www.xmind.net
192.168.200.20 kafka1
192.168.200.20 kafka2
192.168.200.20 kafka3
192.168.200.11 node1
192.168.200.12 node2
192.168.200.13 node3

搭建步骤:

1) 下载kafka-eagle的源码包

kafka-eagle官网:

http://download.kafka-eagle.org/

我们可以从官网上面直接下载最新的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

代码托管地址:

https://github.com/smartloli/kafka-eagle/releases

2) 上传安装包并解压:

这里我们选择将kafak-eagle安装在第三台

如果要解压的是zip格式,需要先安装命令支持。

yum install unzip

unzip xxxx.zip

#将安装包上传至 node01服务器的/export/softwares路径下, 然后解压

cd /export/softwares/

unzip kafka-eagle.zip

cd kafka-eagle-web/target/

tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /export/servers

3) 准备数据库:

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node01服务器执行以下命令创建一个mysql数据库即可

SQLite、MySQL

--进入mysql客户端:

create database eagle;

默认情况下MySQL只允许本机连接到MYSQL实例中,所以如果要远程访问,必须开放权限:

update user set host = '%' where user ='root'; //修改权限

flush privileges; //刷新配置

4) 修改kafka-eagle配置文件

cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf

vim system-config.properties

#内容如下:

kafka.eagle.zk.cluster.alias=cluster1

cluster1.zk.list=node1:2181,node2:2181,node3:2181

kafka.eagle.driver=com.mysql.jdbc.Driver

kafka.eagle.url=jdbc:mysql://10.1.192.208:3306/eagle

kafka.eagle.username=root

kafka.eagle.password=wu7787879

5) 配置环境变量

kafka-eagle必须配置环境变量,node03服务器执行以下命令来进行配置环境变量

vi /etc/profile

#内容如下:

export KE_HOME=/export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2

export PATH=:$KE_HOME/bin:$PATH

#让修改立即生效,执行

source /etc/profile

6) 启动kakfa-eagle

cd kafka-eagle-web-1.3.2/bin

chmod u+x ke.sh

./ke.sh start

6) 访问主界面:

http://node03:8048/ke/account/signin?/ke/

用户名:admin

密码:123456

原文地址:https://www.cnblogs.com/zhf123/p/14619607.html