PyFlink Basics (2): Running PyFlink (2): Submitting PyFlink Jobs to a Cluster

Source: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/cli/#submitting-pyflink-jobs

Currently, users are able to submit a PyFlink job via the CLI. Unlike Java job submission, it does not require specifying the JAR file path or the entry main class.

When submitting a Python job via flink run, Flink will run the command “python”.
Please run the following command to confirm that the python executable in the current environment points to a supported Python version of 3.6+.
$ python --version
# the version printed here must be 3.6+
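
If the default python interpreter on the submission machine is too old, a common workaround is to activate a virtual environment with a suitable interpreter before calling flink run. The following is a minimal sketch, assuming python3 and venv are available; the environment path and the PyFlink version are illustrative and should match the Flink version of the cluster:
$ python3 -m venv /tmp/pyflink-venv             # create a virtual environment with a 3.6+ interpreter
$ source /tmp/pyflink-venv/bin/activate         # "python" now resolves to the venv interpreter
$ python --version
$ python -m pip install apache-flink==1.13.1    # illustrative: install a PyFlink version matching the cluster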

The following commands show different PyFlink job submission use-cases:

  • Run a PyFlink job:
$ ./bin/flink run --python examples/python/table/batch/word_count.py
  • Run a PyFlink job with additional source and resource files. Files specified in --pyFiles will be added to the PYTHONPATH and, therefore, available in the Python code.
$ ./bin/flink run \
      --python examples/python/table/batch/word_count.py \
      --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
  • Run a PyFlink job which references Java UDFs or external connectors. The JAR file specified in --jarfile will be uploaded to the cluster.
$ ./bin/flink run \
      --python examples/python/table/batch/word_count.py \
      --jarfile <jarFile>
  • Run a PyFlink job with pyFiles and the main entry module specified in --pyModule:
$ ./bin/flink run \
      --pyModule batch.word_count \
      --pyFiles examples/python/table/batch
  • Submit a PyFlink job on a specific JobManager running on host <jobmanagerHost> (adapt the command accordingly):
$ ./bin/flink run \
      --jobmanager <jobmanagerHost>:8081 \
      --python examples/python/table/batch/word_count.py
  • Run a PyFlink job using a YARN cluster in Per-Job Mode:
$ ./bin/flink run \
      --target yarn-per-job \
      --python examples/python/table/batch/word_count.py
  • Run a PyFlink application on a native Kubernetes cluster having the cluster ID <ClusterId>. It requires a docker image with PyFlink installed; please refer to Enabling PyFlink in docker:
$ ./bin/flink run-application \
      --target kubernetes-application \
      --parallelism 8 \
      -Dkubernetes.cluster-id=<ClusterId> \
      -Dtaskmanager.memory.process.size=4096m \
      -Dkubernetes.taskmanager.cpu=2 \
      -Dtaskmanager.numberOfTaskSlots=4 \
      -Dkubernetes.container.image=<PyFlinkImageName> \
      --pyModule word_count \
      --pyFiles /opt/flink/examples/python/table/batch/word_count.py
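
The options shown above can also be combined in a single submission. As an illustrative sketch (the JobManager address is a placeholder and deps.zip is a hypothetical dependency archive), the following submits a job in detached mode to a specific JobManager together with extra Python files:
$ ./bin/flink run \
      --detached \
      --jobmanager <jobmanagerHost>:8081 \
      --python examples/python/table/batch/word_count.py \
      --pyFiles file:///tmp/deps.zip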

To learn more available options, please refer to Kubernetes or YARN, which are described in more detail in the Resource Provider section.

Besides --pyFiles, --pyModule and --python mentioned above, there are also some other Python related options. Here's an overview of all the Python related options for the actions run and run-application supported by Flink's CLI tool:

  • -py,--python: Python script with the program entry point. The dependent resources can be configured with the --pyFiles option.
  • -pym,--pyModule: Python module with the program entry point. This option must be used in conjunction with --pyFiles.
  • -pyfs,--pyFiles: Attach custom files for the job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (',') could be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
  • -pyarch,--pyArchives: Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in Python UDFs (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in the Python UDF, e.g.: f = open('data/data.txt', 'r').
  • -pyexec,--pyExecutable: Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
  • -pyreq,--pyRequirements: Specify the requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. Optionally, a directory which contains the installation packages of these dependencies can be specified. Use '#' as the separator if the optional parameter exists (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
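
As a sketch of how these options fit together (the requirements file and the venv.zip archive below are illustrative, following the same layout as the py37.zip example above), a job can ship its own interpreter and third-party dependencies in one command:
$ ./bin/flink run \
      --python examples/python/table/batch/word_count.py \
      --pyRequirements file:///tmp/requirements.txt \
      --pyArchives file:///tmp/venv.zip \
      --pyExecutable venv.zip/venv/bin/python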

In addition to the command line options used when submitting the job, dependencies can also be specified via configuration or via the Python API inside the code. Please refer to the dependency management documentation for more details.
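
For example, the same dependencies can be passed as -D configuration options on the command line. This is a sketch assuming the documented Python configuration keys python.files and python.requirements; the file paths are placeholders:
$ ./bin/flink run \
      -Dpython.files=file:///tmp/my_udfs.py \
      -Dpython.requirements=file:///tmp/requirements.txt \
      --python examples/python/table/batch/word_count.py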

Original post: https://www.cnblogs.com/qiu-hua/p/14869358.html