mysql_elasticsearch_flask

面试题

image

这是我2018年的面试题,当时对elasticsearch的使用很陌生,没做出来,上周无意中被翻出来了,现在搞了下
分析下要求:我们需要起三个服务,mysql,es和python的后端服务
后端服务需要两个接口,保存和搜索;数据保存在mysql,搜索的时候查询es,那就需要在数据保存到mysql的时候再同步一份到es;使用flask写接口,logstash同步数据,最后还有些字段校验,返回的要求

1.mysql

mysql 安装不多说了,直接建表
-- Student table: the system of record that Logstash later mirrors into Elasticsearch.
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户id',
  `username` varchar(32) NOT NULL DEFAULT '' COMMENT '用户名',
  -- firstname/lastname previously carried the copy-pasted comment '用户名'.
  `firstname` varchar(32) NOT NULL DEFAULT '' COMMENT '名',
  `lastname` varchar(75) NOT NULL DEFAULT '' COMMENT '姓',
  `address` varchar(64) NOT NULL DEFAULT '' COMMENT '住址',
  `birthday` date  NULL  COMMENT '生日',
  `description` varchar(256) NOT NULL DEFAULT '' COMMENT '个人简介',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  -- Refreshed automatically on every UPDATE; the Logstash jdbc input tracks this column.
  `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='学生表';

2.elasticsearch

1.使用docker部署es
docker pull elasticsearch:7.7.1
2. 启动
docker run --name es2 -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.7.1
3.查看状态
[root@localhost ~]# docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED        STATUS        PORTS                                            NAMES
76fe0f3112dc   830a894845e3   "/tini -- /usr/local…"   9 hours ago    Up 9 hours    0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   es2
4.测试访问并建索引
(1)测试服务启动:
GET http://172.30.4.154:9200/
{
  "name" : "76fe0f3112dc",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "ZVw9JgmyR7yz4pY4Xnwz5Q",
  "version" : {
    "number" : "7.7.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "ad56dce891c901a492bb1ee393f12dfff473a423",
    "build_date" : "2020-05-28T16:30:01.040088Z",
    "build_snapshot" : false,
    "lucene_version" : "8.5.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
(2)建索引(存储mysql中的数据)
PUT http://172.30.4.154:9200/index_student?pretty
返回:
{
    "acknowledged": true,
    "shards_acknowledged": true,
    "index": "index_student"
}
(3)查询索引下数据
GET http://172.30.4.154:9200/index_student/_search
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}

3.logstash

使用logstash 将mysql 数据同步至elasticsearch
注:安装与es对应的版本
地址:https://www.elastic.co/cn/downloads/past-releases/logstash-7-7-1
1.启动测试
[root@localhost logstash-7.7.1]# ls
bin     CONTRIBUTORS  Gemfile       lib          logs           logstash-core-plugin-api  NOTICE.TXT    syncpoint_table  tools   x-pack
config  data          Gemfile.lock  LICENSE.txt  logstash-core  modules                   student.conf  test_user.conf   vendor
[root@localhost logstash-7.7.1]# ./bin/logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash logs to /qqc_data/logstash-7.7.1/logs which is now configured via log4j2.properties
[2021-02-23T19:15:58,556][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-02-23T19:15:58,710][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.7.1"}
[2021-02-23T19:16:00,654][INFO ][org.reflections.Reflections] Reflections took 43 ms to scan 1 urls, producing 21 keys and 41 values 
[2021-02-23T19:16:01,773][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been created for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2021-02-23T19:16:01,806][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x4c4c18bb run>"}
[2021-02-23T19:16:02,833][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-02-23T19:16:02,896][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
The stdin plugin is now waiting for input:
[2021-02-23T19:16:03,229][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
test
/qqc_data/logstash-7.7.1/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
       "message" => "test",
      "@version" => "1",
    "@timestamp" => 2021-02-23T11:16:46.592Z,
          "host" => "localhost.localdomain"
}

2.编辑同步数据的配置文件
vim student.conf
# Pipeline: poll the MySQL `student` table once a minute and upsert changed rows
# into the Elasticsearch index `index_student`.
input {
 jdbc {
   jdbc_driver_library => "/qqc_data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar"
   jdbc_driver_class => "com.mysql.jdbc.Driver"
   jdbc_connection_string => "jdbc:mysql://{IP地址}:3306/test"
   jdbc_user => "****"
   jdbc_password => "****"
   schedule => "*/1 * * * *"
   # Must read the table being synced (`student`); the run log shows
   # "SELECT * FROM student WHERE modify_time >= ...".
   statement => "SELECT * FROM student WHERE modify_time >= :sql_last_value"
   use_column_value => true
   tracking_column_type => "timestamp"
   tracking_column => "modify_time" # Name of the increasing column used as the sync point; `modify_time` is a timestamp column.
   last_run_metadata_path => "syncpoint_table"  # Sync-point file: stores the last tracked value, is read again on restart, and can be edited by hand.
 }
}

# Time handling: store event time shifted by +8 hours in `timestamp`, then drop
# the original `@timestamp` (and `ecs`) fields.
filter {
  ruby { 
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
  }
  mutate {
    remove_field => ["@timestamp","ecs"]
  }
}

output {
 elasticsearch {
   # Same node that the rest of this walkthrough queries (was 172.30.4.129).
   hosts => ["172.30.4.154:9200"]
   # user => ""
   # password => ""
   # document_type => ""
   index => "index_student"
   document_id => "%{id}"  # Use the MySQL primary key as the ES document id; otherwise an updated row would show up as a second document. %{id} references the row's `id` column.
 }
 stdout {
        # Emit each event as one JSON line.
        codec => json_lines
    }
}

3.配置pipelines.yml
[root@localhost ~]# cd /qqc_data/logstash-7.7.1/config/
[root@localhost config]# vim pipelines.yml
- pipeline.id: student
  path.config: "/qqc_data/logstash-7.7.1/student.conf"
  
4.在mysql插两条数据
mysql> select * from student;
+----+----------+------------------+----------+----------+------------+-------------+---------------------+---------------------+
| id | username | firstname        | lastname | address  | birthday   | description | create_time         | modify_time         |
+----+----------+------------------+----------+----------+------------+-------------+---------------------+---------------------+
|  1 | peter001 | peter用户first87 | last用户 | 上海     | 2021-02-12 | 是个学生    | 2021-02-23 17:25:16 | 2021-02-23 17:36:32 |
|  2 | jine     | jine用户first87  | last用户 | 上海静安 | 1900-02-12 | 工作了      | 2021-02-24 09:26:29 | 2021-02-24 09:26:34 |
+----+----------+------------------+----------+----------+------------+-------------+---------------------+---------------------+
2 rows in set

5.启动logstash
[root@localhost logstash-7.7.1]# ./bin/logstash
部分日志:
Wed Feb 24 10:40:00 CST 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
[2021-02-24T10:40:01,315][INFO ][logstash.inputs.jdbc     ][student][832e063f3051a808540f4a708489797421af7a331f3dff51ace09fe86d3db1eb] (0.017660s) SELECT version()
[2021-02-24T10:40:01,390][INFO ][logstash.inputs.jdbc     ][student][832e063f3051a808540f4a708489797421af7a331f3dff51ace09fe86d3db1eb] (0.008656s) SELECT * FROM student WHERE modify_time >= '2021-02-23 07:40:44'
{"modify_time":"2021-02-23T09:36:32.000Z","description":"是个学生","create_time":"2021-02-23T09:25:16.000Z","lastname":"last用户","birthday":"2021-02-11T16:00:00.000Z","username":"peter001","timestamp":"2021-02-24T10:40:01.449Z","id":1,"firstname":"peter用户first87","address":"上海","@version":"1"}
{"modify_time":"2021-02-24T01:26:34.000Z","description":"工作了","create_time":"2021-02-24T01:26:29.000Z","lastname":"last用户","birthday":"1900-02-11T15:54:17.000Z","username":"jine","timestamp":"2021-02-24T10:40:01.468Z","id":2,"firstname":"jine用户first87","address":"上海静安","@version":"1"}
注:显示mysql数据已同步

6.查看elasticsearch中数据
GET http://172.30.4.154:9200/index_student/_search
{
    "took": 581,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 2,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "index_student",
                "_type": "_doc",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "modify_time": "2021-02-23T09:36:32.000Z",
                    "description": "是个学生",
                    "create_time": "2021-02-23T09:25:16.000Z",
                    "lastname": "last用户",
                    "birthday": "2021-02-11T16:00:00.000Z",
                    "username": "peter001",
                    "timestamp": "2021-02-24T10:40:01.449Z",
                    "id": 1,
                    "firstname": "peter用户first87",
                    "address": "上海",
                    "@version": "1"
                }
            },
            {
                "_index": "index_student",
                "_type": "_doc",
                "_id": "2",
                "_score": 1.0,
                "_source": {
                    "modify_time": "2021-02-24T01:26:34.000Z",
                    "description": "工作了",
                    "create_time": "2021-02-24T01:26:29.000Z",
                    "lastname": "last用户",
                    "birthday": "1900-02-11T15:54:17.000Z",
                    "username": "jine",
                    "timestamp": "2021-02-24T10:40:01.468Z",
                    "id": 2,
                    "firstname": "jine用户first87",
                    "address": "上海静安",
                    "@version": "1"
                }
            }
        ]
    }
}

4.flask服务

1.路由(实现保存与查询接口)
# Route table: (rule, view function[, allowed HTTP methods]).
# Entries without an explicit method list are registered as GET-only.
urls = [
    ('/v1/student/save', one.student_save, ["POST"]),
    ('/v1/student/search', one.student_search),
]
for route in urls:
    rule, view = route[0], route[1]
    last_item = route[-1]
    # The last element is a method list only for three-element entries.
    allowed_methods = last_item if isinstance(last_item, list) else ["GET"]
    app.add_url_rule(rule, view_func=view, methods=allowed_methods)

2.视图函数
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so that user input cannot
    terminate the literal early.
    """
    # NOTE(review): run_sql() is only ever called with a finished SQL string
    # here; if it can accept bind parameters, prefer those over escaping.
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return "'{0}'".format(escaped)


def student_save():
    """Create or update a student row from the JSON request body.

    The body may contain ``username``, ``firstname``, ``lastname``,
    ``address``, ``description``, ``birthday`` and, for updates, ``id``.
    A truthy ``id`` updates that row; otherwise a new row is inserted.

    Returns:
        ``api_response(data={})`` on success, or ``api_response(error=...)``
        with code 70005 (username), 70006 (firstname), 70007 (birthday) or
        70008 (id) when validation fails.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; anything that is not a JSON object is treated as "no fields".
    user_data = request.get_json(silent=True)
    if not isinstance(user_data, dict):
        user_data = {}

    username = user_data.get('username', '')
    firstname = user_data.get('firstname', '')
    lastname = user_data.get('lastname', '')
    address = user_data.get('address', '')
    description = user_data.get('description', '')
    birthday = user_data.get('birthday', '')

    # Type checks come first so non-string JSON values fail validation
    # instead of raising inside .islower() / len().
    if not isinstance(username, str) or not username.islower() or len(username) < 4:
        return api_response(error=(70005, "More than 4 characters lower case"))
    # The message declares the field required, so an empty value is rejected too.
    if not isinstance(firstname, str) or not firstname or len(firstname) > 20:
        return api_response(error=(70006, "Required, less than 20 characters"))
    if not is_valid_date(birthday):
        return api_response(error=(70007, "Date type: YYYY-MM-DD"))

    quoted = [_sql_quote(field) for field in
              (username, firstname, lastname, address, birthday, description)]

    student_id = user_data.get('id')
    if not student_id:
        create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        values = ",".join(quoted + [_sql_quote(create_time)])
        insert_sql = (
            "insert into test.student "
            "(username,firstname,lastname,address,birthday,description,create_time) "
            "values ({0})"
        ).format(values)
        run_sql(insert_sql)
    else:
        # The id is interpolated unquoted, so it must be a real integer.
        try:
            student_id = int(student_id)
        except (TypeError, ValueError):
            return api_response(error=(70008, "id must be an integer"))
        update_sql = (
            "update test.student set username={0},firstname={1},lastname={2},"
            "address={3},birthday={4},description={5} where id={6}"
        ).format(*(quoted + [student_id]))
        run_sql(update_sql)
    return api_response(data={})


def student_search():
    """Search students in Elasticsearch by a substring of ``username``.

    Reads the ``username`` query-string argument and runs a ``wildcard``
    query (``*<username>*``) against the ``index_student`` index.

    Returns:
        ``api_response(data=<raw Elasticsearch response>)`` on success,
        ``api_response(error=(80001, ...))`` when ``username`` is empty, or
        ``api_response(error=(80002, ...))`` when Elasticsearch cannot be
        reached, answers with an HTTP error, or returns a non-JSON body.
    """
    username = request.args.get('username', '')
    if not username:
        return api_response(error=(80001, "不能为空"))
    url = 'http://172.30.4.154:9200/index_student/_search'
    params = {
        "query": {
            "wildcard": {
                "username": {
                    "value": "*{0}*".format(username)
                }
            }
        }
    }
    try:
        # Without a timeout a stalled Elasticsearch node would block this
        # request handler indefinitely.
        es_response = requests.post(url, json=params, timeout=5)
        es_response.raise_for_status()
        result = es_response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers JSON decoding failures on older requests versions.
        return api_response(error=(80002, "search service unavailable"))
    return api_response(data=result)
    

# 返回数据的公共方法(api_response)
from flask import make_response
class ErrorCode:
    """Shared (code, message) pairs used when building API responses."""

    # Request could not be processed: parameters missing or invalid.
    API_PARAMS_ERROR = ("10001", "API参数缺失或错误")
    # Request handled successfully.
    API_REQUESTS_SUCCESS = ("10000", "成功")

def api_response(data=None, error=None):
    """Build the response envelope shared by all endpoints.

    Args:
        data: Payload for a successful call. When not None, the envelope
            carries it together with the success code and message.
        error: Indexable ``(code, message)`` pair. When not None it takes
            precedence over ``data`` and the payload becomes ``{}``.

    Returns:
        The result of ``make_response`` applied to a dict with the keys
        ``data``, ``code`` and ``message`` (an empty dict when both
        arguments are None).
    """
    envelope = {}
    if data is not None:
        envelope.update(
            data=data,
            code=ErrorCode.API_REQUESTS_SUCCESS[0],
            message=ErrorCode.API_REQUESTS_SUCCESS[1],
        )
    if error is not None:
        # An error overrides whatever the success branch filled in.
        envelope.update(data={}, code=error[0], message=error[1])
    return make_response(envelope)
    
# 项目地址:
https://github.com/chaochaoge123/interview_flask
原文地址:https://www.cnblogs.com/quqinchao/p/14440965.html