python 监听文件夹下的文件,将文本内容写入kafka,支持断点续传 (docker 发布)

python 监听文件夹下的文件,将文本内容写入kafka,支持断点续传 (docker 发布)

1.代码

2.docker 部署

1.代码:

python 监听文件夹下的文件,将文本内容写入kafka,支持断点续传 (docker 发布)

代码:
#! /usr/bin/env python3
# coding = utf-8
import json
import os
import sys
import time
from concurrent.futures.thread import ThreadPoolExecutor

import Constant
import MongoData
import jsonpath
import LogConfig
import logging
from watchdog.observers import Observer
from watchdog.events import *

"""
 @Creation Date   : Aug 3, 2021 11:35:23 AM
 
 @Author          :  Sea  
"""


def get_file_name(file_dir):
    """Return the names of the non-directory entries directly inside *file_dir*.

    Only the top level of the directory is listed; subdirectories are not
    descended into. Returns an empty list when *file_dir* does not exist or
    cannot be listed, so callers can always iterate over the result.
    """
    for _root, _dirs, files in os.walk(file_dir):
        # The first tuple yielded by os.walk describes file_dir itself.
        return files
    # os.walk yields nothing for a missing/unreadable directory; falling
    # through to an implicit None would crash a caller's for-loop.
    return []


def hander_files(dirs, base_path=None):
    """Process the named files in the watched directory and mark them done.

    Files whose name already ends with ``.COMPLETED`` are skipped. Every other
    file is renamed to ``<name>.COMPLETED`` once handled, so a later run
    (e.g. after a restart) skips files that were already processed.

    :param dirs: iterable of file names (not full paths) to process; ``None``
        is treated as an empty list.
    :param base_path: directory that contains the files. Defaults to
        ``Constant.FILE_PATH`` so existing callers behave as before.
    """
    if base_path is None:
        base_path = Constant.FILE_PATH
    print("start to parse files")
    logging.info("start to parse files")
    for name in dirs or ():
        if name.endswith(".COMPLETED"):
            continue
        # os.path.join works whether or not base_path ends with a separator;
        # plain string concatenation produced "/dirfile" without one.
        src = os.path.join(base_path, name)
        # TODO: read `src` and do the real work here, before the rename below
        # marks the file as completed, e.g.:
        # with open(src, "r", encoding="utf-8") as f:
        #     f_str = f.read()
        os.rename(src, src + ".COMPLETED")




def hander_path():
    """Process every file currently present in ``Constant.FILE_PATH``.

    This is a one-shot scan of the directory's top level. Files already
    marked ``.COMPLETED`` are skipped by ``hander_files``.
    """
    hander_files(get_file_name(Constant.FILE_PATH))




class FileEventHandler(FileSystemEventHandler):
    """watchdog handler that hands newly created files to a small thread pool."""

    def __init__(self):
        FileSystemEventHandler.__init__(self)
        # Submitted work runs on these two worker threads rather than on the
        # thread that dispatches watchdog events.
        self.pools = ThreadPoolExecutor(2)

    def on_created(self, event):
        """Queue a newly created file for processing; directories are ignored."""
        if event.is_directory:
            return
        # os.path.basename uses the platform's separator; split("/") left the
        # whole path intact on Windows.
        filename = os.path.basename(str(event.src_path))
        print(filename)
        # Crude settle delay, presumably to let the writer finish the file.
        time.sleep(1)
        future = self.pools.submit(hander_files, [filename])
        future.add_done_callback(self._log_failure)

    @staticmethod
    def _log_failure(future):
        """Log an exception raised by a worker; the pool would otherwise drop it."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logging.error("failed to process file", exc_info=exc)


def parse_cli_args(argv):
    """Parse ``--KEY=VALUE`` command-line arguments into a dict.

    Leading dashes are stripped from the key. The value is everything after
    the FIRST ``=``, so values such as passwords may themselves contain ``=``.
    Arguments without ``=``, or with an empty value, are ignored.

    :param argv: argument list without the program name, e.g. ``sys.argv[1:]``.
    :return: ``{KEY: VALUE}`` for every usable argument.
    """
    options = {}
    for arg in argv:
        key, sep, value = arg.partition("=")
        if not sep or not value:
            continue
        options[key.lstrip("-")] = value
    return options


# Example: --USERNAME=mongodb --PASSWORD=<password> --MONGO_IP=192.168.18.129 --FILE_PATH=/home/sea/Desktop/flume/XXX/
if __name__ == '__main__':
    iargs = sys.argv
    # Mask the password so it does not leak into console/container logs.
    shown_args = []
    for arg in iargs:
        key, sep, _ = arg.partition("=")
        shown_args.append(key + "=******" if sep and key.lstrip("-") == "PASSWORD" else arg)
    print("input args is :" + str(shown_args))

    options = parse_cli_args(iargs[1:])
    for name in ("USERNAME", "PASSWORD", "MONGO_IP", "FILE_PATH"):
        if name in options:
            setattr(Constant, name, options[name])
    # Built once, after every override has been applied.
    Constant.MONGO_CLIENT = 'mongodb://' + Constant.USERNAME + ':' + Constant.PASSWORD + '@' + Constant.MONGO_IP + '/'

    # Catch up on files that arrived while the watcher was not running.
    hander_path()
    observer = Observer()
    event_handler = FileEventHandler()
    # Non-recursive: hander_files resolves bare file names against FILE_PATH,
    # so files created in subdirectories would be looked up at the wrong path.
    observer.schedule(event_handler, Constant.FILE_PATH, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

代码中的配置参数也可以直接从环境变量中读取,这样执行代码时就不需要显式地传参:

import os
def os_environ(evn_name, default=None):
    """Return the value of environment variable *evn_name*, or *default* if unset.

    A variable that is set to the empty string returns ``""``, not *default*.

    :param evn_name: name of the environment variable.
    :param default: value returned when the variable is not set.
    """
    # os.environ.get only falls back on a missing key; the previous broad
    # `except Exception` also hid unrelated errors (e.g. a non-str name).
    return os.environ.get(evn_name, default)

if __name__ == '__main__':
    # Demo: an unset variable comes back as None, so fall back to "default".
    java_home = os_environ("JAVA_HOME1")
    print(java_home)
    print(java_home or "default")
    # The second argument is returned when the variable is not set.
    print(os_environ("JAVA_HOME2", "xxx"))

docker 发布:

Dockerfile(如需更小的镜像,可将基础镜像改为 `FROM python:3.7.11-alpine3.13`,约 43M):

FROM python:3.7.11-slim
# MAINTAINER is deprecated; LABEL is its supported replacement.
LABEL maintainer="Sea <lshan523@163.com>"
VOLUME /tmp
RUN mkdir -p /opt/app/ && chmod -R 777 /opt/app/
RUN pip3 install --default-timeout=1000 --no-cache-dir --upgrade pip setuptools pymongo jsonpath watchdog
COPY docker-entrypoint.sh /opt/app/
# add python code
COPY lazada_move/ /opt/app/
WORKDIR /opt
# Runtime settings, empty in the image. No ARG/ENV of these names exists at
# build time, so `ENV X=$X` would also expand to "". Supply real values at run
# time with `docker run -e NAME=value`.
ENV USERNAME="" \
    PASSWORD="" \
    MONGO_IP="" \
    FILE_PATH=""
# Unbuffered stdout so print() output appears promptly in `docker logs`.
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["sh", "/opt/app/docker-entrypoint.sh"]
docker-entrypoint.sh:
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Start the folder watcher with settings taken from the container environment
# (docker run -e USERNAME=... -e PASSWORD=... -e MONGO_IP=... -e FILE_PATH=...).
# - `exec` replaces this shell, so python becomes PID 1 and receives the
#   signals sent by `docker stop`.
# - Quoting keeps values containing spaces or shell metacharacters intact.
exec python3 /opt/app/PackageData.py --USERNAME="$USERNAME" --PASSWORD="$PASSWORD" --MONGO_IP="$MONGO_IP" --FILE_PATH="$FILE_PATH"

条件判断参考(根据环境变量选择不同的启动命令):

# `[[ ]]` is a bash extension; `case` works in any POSIX sh. The Dockerfile
# above starts its entrypoint with `sh`.
case "$FE_ROLE" in
    fe-leader)
        /home/doris/fe/bin/start_fe.sh
        ;;
    be)
        /home/doris/be/bin/start_be.sh
        ;;
    fe-follower)
        /home/doris/fe/bin/start_fe.sh --helper "$FE_LEADER"
        ;;
    *)
        /home/doris/fs_broker/bin/start_broker.sh
        ;;
esac

build : sudo docker build -t  xxx  .

run :

   

 其中 /home/sea/Desktop/flume/XXX/ 为需要监听的文件目录(宿主机与容器内使用相同路径):
# Each line must end with "\" so the shell treats this as one command.
sudo docker run -it --privileged \
  -e 'USERNAME=mongodb' -e 'PASSWORD=sea' -e 'MONGO_IP=192.168.18.129' \
  -e 'FILE_PATH=/home/sea/Desktop/flume/XXX/' \
  -v /home/sea/Desktop/flume/XXX/:/home/sea/Desktop/flume/XXX/ \
  -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro \
  xxx
原文地址:https://www.cnblogs.com/lshan/p/15093634.html