/** * Like {@link #fetchBlockByteRange}except we start up a second, parallel, * 'hedged' read if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first. */ privatevoidhedgedFetchBlockByteRange(LocatedBlock block, long start, long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks) throws IOException { finalDfsClientConfconf= dfsClient.getConf(); ArrayList<Future<ByteBuffer>> futures = newArrayList<>(); CompletionService<ByteBuffer> hedgedService = newExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool()); ArrayList<DatanodeInfo> ignored = newArrayList<>(); ByteBuffer bb; intlen= (int) (end - start + 1); inthedgedReadId=0; while (true) { // see HDFS-6591, this metric is used to verify/catch unnecessary loops hedgedReadOpsLoopNumForTesting++; DNAddrPairchosenNode=null; // there is no request already executing. if (futures.isEmpty()) { // chooseDataNode is a commitment. If no node, we go to // the NN to reget block locations. Only go here on first read. chosenNode = chooseDataNode(block, ignored); // Latest block, if refreshed internally block = chosenNode.block; bb = ByteBuffer.allocate(len); Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( chosenNode, block, start, end, bb, corruptedBlocks, hedgedReadId++); Future<ByteBuffer> firstRequest = hedgedService .submit(getFromDataNodeCallable); futures.add(firstRequest); Future<ByteBuffer> future = null; try { future = hedgedService.poll( conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); if (future != null) { ByteBufferresult= future.get(); result.flip(); buf.put(result); return; } DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); // continue; no need to refresh block locations } catch (ExecutionException e) { futures.remove(future); } catch (InterruptedException e) { thrownewInterruptedIOException( "Interrupted while waiting for reading task"); } // Ignore this node on next go around. // If poll timeout and the request still ongoing, don't consider it // again. If read data failed, don't consider it either. ignored.add(chosenNode.info); } else { // We are starting up a 'hedged' read. We have a read already // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // If no nodes to do hedged reads against, pass. booleanrefetch=false; try { chosenNode = chooseDataNode(block, ignored, false); if (chosenNode != null) { // Latest block, if refreshed internally block = chosenNode.block; bb = ByteBuffer.allocate(len); Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(chosenNode, block, start, end, bb, corruptedBlocks, hedgedReadId++); Future<ByteBuffer> oneMoreRequest = hedgedService.submit(getFromDataNodeCallable); futures.add(oneMoreRequest); } else { refetch = true; } } catch (IOException ioe) { DFSClient.LOG.debug("Failed getting node for hedged read: {}", ioe.getMessage()); } // if not succeeded. Submit callables for each datanode in a loop, wait // for a fixed interval and get the result from the fastest one. try { ByteBufferresult= getFirstToComplete(hedgedService, futures); // cancel the rest. cancelAll(futures); dfsClient.getHedgedReadMetrics().incHedgedReadWins(); result.flip(); buf.put(result); return; } catch (InterruptedException ie) { // Ignore and retry } if (refetch) { refetchLocations(block, ignored); } // We got here if exception. Ignore this node on next go around IFF // we found a chosenNode to hedge read against. if (chosenNode != null && chosenNode.info != null) { ignored.add(chosenNode.info); } } } }