时间:2024-05-04
李 旺 双 锴
(北京邮电大学网络技术研究院 北京 100876)
社交媒体、移动设备及传感器以前所未有的速度持续产生着海量数据,这些数据经过简单的预处理之后被存储到分布式数据仓库中,用于后期的计算、分析与挖掘。由于数据量巨大,需要采用分布式计算架构对计算进行拆分后分发到成百甚至上千台机器上并行执行。Flink的出现正好解决了大规模数据计算问题,相比于MapReduce[1]框架,Flink[2]具有流批一体的数据处理语义[3]、基于线程的计算模型和中间结果无须写入磁盘等优点。Flink针对Table API[4]提供了执行计划优化模块,该模块对作业执行计划优化后生成相应的物理执行计划,然后提交到集群运行。该模块提供了灵活的拓展接口,可以对Flink作业进行自定义优化。
在Flink分布式计算架构下,执行多表连接(multi-table join)操作时应考虑以下两个方面。
① 由于Flink提供了基于线程的轻量级计算模型,在集群中可以提供更高的计算并行度,而用户编写的程序在执行多表连接时并不会考虑到表的大小及关联性等特性。因此,本文需要用算法来优化多表连接的并行度,从而提升作业的整体性能。
② 在连接过程中,需要进行大量的数据shuffle操作以完成连接计算,导致过高的网络IO代价。因此,本文需要设计一个算法在并行执行多表连接时尽量减少需要进行shuffle操作的数据量。
现有的多表连接的优化研究主要围绕MapReduce计算框架展开,优化措施主要包括执行计划和执行框架两方面的优化。由于Flink的性能优化和编程模型的差异,已有算法不能充分利用Flink集群的性能优势。本文提出了一个适用于Flink的Multi Bushy Tree算法,用于提高多表连接的并行度。在多表连接过程中,已有算法主要致力于寻找最优的连接顺序以减小中间结果数据的大小,而忽略了利用集群的分布式特点尽可能将多表连接计算并行化。本文充分考虑了表之间关联键的依赖关系,寻找局部星型连接,将不存在依赖关系的连接计算和星型连接中事实表和维表的连接计算并行化,从而缩短多表连接作业的执行时间。
为了进一步提高多表连接的速度,本文在Multi Bushy Tree算法的基础上提出了Semi Join算法。该算法在执行星型连接中事实表和维表的连接计算时,只对事实表中的关联键字段和维表数据执行shuffle操作,连接得到的各中间结果表以事实表中的主键字段作为关联条件,执行连接计算。由于Flink提供的执行计划优化功能[5],多个中间结果表的连接操作可以chain在一个节点执行。整个连接过程需要shuffle操作的数据量大大减少,可以有效减小网络IO代价,提高星型连接的速度。
对于执行计划的优化,文献[6]的处理就是将多表连接看作是链式连接问题,通过将n张表的连接操作拆分成为n-1个2元连接,每个2元连接使用一个MapReduce作业完成,各MapReduce作业之间是串行执行的。当n较大时会导致较高的作业时间复杂度和中间结果的存储代价,不能充分利用分布式作业的优势。文献[7]利用改进的蚁群算法寻找连接树的较优解,在一定程度上避免了局部最优解,同时缩短了搜索时间。文献[8]提出的多表连接算法可以在一个MapReduce作业中完成所有的连接运算,但是随着参与连接的表增加,这种算法通过网络IO进行shuffle操作的数据量将急剧增加,从而导致较高的网络IO代价。文献[9]提出的SmartJoin算法虽然可以使用更少的MapReduce作业数来实现链式多表连接不能完成的大量表之间的连接操作,但是该算法对表的限制太多,它要求参与连接的表必须包括两个大表,其余小表必须存储在执行Reduce任务的节点上,参与连接的两个大表还必须存在关联键。文献[10]将多表连接规模划分为小中大三种类型,对于不同类型采用不同的连接顺序优化方法,并引入线性动态规划优化算法的时间复杂度。上述所有算法都是通过优化多表连接计算的连接顺序来提高作业的运行效率,没有考虑将无依赖关系的连接计算并行化,无法充分利用Flink集群的节点资源。
对于执行框架的优化,文献[11]基于MapReduce框架提出在Map-Reduce两阶段之后新增一个Merge阶段,在新增的Merge阶段执行表连接计算,从而节省了一次MapReduce作业。文献[12]通过修改MapReduce框架,支持数据在各节点之间进行管道式传输,支持在线聚集和持续查询,但是框架修改之后,增加了失败恢复(failover)实现的复杂度,同时对表连接的性能增益有限。文献[13]将传统的Sort-Merge Join算法应用到大规模分布式系统中,突破了单机的内存限制,同时避免了在reduce节点执行两表连接操作时进行笛卡尔积的计算。文献[14]通过计算得到的关联键的布尔分布矩阵,在Map节点利用矩阵对表数据进行过滤,可以有效减小网络IO代价,但是该算法对外连接增益有限而且增加了布尔分布矩阵的计算时间。文献[15]利用广播变量,在执行两表连接时,将小表存放到广播变量中,读取大表数据的节点会从广播变量中读取小表中的全部数据,进而执行连接计算,但该方式要求小表中的数据量足够少,否则对小表采用复制广播的方式依然会对网络IO产生极大的压力。Flink基于DataFlow的编程模型[16]与MapReduce实现存在较大差异,无法将已有执行框架优化算法直接应用到Flink上。
论文主要的设计与优化目标是在使用Flink计算引擎执行多表连接作业时,提高作业的运行效率。鉴于此,本文从两个方面进行考虑。
1) 在分布式集群中,计算的并行度对作业的运行效率有很大影响,为提高多表连接计算的并行度,所提出Multi Bushy Tree算法,可以有效地将不存在依赖关系的连接计算并行执行。
2) 对于星型连接,本文提出Semi Join算法,通过拆分事实表的关联键的方式减少在连接计算时需要shuffle的数据量,以此来减小星型连接的网络IO代价。
如图1所示,多表连接可以用一个查询图G=(V,E)来表示,其中:V是图中所有节点的集合,每个节点代表一个参与连接的数据表;E是图中所有边的集合,如果两个节点之间存在连接边表示这两张数据表存在关联关系。图1(a)的查询图包含6个关联关系,为6元连接。同理,图1(b)的查询图为8元连接。
(a) Left-deep Tree(b) Right-deep Tree
(a) 6-table join (b) 8-table join图1 多表连接查询示例
当表的数量较少时,可以通过穷举的方式得到一个最优的连接顺序。但是当参与连接的表的数量超过一定大小时,该问题则不能得到有效的解决。有研究表明多表连接执行计划的最优解确定是一个NP-hard问题[18],传统的做法通常采用连接树模型限定解的一个子空间,并设计算法从生成的连接树中找到一个较优解从而确定连接的顺序。连接树可以描述不同的连接方案但最终都会得到相同的连接结果。如图2所示,主流的连接树模型有Left-deep Tree、Right-deep Tree、Zigzag Tree和Bushy Tree四种。
(c) Zigzag Tree (d) Zigzag Tree图2 四种不同的连接树
本文首先对连接树中涉及到的概念进行说明:用户输入的连接条件涉及到的数据表称为基本表,两个表进行连接计算后得到的中间结果称为中间表。连接树是一棵二叉树,叶子节点永远是基本表,内部节点永远是中间表,边表示数据的流向。对于Left-deep Tree、 Right-deep Tree和Zigzag Tree,其内部节点至少有一个叶子节点作为子节点,表示每个中间表都至少由一个基本表进行连接计算得到。对于Bushy Tree,有些内部节点的子节点是不包含叶子节点的。实际上其他三种连接树都是Bushy Tree的一种特殊形式。
Flink提供的基于线程的轻量级计算模型,使得分布式计算作业能以更高的并行度运行。在多表连接方向,目前已有的算法主要是通过减小中间表大小的方式提高作业的运行效率,且都是针对MapReduce作业,无法充分发挥Flink的并行计算优势,因此并不完全适用于Flink计算作业。本文提出的算法充分考虑了Flink计算引擎的优点,尽量提高多表连接计算的运行效率。基于构建得到的较优Bushy Tree,分析表之间的依赖关系,尽可能提高多表连接计算的并行度。多元连接树(Multi Bushy Tree)算法详细步骤见算法1。
算法1Multi Bushy Tree算法
输入:JP“连接表的集合”。
输出:PT“多元连接树”。
1.BEGIN
2.bushyTree←BuildBushyTree(JP);
3.//将JP根据表之间的关联关系构建二元Bushy Tree
4.FOR EACHnodeINbushyTreeDO
5.IFshouldMerge(node) THEN
6.node←mergeNode(node);
7.END FOR
8.//如果节点的所有子节点(C1,C2,…,Cn)都是基本表且Cn
//基本表的关联键来自于C1,合并所有子节点并替换该节点
9.FOR EACHnodeINbushyTreeDO
10.IFshouldExecute(node) THEN
11.IFisStarJoin(node) THEN
12.parallelExecuteStarJoin(node);
13.ELSE
14.ParallelExecuteNormalJoin(node);
15.ENDFOR
16.RETURNPT;
17.END
图3 二元浓密连接树示意图
② 寻找局部星型连接。对于步骤①构建得到的二元浓密连接树,遍历树中的每一个节点,如果该节点的右孩子节点表示的基本表中的关联键均来自于最左孩子节点表示的基本表或者中间表,则用所有的孩子节点替换该节点,将二叉树变为多叉树。例如,对于步骤①中的查询示例,在分支一中,连接表C时,连接条件为“A.key2=C.key”,表C中的关联键key同样来自于表A中的key2列,满足合并的要求,因此可以使用A、B两个节点代替节点M1,更新后的连接树见图4(a)。针对分支二可以做相同处理,得到的连接树如图4(b)所示。而表A连接表D时,由于表D和表A的连接是笛卡尔积操作,因此不满足合并条件。对整个二元连接树处理之后,得到的最终的多元浓密连接树(Multi Bushy Tree),如图4(b)所示。对于优化得到的多元浓密连接树,如果某个节点包含n(n>2)个子节点,表明该中间结果表是表(T1,T2,…,Tn)通过星型连接运算得到的,其中T1在星型连接中作为事实表,T2-Tn在星型连接中作为维表。如果某个节点只有两个子节点,表明该中间结果表是通过链式连接或者笛卡尔积运算得到。
(a) (b)图4 多元浓密连接树
Multi Bushy Tree算法充分考虑表之间的关联性,将不存在依赖关系的连接计算并行化,提升多表连接作业的整体性能。此外,Multi Bushy Tree算法在构建的较优连接树基础上,充分挖掘局部星型连接,并将星型连接中事实表和各个维表之间的连接计算并行化,极大地提高了多表连接计算的并行度。然而在星型连接中,仍然需要将事实表中的数据通过网络IO进行多次shuffle以完成和各维表的连接计算,这对星型连接的计算性能仍会有较大影响。因此,针对星型连接提出了一个全新的连接算法减小网络IO代价。
这一部分中,针对星型连接,本文提出基于Flink计算引擎可以有效减少需要进行shuffle的数据量的算法。由于在星型连接中,事实表通常作为表连接计算中的左表,此时如果事实表和多张数据表进行连接计算时的关联键相同,Flink提供的执行计划优化器就会把这些连接计算合并在一个节点运行。利用这一特性,本文提出了关联键拆分连接(Semi Join)算法,利用该算法对星型连接优化后,只需要对事实表、维表和中间结果表各执行一次shuffle操作,可以进一步提高星型连接的执行效率,减小网络IO代价。详细算法步骤见算法2。
算法2Semi Join算法
输入:JP“星型连接树”。
输出:PT“星型连接结果”。
1.BEGIN
2.FORi=0 TOnumOf(dimTables) DO
3.joinCols←getJoinCols(dimTable);
4.localFactTables[i]←select(factTable,joinCols);
5.ENDFOR
6.//根据各维表的关联键从事实表中选择外键字段和主键
//字段生成局部事实表集合
7.FORi=0 TOnumOf(factTempTables) DO
8.dimResTables[i]←join(localFactTables[i],dimTables[i]);
9.ENDFOR
10.//对局部事实表和维表执行连接计算,得到各维表的连接
//结果表集合
11.FORi=0 TOnumOf(dimResTables) DO
12.factTable←join(factTable,dimResTables[i]);
13.ENDFOR
14.//各维表的连接结果表和事实表以事实表的主键字段作为
//连接条件进行连接计算,得到最终结果表
15.RETURNfactTable;
16.END
① 生成局部事实表。通过解析Flink执行计划,得到事实表中和维表关联的外键字段。使用Flink提供的“select”函数选择解析出的外键字段和事实表的主键字段作为局部事实表。例如,查询命令“SELECT*FROM A,B WHERE A.fKey=B.key”,事实表A使用fKey字段和维表B进行关联,因此生成的局部事实表T中包含两个字段:fKey和key(事实表的主键)。由于Flink提供的执行计划优化功能,select计算和前面的计算函数会合并在一个节点中执行,因此不会产生数据shuffle。
② 生成各维表的连接结果表。在步骤①中为每个维表生成了连接需要的局部事实表,由于各局部事实表只包含了事实表中的用于连接指定维表的外键字段和事实表的主键字段,因此每个局部事实表的数据量都比较小。将生成的局部事实表和维表按照关联字段进行hash shuffle,关联字段相等的数据会通过网络IO发送到相同节点执行连接计算。由于局部事实表和维表数据量都比较小,采用hash shuffle的方式可以避免将局部事实表进行复制广播的开销,有效减少了需要进行shuffle的数据量。以步骤①的查询为例,局部事实表T(key,fKey)和维表D(key,value1,value2),执行连接计算时,表T计算fKey字段的hash值,对并行度取模之后发送到指定节点,表D对key字段hash取模后发送到指定节点,表T中fKey和表D中key字段相同的数据会发送到同一个节点,这种连接方式保证表中的一条数据只会进行一次网络IO。
③ 生成最终查询结果表。通过步骤(2)得到的各维表的连接结果表中除了包含各维表中的查询字段还包含了事实表中的主键字段,事实表和各连接结果表通过事实表中的主键执行链式连接计算从而得到最终的查询结果。由于事实表和各连接结果表的链式连接计算中所有连接的关联键都相同,Flink提供的执行计划优化功能会将这些连接计算chain在一个节点中执行,所以在本次链式连接中事实表和各连接结果表只需要执行一次数据shuffle操作。
图5给出了基于Flink的星型连接优化算法Semi Join的执行流程。
图5 星型连接示意图
利用Semi Join算法优化之后的星型连接计算,步骤①、步骤②和步骤③是串行的,而步骤②中局部事实表和各维表之间的连接可以并行计算,因此星型连接的时间代价计算式表示为:
CostlocalFact=Max(C1,C2,…,Cn)
CostdimJoin=Max(R1,R2,…,Rn)
(1)
Cost=CostlocalFact+CostdimJoin+CostfinalJoin
式中:C1,C2,…,Cn表示从事实表中查询得到与各维表进行连接的局部事实表的时间代价,由于SELECT计算是并行执行的,所以该阶段时间代价取决于最慢的SELECT计算。R1,R2,…,Rn表示各局部事实表和各维表连接计算的时间代价,各连接计算在集群中并行执行,该阶段的时间代价同样取决于最慢的连接计算的时间代价。最后整个星型连接计算的时间代价为三个串行计算的时间代价之和。
从图5可以看出,基于Flink的星型连接优化算法Semi Join主要包含两种类型的连接操作:局部事实表-维表连接和事实表-结果表连接。出于对计算性能和存储性能考虑,算法中的连接计算采用Hash Join的方式,每次连接都会进行数据shuffle操作,因此网络IO代价计算式表示为:
(2)
Cost=CostdimJoin+CostfinalJoin
式中:L[i]表示第i个局部事实表的大小;D[i]表示第i个维表的大小;R[i]表示第i个连接结果表的大小;F表示事实表的大小。生成连接结果表时,需要对参与连接的局部事实表和维表进行shuffle操作。生成最终结果表时,需要对事实表和各连接结果表进行shuffle操作。整个星型连接需要shuffle操作的总数据量为两个阶段shuffle操作数据量之和。
本文将Multi Bushy Tree+Semi Join与其他两种多表连接算法进行比较。(1) Left-deep Tree连接。Flink提供了基于Left-deep Tree的多表连接方式,连接的顺序取决于用户输入。(2) Bushy Tree连接。通过Bushy Tree的方式构建连接树,Flink可以将不存在依赖关系的两表连接计算并行化。本文实现了这3种连接算法,并从计算并行度、作业运行时间和网络IO三个方面进行评估。
本文搭建了具有20个节点的集群,其中一个节点被用作Master和ResourceManager,负责任务调度和资源管理,另外的19个节点被用作Worker和TaskManager,所有的计算任务都在这些节点上运行。每台机器的硬件配置为4核2.4 GHz的CPU、8 GB内存、40 GB机械硬盘。集群中的每个节点均安装了CentOS 7 64位操作系统,使用的Flink版本为原生的Flink 1.9.0,底层使用YARN 2.9.2作为分布式调度系统。
实验采用的数据集为TPC-H提供的Dbgen工具所生成的模拟数据集。为了评价三种算法在不同的多表连接方式下的性能差异,实验生成了A、B、C和D四种数据集,数据集格式和大小如表1所示,四种数据集基本可以覆盖多表连接的各种情况。
表1 数据集格式
在分布式集群中,提高多表连接计算的并行度可以有效地缩短计算作业的运行时间。由于多表连接计算根据依赖关系可以划分为不同的计算阶段,每个计算阶段的并行度并不相等,因此采用各阶段的平均并行度作为衡量标准,设置每个两表连接计算的并行度为4。表2展示了不同连接算法在执行多表连接计算时的并行度。
表2 多表连接并行度
对于数据集A,由于表之间不存在局部星型连接,Multi Bushy Tree算法无法在Bushy Tree的基础上做进一步优化。在数据集B中,Multi Bushy Tree算法可以基于Bushy Tree算法进一步并行执行局部星型连接中事实表和维表的连接计算,所以并行度进一步提高。在数据集C中,虽然存在两个局部星型连接,但是两者存在关联依赖关系,无法进一步并行化两个星型连接计算,结果表明Multi Bushy Tree算法的平均并行度仍然高于Bushy Tree算法优化后的并行度。在数据集D中,由于Multi Bushy Tree算法可以并行执行两个局部星型连接和星型连接内部事实表和维表的连接计算,平均并行度进一步提高。由于Left-deep Tree算法只能串行执行多个两表连接计算,因此在四个数据集中,Multi Bushy Tree和Bushy Tree算法的并行度均高于Left-deep Tree算法。
实验表明,Multi Bushy Tree算法在多表连接存在局部星型连接的情况下,可以有效提高连接计算的并行度,并且随着表数量的增加,并行度增加更明显。对于不存在局部星型连接的情况,利用二元浓密连接树仍可并行执行不存在依赖关系的两表连接计算。
图6展示了对于不同数据集,每种多表连接算法整体作业的执行时间。由于在Left-deep Tree算法中,多表连接计算只能拆分为n-1个串行执行的两表连接计算,且有可能较早产生笛卡尔积计算,整体作业的执行时间比其他两种算法高很多。在剩下的两种多表连接算法中,Multi Bushy Tree算法比Bushy Tree算法的作业运行时间低很多。主要是因为虽然Multi Bushy Tree+Semi Join花费了不少时间用于寻找局部星型连接和生成局部事实表,但实验结果表明提高多表连接计算的并行度可以极大降低连接作业的运行时间。从整体上看,Multi Bushy Tree通过寻找局部星型连接并并行化星型连接中事实表和维表之间的连接计算,从而降低作业的运行时间,而Semi Join算法减少了星型连接过程中网络IO时间。
图6 不同多表连接算法在不同数据集下的作业运行时间
实验表明,在四种类型的数据集中,Multi Bushy Tree算法均表现最好。相比于其他两种连接算法,在数据集A上优化效果最差,作业运行时间缩短0%和20.80%,在数据集D上优化效果最好,作业运行时间缩短14.98%和44.78%。在所有数据集上,Multi Bushy Tree+Semi Join算法表现最好。
在星型连接中,Semi Join算法通过选择与各维表进行连接的关联键字段和事实表的主键字段得到局部事实表,使用局部事实表和维表进行连接计算得到各维表的连接结果表,最后将多个连接结果表通过事实表的主键字段进行连接计算得到最终查询结果表的方式,避免了对事实表进行多次shuffle的操作,能够极大减少通过网络IO进行shuffle操作的数据量。实验使用数据生成工具Dbgen生成了三个只包含星型连接的数据集Q1、Q2和Q3,除了各数据集中均包含一张事实表外,Q1中包含3张维表,Q2中包含5张维表,Q3中包含10张维表。在三个数据集上分别比较Semi Join算法和Flink提供的级联连接算法为完成连接所产生的网络IO代价。实验结果表明,Semi Join算法可以有效减少进行shuffle的数据量。
根据图7中的数据可知,在星型连接中,维表的数量越多,Semi Join算法对于网络IO的优化效果越明显。在三个数据集中,网络IO的数据量分别减少了65.4%、77.6%和89.8%。主要有以下两个原因。
图7 Semi Join和Chain Join产生的网络IO数据量
(1) Semi Join算法通过生成局部事实表的方式,使得在和各维表连接时,将对整个事实表shuffle操作的需求转化为对各局部事实表的shuffle操作。
(2) 生成最终查询结果表时,由于各连接结果表都是通过事实表的主键进行关联的,因此所有的连接计算都可以chain在一个节点运行,只需要对事实表进行一次shuffle操作。
综上所述,虽然Semi Join算法增加了生成局部事实表的时间开销,但是可以显著减小需要通过网络IO进行shuffle操作的数据量,可以有效缩短整体作业的运行时间,且算法对维表的数量不敏感。所以Semi Join算法在减少网络IO的同时缩短了星型连接作业的运行时间。
基于Flink分布式计算引擎,本文为优化多表连接的计算速度提出优化计算并行度的Multi Bushy Tree算法和Semi Join算法。Multi Bushy Tree算法在二元浓密树的基础上,充分考虑了表之间的关联性,在连接树中寻找局部星型连接,通过并行化不存在依赖关系的连接计算和星型连接中事实表和各维表的连接计算缩短多表连接作业的运行时间。此外,Semi Join算法利用Flink提供的执行计划优化功能,大量减少星型连接中需要通过网络IO进行shuffle操作的数据量。实验结果表明,与其他连接方法相比,Multi Bushy Tree+Semi Join算法可以大幅度提高多表连接时的计算并行度,极大缩短了多表连接作业的运行时间,有效减小网络IO代价。
由于目前在分布式计算领域有多种计算框架,例如Spark、Beam等,都可以提供大规模分布式集群计算。在未来的工作中,我们会将Multi Bushy Tree+Semi Join的多表连接优化方案应用到更多的计算框架中,并进一步提升算法的性能。
我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自各大过期杂志,内容仅供学习参考,不准确地方联系删除处理!