Connecting Filebeat to Kafka

I ran into a Filebeat-to-Kafka integration at work, so I'm writing it down here to share it, and so I don't forget it myself.

Filebeat is a log-collection agent that runs on the client side. Think of it as "in one ear, out the other": the "in" is the log file it watches, and the "out" is wherever it ships the log lines. Here we ship them to Kafka, which is a message queue. Why Kafka? Because it is widely used, and it is exactly what I had to integrate with at work. For more background you can look it up yourself; enough talk, let's get started.

Step 1: install helm3

How to install and use helm3 is covered elsewhere on this blog. Where to download it? https://github.com/helm/helm/releases/tag/v3.5.2, the latest release at the time of writing.
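A minimal install sketch, assuming a Linux amd64 host and the standard release tarball (the URL below follows Helm's usual get.helm.sh naming, so double-check it against the release page):

wget https://get.helm.sh/helm-v3.5.2-linux-amd64.tar.gz    # download the v3.5.2 release tarball
tar -zxf helm-v3.5.2-linux-amd64.tar.gz                    # unpack it
mv linux-amd64/helm /usr/local/bin/helm                    # put the binary on the PATH
helm version                                               # confirm it runs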

Step 2: add a helm chart repository. Normally you would add the official repository, but it is not reachable from here, so use Alibaba's mirror as a stand-in. It works, personally tested.
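A sketch of adding the mirror; the apphub repository URL below is an assumption, substitute whichever mirror you actually use:

helm repo add apphub https://apphub.aliyuncs.com    # add the Alibaba apphub mirror
helm repo update                                    # refresh the local chart index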

Step 3: pull the chart packages; extracting them gives you the kafka and filebeat directories.

helm pull apphub/kafka
helm pull apphub/filebeat
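The pulled .tgz archives just need to be extracted (helm pull also has an --untar flag that does this in one go):

tar -xzf kafka-*.tgz       # unpacks into ./kafka
tar -xzf filebeat-*.tgz    # unpacks into ./filebeat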

 

Step 4: install the kafka chart. The highlighted lines (red in the original post) are the useful ones; they are explained by the inline comments below.

[root@VM-0-15-centos ~]# helm install kafka2 ./kafka
NAME: kafka2
LAST DEPLOYED: Fri Feb  5 22:57:45 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
** Please be patient while the chart is being deployed **

Kafka can be accessed via port 9092 on the following DNS name from within your cluster:

    kafka2.default.svc.cluster.local # inside k8s this DNS name resolves to the kafka service IP

To create a topic run the following command: # create a topic

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --create --zookeeper kafka2-zookeeper:2181 --replication-factor 1 --partitions 1 --topic test

To list all the topics run the following command: # list all topics

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --list --zookeeper kafka2-zookeeper:2181

To start a kafka producer run the following command: # a producer shell where you can publish messages to the topic

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-producer.sh --broker-list localhost:9092 --topic test

To start a kafka consumer run the following command: # the consumer window, where you can see the messages the producer sends

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    # the two commands below are run inside the container: one is the producer, the other the consumer
    PRODUCER:
        kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    CONSUMER:
        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
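The filebeat configuration further down ships to a topic named test1111, so create it up front with the same commands from the NOTES, only swapping the topic name:

export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --create --zookeeper kafka2-zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1111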

Next, let's install filebeat.

There are two ways to deploy filebeat:

  1. As a sidecar collecting container logs, meaning every pod has to run its own filebeat container. With 1000 pods, one application plus one filebeat each, that is 2000 containers. Hard pass.

  2. As a DaemonSet running per node, so you only need as many filebeat instances as you have nodes. We go with this one.

Step 1: edit the values.yaml file as shown below. Pay attention to the highlighted parts (red in the original): filebeat.inputs is the file being watched, and output.kafka is where the output is shipped. We configure a DNS name here, which CoreDNS resolves to an IP address; the naming convention is <service name>.<namespace>.svc.cluster.local. topic is the name of the topic we created.

[root@VM-0-15-centos filebeat]# cat values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: docker.elastic.co/beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  filebeat.config:
    modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
  filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /var/a.log
  output.kafka:
     enabled: true
     hosts: ["kafka.default.svc.cluster.local:9092"]
     topic: test1111
  http.enabled: true
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes: []
extraVolumeMounts: []
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479
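Before rolling the DaemonSet out, it is worth checking that the host in output.kafka actually resolves; it has to match the service DNS name printed in the kafka chart's NOTES. A quick lookup from a throwaway pod (the busybox image tag is just an example):

kubectl run dns-test --rm -it --restart=Never --image=busybox:1.28 -- nslookup kafka.default.svc.cluster.local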

Step 2: install filebeat

[root@VM-0-15-centos ~]# helm install filebeat2 ./filebeat
NAME: filebeat2
LAST DEPLOYED: Fri Feb  5 23:09:51 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
To verify that Filebeat has started, run:

  kubectl --namespace=default get pods -l "app=filebeat,release=filebeat2"

Step 3: check all the pods; the zookeeper pods come along with the kafka cluster by default.
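For example (pod names and counts will differ on your cluster):

kubectl get pods -o wide    # the filebeat daemonset pods, the kafka broker and the zookeeper pods should all be Running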

Step 4: test it

Exec into the filebeat container and write something into /var/a.log, the file filebeat is watching.
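A sketch, using the label selector from the filebeat NOTES above to find the pod:

export POD_NAME=$(kubectl get pods --namespace default -l "app=filebeat,release=filebeat2" -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default exec -it $POD_NAME -- sh -c 'echo "hello from filebeat" >> /var/a.log'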

Step 5: check the listening side
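Run a console consumer against the topic from values.yaml (the same commands as in the kafka NOTES, only the topic name changes) and the line you just wrote should show up:

export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1111 --from-beginning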

Addendum

If you want to monitor the logs of other application containers, have those applications write their logs to a directory on the host, mount that host directory into the filebeat container, and configure filebeat.yml to watch the mounted paths. That is how filebeat ends up monitoring the logs the application containers produce. Again, the main work is in values.yaml, which looks like this:

[root@iZ8vb1m9mvb3ev1tqgrldwZ shell]# cat filebeat/values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: docker.elastic.co/beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  filebeat.config:
    modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
  processors:
    - add_cloud_metadata:
  filebeat.inputs:
    - type: log
      enabled: true
      paths:
      - /host/var/melocal/logs/*.log
      - /host/var/geo/log/*.log
      - /host/var/rgeo/log/*.log
  output.kafka:
    enabled: true
    hosts: ["kafka.default.svc.cluster.local:9092"]
    topic: test_topic
  http.enabled: true
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes:
   - hostPath:
       path: /root/jiaohang/amap-melocal/logs
     name: melocal-log
   - hostPath:
       path: /root/jiaohang/amap-geo/data/geocoding/log
     name: geo-log
   - hostPath:
       path: /root/jiaohang/amap-rgeo/data/reverse_geocoding/log
     name: rgeo-log
extraVolumeMounts:
   - name: melocal-log
     mountPath: /host/var/melocal/logs
     readOnly: true
   - name: geo-log
     mountPath: /host/var/geo/log
     readOnly: true
   - name: rgeo-log
     mountPath: /host/var/rgeo/log
     readOnly: true
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479

Here filebeat watches the logs under three directories; once the configuration is applied, the log lines show up on the kafka side.
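To exercise it, append a line under one of the host directories listed in extraVolumes and watch the test_topic topic on the kafka side (app.log is just an example file name; the kafka pod lookup is the same as earlier):

echo "test line $(date)" >> /root/jiaohang/amap-melocal/logs/app.log
kubectl --namespace default exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning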

Original post: https://www.cnblogs.com/fengzi7314/p/14380203.html