StarRocks 的前世今生

OLAP 场景介绍

一般来说,OLAP 层作为大数据体系的出口,直接服务于业务层,也可以说它面向的是人。所以虽然都是分布式计算引擎,但是它有别于 ETL 引擎:

  • OLAP面向的是人,所以 OLAP 的核心要求是快速
    • 报表等场景要求 2 - 5 秒内出数
    • 点查要求毫秒出数
    • ad-hoc、交互式分析可以容忍几十秒,但也是要秒级出数
  • ETL 一般面向的是存储, 它的特点是“读取大量数据、长时间执行(秒-天级别)”。所以它的核心要求是:高稳定性、高容错性,毕竟重跑 ETL 代价太高。
  • ETL 加工出结果数据后,通过 OLAP 引擎对外提供服务主要有 2 种查询方式:
    • 相对固定的计算逻辑,如固定报表
    • 不固定的计算逻辑,如交互式分析

如前所述,OLAP 面向的业务场景多样,对查询时间的要求不一而足,往往需要在在以下几个维度上去权衡

所以相较于 ETL 层 Spark 和 Flink 一统天下,OLAP 引擎可谓百花齐放[20]。整体来说可分 2 种:

  • MOLAP(Multidimensional OLAP):关键技术是预计算,数据摄入 OLAP 引擎时提前聚合和结果(指标)数据,可做到超大数据量上的快速查询响应。所以主要是服务固定分析逻辑的场景,比如固定报表
    • Kylin: 适配离线分析场景,导入 hive 数据
    • Druid:适配实时分析场景,可以实时摄入数据、实时聚合。
  • ROLAP(Relational OLAP):关键技术是现场计算,不进行预聚合,现场执行计算逻辑,适用于灵活复杂查询场景,比如交互式分析
    • 因为要快,一般都采用 MPP 架构;
      • 一种是只实现了计算引擎,分析外部数据比如 hive 的,所谓 SQL on hadoop。如 impala、presto
      • 另一种是实现了存储和计算的 MPP 数据库,如 doris、starrocks、clickhouse 等
    • 传统的 SQL 引擎如 spark sql/hive 也能服务 OLAP 场景,但是速度比较慢,一般不用他;

但是随着发展,技术也在不断融合和统一,所以就有了 HOLAP(Hybrid OLAP),也就是融合了 MOLAP 和 ROLAP 的特点的引擎。我觉得 Doris/StarRocks 应该就属于这一种了,他支持“预聚合”、也支持“现场计算”。相较于其他 OLAP 引擎,能覆盖更多的场景。而 OLAP 引擎也是在向这个方向发展(个人意见)

StarRocks 介绍

如前所述,OLAP 引擎的要求是,而发展方向也是趋向于一个引擎覆盖多种场景。所以看看 StarRocks 给自己的定位:其实也是在强调这 2 点。
新一代“极速”、“全场景”MPP 数据库

发展史

  • 起初是百度为了解决自己广告报表的问题[21],借鉴 Google Mesa、Apache impala、ORCFile 开发的系统。
    • 存储部分:Data Model 借鉴了 Mesa[6],存储格式借鉴了 ORCFile,采用列式存储
    • 计算部分:借鉴了 impala[7] 的实现,分为 FE、BE 2 种节点
  • 成熟后贡献给 Apache,开源后改名 Doris。随后一波核心开发创业开发了 DorisDb,随后开源的 Doris 和 Doris DB 闹矛盾,于是
    • DorisDB 改名为了 StarRocks 并进行了开源
    • 而 Doris 自己也发布了商业版本 SelectDb

StarRocks RoadMap,关键技术

  • 向量化引擎
  • Primary Key 表
  • Pipeline 引擎
  • 数据湖分析
  • 存算分离

架构

自成一体,开箱即用,不依赖其他大数据组建。整体结构借鉴 impala,分为 2 种节点:

  • FE:java 实现,使用类 Raft 算法保证元数据一致性,所以 FE 节点数得是奇数
    • 负责元数据管理:库表信息、存储信息等
    • 负责管理客户端 JDBC 连接
    • 负责接收 query、解析 query、生成执行计划和执行计划的调度
  • BE:c++ 实现,数据多副本,保证数据不丢,是个 CP 系统(牺牲可用性来保证数据强一致性)
    • 负责数据读写、存储
    • 负责执行 FE 下发的计算任务

存储部分

查询要快,离不开存储方面的支持:

  1. 自建存储,存储计算一体化,减少网络 IO
  2. 自建列存格式和多级索引,减少数据扫描量
  3. 聚合表模型、物化视图等,可以实现数据的实时预聚合,减少查询时的计算量

数据格式

存储结构由 4 层组成:partition、tablet、rowset、segment。
Segment File(类似 SSTable) 是真正的数据文件,是一个“类 parquet” 的列存储格式(OLAP 数据库都是列存)。整体的文件格式分为数据区域,索引区域和 footer 三个部分,如下图所示:

数据模型

存储部分借鉴了 Google Mesa 。Mesa 是一个分析型的数据仓库系统(highly scalable analytic data warehousing system),主要用于谷歌的广告业务(分析、报表)中。它非常强调“实时性”,它要支持:

  • Near Real-Time Update Throughput :实时的大批量的数据更新
    • 实时更新:must support continuous updates
    • tps 达到几百万:millions of rows updated per second
  • Query Performance:point query 的 p99 要达到几百毫秒,就是要满足这种低时延的场景
  • Atomic Update
  • Consistency and Correctness,所以 StarRocks 是一个 CP 系统,保证数据的强一致性
  • Online Data and Metadata Transformation

为了实现“实时分析”,Mesa 设计了“聚合表模型”,可以在随着数据更新实时更新聚合结果,如图所示

  • 如左边所示,表 schema 字段分为维度和指标。最终数据会根据维度 date、publishId、Country 聚合(这里聚合函数是 sum)出指标值 clicks、cost
  • 如右边所示,当有明细数据不断导入时,会自动更新聚合后的结果

那这种预聚合是怎么实现的呢[8]

  • 采用了多数据版本,每个写入批次会生成一个数据版本,利用这种方式实现了 Atomic Update 和 snapshot isolation
  • 后台会不断通过 2 级 compaction 的方式:base compaction、cumulative compaction 来对多个版本的数据进行合并
  • 查询时,如果没有 compaction 完成,会临时进行数据的聚合,即 merge on read


这种实现的好处是:

  1. 类 lsm Tree 的写入方式,保证了支持大批量的写入吞吐
  2. 2 级 compaction 方式的思想是,将查询时的 merge on read 时长分摊到了一个比较长的时间轴上。进而保证较低的查询时延。所以,compaction 的机制设计非常重要,如何不合理,就会导致查询时延无法保证。

题外话:为什么不建议用 insert into 导入大量数据呢,就是因为一个 insert 语句就会生成一个数据版本,100w 个 insert 语句就生成 100w 个数据版本需要后台 compaction,这远远超过了 compaction 的能力。

数据索引

索引有前缀索引和列级索引[17]:

  • 前缀索引:系统创建,使用表中某行数据的维度列所构成的前缀查找前缀索引表,可以确定该行数据所在逻辑数据块的起始行号。
  • 列级索引[18]:
    • 行号索引:系统创建,使用行号索引可以找到列级数据块(表中每列数据都按 64 KB 分块存储。)
    • ZoneMap 索引:系统创建,存储一列的 max、min 等信息
    • BloomFilter 索引:用户自己创建,快速过滤某个 data page 是否包含某个值
    • Bitmap 索引:用户自己创建,能够提高指定列的查询效率,适用于低基数的列

计算部分

在计算部分 StarRocks 通过以下措施,进而达到极致的查询性能,本篇重点介绍下执行阶段。

  • 执行规划阶段:
    • CBO,生成一个 “最佳的”(执行 Cost 最低)的分布式物理执行计划
    • Join 分布式执行策略选择
    • 低基数字典优化
    • Global Runtime Filter 下推
  • 执行阶段:
    • MPP 多机并行机制来**充分利用多机的资源 **
    • 通过 Pipeline 并行机制来充分利用单机上多核的资源
    • 通过向量化执行来充分利用单核的资源,进而达到极致的查询性能
  • cache:
    • page cache,数据库层面的文件缓存
    • query cache,数据库层面的查询结果缓存

执行流程

查询的执行步骤[10]和 spark sql[9] 执行流程类似:

  1. Query Parsing: SQL 先解析为 AST,这里和 spark sql 一样使用了 Antlr4
  2. Query analysis: 基于 AST 检查绑定 db、tb、clumn 等愿信息,并检查 SQL 的合法性
  3. Query Planing: 将 AST 转为 Logical Plan (Single node plan,不能真正的被分布式调度执行)
  4. Query Optimize: Logical Plan 经过 CBO 优化器生成 physical plan (分布式执行计划)
  5. physical plan 按照 exchange boundaries (shuffle 边界) 进行切分,切分为 plan fragment, A plan fragment encapsulates a portion of the plan tree that operates on the same data partition on a single machine. 就是类似 spark 通过宽依赖和窄依赖切分为 stage。

Query Analyze -> Logical Plan

Optimizer(CBO):
StarRocks Optimizer 的输入是一棵逻辑计划树,输出是一棵 Cost “最低” 的分布式物理计划树。
StarRocks 优化器完全自研,主要基于** Cascades 和 ORCA 论文**实现,并结合 StarRocks 执行器和调度器进行了深度定制,优化和创新。

举个例子:比如下推、比如 join 策略的选择。比如这个 impala 的

MPP 执行

MPP 全称 Massive Parallel Processing(如图,sharde nothing 架构),个人理解,这是概念主要是和 SMP、NUMP 做区分;在大数据领域,这个概念变得模糊,其实我认为 Spark 也是 MPP 的结构,或者说大数据处理引擎就没有不是 MPP 的[11]。

回到 StarRocks

  • 一个 Fragment 可以有多个实例,分布在多个 BE 节点上同时执行(同理,saprk 的 1 个 stage 可以有多个 task)
  • 单个 Fragment 实例的并行度可以调整(同理,spark 可以通过调整 shuffle partitions 调整 task 并行度)
  • 对于 Scan 操作的 Fragment 需要考虑 data locality。回顾存储部分内容,数据是被分捅(tablet)存储的,每个 tablet 又会有多个 replica,所以:
    • 1)Scan 可以并行,每个 tablet 可以对应一个 Fragment Instance
    • 2)由于一个 tablet 多副本,所以 Fragment Instance 可以调度到压力较低的 BE 上
  • 对于非 Scan 操作,可以调度到压力比较小的 BE 节点上执行,总体上调度器会保证每个 BE 节点上的负载均衡。

但有几点不同:

  1. 资源常驻,不需要向一个共享的比如 yarn 去申请,这样会拖慢查询时效
  2. 多个 Fragment 之间会以 Pipeline 的方式在内存中并行执行,而不是像批处理引擎那样 Stage By Stage 执行。

Pipeline

注意:这里的 pipeline 不是“MPP 执行”中所述的 pipeline。这里的 Pipeline 指的是单台 BE 上的调度执行框架。那么为什么需要它呢。

如上所述,在 MPP 执行框架中:

  • SQL 被拆分为 Fragment 后,Fragment Instance 会被调度到 BE 上去执行。
  • 在单台 BE 上,Fragment Instance 得调度比较简单。一个 Fragment Instance 就是生成一个线程去执行。


但是这种方式存在一定的问题:

  • 大量的查询会生成大量的 Fragment Instance, 又会单台 BE 上生成大量的线程。
  • 这些 Fragment Instance 中的 Operator 操作如果包含 IO 操作,如 scan、shuffle,又会导致线程挂起。
  • 于是,这种方式在单台 BE 上,会让操作系统进行频繁的线程创建、调度、销毁,而这需要操作系统频繁的在内核态和用户态之间去切换。从而让 cpu 的能力没有充分的用于执行计算逻辑上

为了解决这个问题,开发了 Pipeline 并行执行框架[12]:

  • Pipeline 框架的核心是实现一个用户态的协程调度,不再依赖操作系统的内核态线程调度,减少线程创建、线程销毁、线程上下文切换的成本。
  • 实现形式上,它是一个多级反馈队列(比较类似 os 的线程调度模式):StarRocks 会启动机器 CPU 核数个执行线程,每个执行线程会从一个多级反馈就绪队列中获取 Ready 状态的 Pipeline 去执行。

  • 而 Pipeline 就是被调度的单位,它就是一组算子构成的链,开始算子为 SourceOperator,末尾算子为 SinkOperator。它是由 Fragment 拆分而来,资源隔离能力的 cpu 隔离也是在这个层面去做的。

向量化

随着计算机技术的发展,cpu 对指令集的支持也在不断的发展。首先我们主要了解这 2 种指令集[13][14]:

  1. SISD,即单指令单数据。如图:4 次加法计算需要执行 4 次指令,从内存 load 和 store 4 次数据,即标量(Scalar, 一个标量就是一个单独的数)计算
  2. SIMD[15],单指令多数据。如图:4 次加法计算只需要执行 1 次指令,从内存 load 和 store 1 次数据,每次可以计算一组数据,即向量(vector, 一个向量就是一列数)计算

回到 StarRocks 的计算上来,对单核上的计算优化,如下就是要减少 CPU Time,减少 Cpu Time 的手段有 2:

  • 减少指令数量,这就可以用到 SIMD,即所谓的向量化
  • 减少 CPI,主要是减少分支预测失败和 Cache Miss,这里不展开

看个例子:如图,使用向量化计算后性能可以得到数倍的一个提升

当然,我们知识介绍了向量化计算的思想,如果要在数据库层面实现向量化,是个巨大的工程。需要存储结构层面、算子层面、内存中的数据组织层面都要做适配。比如列式存储、按列处理就更适合向量化计算。详情可见:
https://mp.weixin.qq.com/s/J6_L6ijaiuuO6FXCa-AW9w

展望未来

现在 OLAP 数据库会面临几个问题:

  1. 存储计算一体化,扩存储就得扩计算,浪费资源;
  2. OLAP 场景主要是报表等,有比较强的潮汐现象,白天资源满,满上资源比较空。然后得按照峰值来部署资源,造成资源浪费。

有鉴于此,所以:

还有一个趋势是 All in one

  • HTAP
  • HSAP
  • presto on spark

引用

  1. cascadez cbo:https://zhuanlan.zhihu.com/p/365085770
  2. columbia cbo: https://zhuanlan.zhihu.com/p/464717139
  3. 开源优化器:https://www.infoq.cn/article/5o16ehoz5zk6fzpsjpt2
  4. 每天数百亿用户行为数据,美团点评怎么实现秒级转化分析?
  5. Comparing Three Real-Time OLAP Databases: Apache Pinot, Apache Druid, and ClickHouse
  6. https://zhuanlan.zhihu.com/p/355312398
  7. https://impala.apache.org/
  8. https://li.feishu.cn/docx/KGtndwglEom70NxsgFDcMo9AnMg
  9. https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf
  10. https://mp.weixin.qq.com/s/AkTAibnAaM29aC5JJT58YQ
  11. https://mentalmodels4life.net/2016/08/14/apache-spark-vs-mpp-databases/
  12. https://mp.weixin.qq.com/s/fgNwk2DyijKM4D9L77LQNA
  13. https://zhuanlan.zhihu.com/p/31271788
  14. https://zhuanlan.zhihu.com/p/296677429
  15. https://en.wikipedia.org/wiki/Single_instruction,_multiple_data
  16. https://www.jianshu.com/p/a078fed07acc
  17. https://docs.starrocks.io/zh-cn/2.4/table_design/StarRocks_table_design
  18. https://mp.weixin.qq.com/s/aJ3FwDI6KprYYUwXzhl_-A
  19. https://blog.bcmeng.com/post/fastest_database.html
  20. https://en.wikipedia.org/wiki/Comparison_of_OLAP_servers
  21. https://xie.infoq.cn/article/4bdf3da72bc868ad78cf6bf4b
  22. https://blog.bcmeng.com/post/mpp-grouped-excution-stage.html