【spark】动态加载python包

背景

在实际的项目中,遇到了一个场景:  spark启动后,在执行部分java/scala算法后需要调用python实现的算法。python和java算法预先不能部署到服务器,因为客户可能随时扩展算子

方案:

动态调用java算法方案不赘述。大致思路就是部署一个web容器,该容器内实现一个主驱动类entry.jar,java算子通过web容器启动spark-submit,通过 --jars 将java程序动态上传。

问题1: java运行过程中切换到python程序

  方案:借助py4j,通过子进程,在driver侧启动pyspark.

  java端核心代码

val server = new py4j.GatewayServer.GatewayServerBuilder()
      .authToken(secret)
      .javaPort(0)
      .entryPoint(pythonJobEndpoint)
      .build()
server.start()
val builder = new ProcessBuilder(Seq(pythonExec, "-m","python_entry").asJava)
    val env = builder.environment()
env.put("PY4J_PORT", server.getListeningPort.toString)
sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        process.waitFor(5, TimeUnit.SECONDS)
        server.shutdown()
        if (process.isAlive) {
          process.destroy()
        }
      }
    })

 python端核心代码:

port = int(PYSPARK_ENV['PY4J_PORT'])
        params = GatewayParameters(port=port, auto_convert=True, auth_token=PYSPARK_ENV['PY4J_SECRET'])
        gateway = JavaGateway(gateway_parameters=params)
        ep = gateway.entry_point

  核心是借助于JavaGateway

问题2: 动态加载python

解决这个问题需要解决两个问题:1.python环境变量问题,2:依赖包上传问题

解决思路: 借助 --py-files 将python文件下发到所有节点。该参数不仅能够下发文件,还能将python追加到worker的python环境变量中,即PYTHONPATH中

  • 步骤1 借助 --files 上传文件到driver上

  --py-files 该参数只存在于pySpark中,java侧的sparkSubmit类中根本没有这个启动参数。所以由于驱动类是java的entry,需要使用 --files 将python文件上传。这里仍然存在问题

  在yarn环境下,--files 参数会在client将所需的文件资源上传给ResourceManage,ResourceManage通过ApplicationMaster与NodeManager协商,协商完成NodeManager启动Worker,Worker会从ResourceManager中拉取所需的文件等资源,上述方式完全没有问题。但是standalone中,sparkSubmit提交命令后,资源上传给Master,Master将启动命令分发给worker。这导致了如果使用 --files命令指向的是client的路径,worker执行时会各种的文件Not found异常。因此 --files 需要指向一个网络地址。实际追踪后发现 --files实际支持hadoop支持的文件系统,包活 hdfs和http

  • 步骤2 driver侧python驱动环境设置

  --files 将文件上传集群。 yarn 模式下,资源会被上传到一个临时目录,但是该临时目录与entry.jar 的classpath处于同一级目录,PYTHONPATH只需要追加相对目录。standalone模式下,python文件会被下载到一个随机目录,根本不可控。这里需要借助SparkFIle获取绝对目录

SparkFiles.get("myPython.py")
  • 步骤3 python下方到worker,且追加到环境变量

  源码追踪,java侧的spark-submit没有 --py-files和 spark.submit.pyFiles,而pySpark侧有,因此需要通过一定的方式将该参数在python端启动时注入进去。

 

原文地址:https://www.cnblogs.com/zhouwenyang/p/14715220.html