使用Hdfs Hedged Read机制解决Hive外表查询慢的问题

问题描述

我们使用 StarRocks 的 Hive 外表来进行 Hive 的查询加速,但是发现一个问题:有一些 SQL 查询会偶发性的慢,如 SQL A 大部分时间都是 1s 完成查询,但是偶尔就会超过 30s 且马上再查就很快;经过对查询变慢时间段的监控查看,资源都是充足的。

于是进行进一步排查。

排查过程

  1. 复现慢的 SQL 后,分析 profile 文件,发现:有 2 个表 __MAX_OF_ScanTime 比较大
1
2
3
- ScanTime: 371.20ms
- __MAX_OF_ScanTime: 30s708ms
- __MIN_OF_ScanTime: 21.921ms
  1. 看起来是 Scan hdfs 文件比较慢,但是经过查看后,查询的 hdfs 文件都只有几 kb 很小,不应该花这么长时间
  2. 查看 BE 节点的 be.out 日志,寻找该 SQL 读取 hdfs 文件的操作记录如下,发现读取大部分 hdfs 文件都很快,就是最后一个 hdfs 文件读的很慢。如下最后一个文件成功打开时间和倒数第二个文件刚好差了 30s,说明就是这个文件慢;

  1. 但是最后这个文件也很小,为什么会慢呢;对比同 SQL 执行很快时的 be.out 日志,发现这个文件也是读的很快的。
  2. 根据 open file success定位 BE 的实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Status HdfsScanner::open(RuntimeState* runtime_state) {
if (_opened) {
return Status::OK();
}
_build_scanner_context();
auto status = do_open(runtime_state);
if (status.ok()) {
_opened = true;
if (_scanner_params.open_limit != nullptr) {
_scanner_params.open_limit->fetch_add(1, std::memory_order_relaxed);
}
VLOG_FILE << "open file success: " << _scanner_params.path;
}
return status;
}
  1. 查看 namenode 的 hdfs-audit.log 日志,发现 namenode 接收到读这个文件的请求是很快的,没有过多延迟;所以应该是慢再了从 hdfs 读数据上。也就是慢在了这个 do_open(runtime_state) 方法上
1
2
3
4
5
6
7
8
9
Status HdfsParquetScanner::do_open(RuntimeState* runtime_state) {
RETURN_IF_ERROR(open_random_access_file());
// create file reader
_reader = std::make_shared<parquet::FileReader>(runtime_state->chunk_size(), _file.get(),
_scanner_params.scan_ranges[0]->file_length);
SCOPED_RAW_TIMER(&_stats.reader_init_ns);
RETURN_IF_ERROR(_reader->init(&_scanner_ctx));
return Status::OK();
}
  1. 而 BE 是用 c++ 开发的,读取 hdfs 文件使用的是 libhdfs,调用的方法是这个;读 hdfs 文件原理可见:https://blog.csdn.net/qq_45744501/article/details/120629378
1
hdfsFile file = hdfsOpenFile(handle.hdfs_fs, path.c_str(), flags, hdfs_write_buffer_size, 0, 0);
  1. 既然读取 hdfs 慢,那可能是 datanode 的问题,查看慢的文件所在 datanode 的监控,发现在查询慢的时候 cpu 占比很好;至此定位了原因是: hdfs datanode 节点 cpu 过高导致响应慢的问题。

解决方案

我们都知道 hdfs 是 3 副本,同一个文件会存在 3 个 datanode 上。而 hdfs client 刚好读取到了那个慢的节点导致的慢。这个问题可以使用 Hdfs Hedged Read 机制来解决。这个机制的原理就是:

  1. 先从一个 datanode 获取 block
  2. 如果限定时间内获取不到,就尝试从第二个 datanode 获取,依次类推。

datanode 同时慢的概率还是比较低的。使用上也很简单,在 hdfs 的 conf 中配置下下就行如下:

1
2
3
4
5
6
7
8
9
<property>
<name>dfs.client.hedged.read.threadpool.size</name>
<value>20</value>
</property>

<property>
<name>dfs.client.hedged.read.threshold.millis</name>
<value>6000</value>
</property>

Hdfs Hedged Read 的实现代码:

  • 位置:hadoop-hdfs-client: src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* Like {@link #fetchBlockByteRange}except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
CompletionService<ByteBuffer> hedgedService =
new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
ArrayList<DatanodeInfo> ignored = new ArrayList<>();
ByteBuffer bb;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
DNAddrPair chosenNode = null;
// there is no request already executing.
if (futures.isEmpty()) {
// chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored);
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(firstRequest);
Future<ByteBuffer> future = null;
try {
future = hedgedService.poll(
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
result.flip();
buf.put(result);
return;
}
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
+ "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
// continue; no need to refresh block locations
} catch (ExecutionException e) {
futures.remove(future);
} catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted while waiting for reading task");
}
// Ignore this node on next go around.
// If poll timeout and the request still ongoing, don't consider it
// again. If read data failed, don't consider it either.
ignored.add(chosenNode.info);
} else {
// We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass.
boolean refetch = false;
try {
chosenNode = chooseDataNode(block, ignored, false);
if (chosenNode != null) {
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable =
getFromOneDataNode(chosenNode, block, start, end, bb,
corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest =
hedgedService.submit(getFromDataNodeCallable);
futures.add(oneMoreRequest);
} else {
refetch = true;
}
} catch (IOException ioe) {
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
ioe.getMessage());
}
// if not succeeded. Submit callables for each datanode in a loop, wait
// for a fixed interval and get the result from the fastest one.
try {
ByteBuffer result = getFirstToComplete(hedgedService, futures);
// cancel the rest.
cancelAll(futures);
dfsClient.getHedgedReadMetrics().incHedgedReadWins();
result.flip();
buf.put(result);
return;
} catch (InterruptedException ie) {
// Ignore and retry
}
if (refetch) {
refetchLocations(block, ignored);
}
// We got here if exception. Ignore this node on next go around IFF
// we found a chosenNode to hedge read against.
if (chosenNode != null && chosenNode.info != null) {
ignored.add(chosenNode.info);
}
}
}
}

又出问题

定位了原因,又有解决方案,于是我们开开心心的来测试 Hdfs Hedged Read 功能。通过加日志的方式发现一个奇怪的现象。

测试步骤:

  • 打开 Hdfs Hedged Read,dfs.client.hedged.read.threshold.millis 设置为 6s
  • 禁用一个 hdfs 文件所在的 datanode 的网络
1
sudo iptables -A OUTPUT -d xxx -j DROP
  • 这样测试的 SQL,预期应该读取这个 hdfs 文件时,从第一个 datanode 读取等待 6s,然后会从第二个 datanode 读取。所以这个 SQL 应该比正常执行慢 6s。

BUT,经过测试发现实际这个 SQL 慢了 12s。这是为什么呢?经过又一轮分析,发现是因为:StarRocks 读取 hdfs parquet 文件分 2 步

  1. 第一步读取 footer
  2. 第二步读取数据

这 2 步都会调用 hdfs client 读文件,所以导致每一步都触发了测试的逻辑慢了 6s,2 步加起来就慢了 12s 多。其实这个文件非常小只有几 kb,完全可以一次性读完,这也算是 StarRocks 可以优化的一个点吧。如下 profile 所示:GroupChunkRead 是读文件,ReaderInitFooterRead 是读 footer。可以看到都是用了 6s 多。

StarRocks 读取 hdfs parquet 文件的 2 个步实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Status FileReader::init(vectorized::HdfsScannerContext* ctx) {
_scanner_ctx = ctx;
// 读取 footer
RETURN_IF_ERROR(_parse_footer());

std::unordered_set<std::string> names;
_file_metadata->schema().get_field_names(&names);
_scanner_ctx->set_columns_from_file(names);
ASSIGN_OR_RETURN(_is_file_filtered, _scanner_ctx->should_skip_by_evaluating_not_existed_slots());
if (_is_file_filtered) {
return Status::OK();
}
_prepare_read_columns();
// 为读文件做准备, 真正的读取动作在 Status FileReader::get_next(vectorized::ChunkPtr* chunk) 方法中
RETURN_IF_ERROR(_init_group_readers());
return Status::OK();
}