PrestoSQL(trinodb)源码分析 优化和调度

通过TpchQueryRunner可以跑起来一个测试服务

仍然使用‘SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10’

Mac M1, Java CLI有bug,可以用python替代

conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='test',
    catalog='tpch',
    schema='tiny',
    request_timeout=30000
)
cur = conn.cursor()
cur.execute('SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10')
rows = cur.fetchmany()

SqlQueryExecution

前面的流程忽略,直接到SqlQueryExecution

start的核心逻辑,

planQuery -> doPlanQuery

plan,优化并生成plan

Presto的优化器不太具有参考价值,简单看下数据的变化,

analysis是AST,语法树,

 转化成初始的逻辑计划,已经从语法树变成布尔的逻辑算子,

PlanNode root = planStatement(analysis, analysis.getStatement())

 

优化完的结果,最大的不同是加上ExchangeNode

createSubPlans

首先是,用Fragmenter,Visit整个plan,根据ExchangeNode生成fragment,

 

产生的效果如下,SimplePlanRewriter.rewriteWith

RemoteSourceNode,被替换成,Fragment“1”,

在FragmentProperties可以找到所有的Fragments,这里生成出两个fragments,

 

buildRootFragment

将上面的fragments,封装成SubPlan,

这里会将root封装成fragment“0”,代表OutputNode

planDistribution

将Fragment封装成StageExecutionPlan,

doPlan

封装StageExecutionPlan,这里除了Fragment,

还多出3个东西,SplitSource,dependencies,tables;

其中SplitSource和tables,只有包含tableScan的Stage会有,这里就是Fragment2 

 

dependences包含当前stage所依赖的stages,比如对于Fragment1,

splitSources

获取存储输入的splits信息,依赖于存储的实现,这里是tpch

获取逻辑参考调用栈,

对于3个Fragment递归调用doPlan,在visit中,只有TableScan算子会触发getSplits,其他的算子都是传递visit

Tpch的getSplits,只是根据节点数,每个节点splits数目,创建一堆TpchSplit

这里TpchSplit的组成很简单,仅仅是partNum,node地址

最终doPlan得到的结果,

scheduler.start()

产生Scheduler,

SqlQueryExecution.start -> SqlQueryExecution.schedule

可以看到这里schedule是异步调用的,

对于每个stage,调动schedule

Schedule的过程,首先会选取一个Scheduler,

可以看到stage0和1,没有source,所以选的是FixedCountScheduler,

对于不同Scheduler的区别,详细参考,https://github.com/prestodb/presto/wiki/Stage-and-Source-Scheduler-and-Grouped-Execution

这个调度逻辑就是,对于每个node调度一个task

对于stage2,选择FixedSourcePartitionedScheduler

逻辑是先类似FixedCountScheduler去创建task,然后再调用SourcePartitionedScheduler的逻辑(SourcePartitionedScheduler会为一批splits动态调度一个新的task,而FixedSourcePartitionedScheduler是使用先前调度好的task)

SourcePartitionedScheduler

调用栈,

SourcePartitionedScheduler,把splits分配到各个node上,

SqlStageExecution,把对应的splits,加入到task中,这里如果没有事先生成的Task,会动态的生成一个新的task

最终Scheduler生成的调度结果是ScheduleResult

对于Fragment2对应的stage,生成了3个Task,平均分配了包含的24个splits

  

scheduleTask

在scheduleTask中,创建RemoteTask,并且start

继续调用到,HttpRemoteTask的scheduleUpdate,

用线程池去调用,executor.execute(this::sendUpdate)

最终,通过HttpClient,将json化的task request发出到worker。

 

 

原文地址:https://www.cnblogs.com/fxjwind/p/15598461.html