Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,34 @@ private static Optional<TableState> getTableState(Result r) throws IOException {
*/
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
  AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
  // Full-table lookup: scan every meta row for the table, excluding offlined split parents.
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> regionsAndLocations =
    getTableRegionsAndLocations(metaTable, tableName, true);
  return toHRegionLocations(regionsAndLocations);
}

/**
 * Fetches one paginated slice of region locations for {@code tableName} using a single meta RPC.
 * <p>
 * The scan begins at the meta row derived from {@code startKey} and returns at most
 * {@code rowLimit} meta rows. {@code startKey} must land on a region start-key boundary (for
 * example, the end key of the previously visited region); {@code null} or an empty array starts
 * at the table's first region.
 * @param metaTable scanner over meta table
 * @param tableName table we're looking for
 * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
 *          starts from the first region
 * @param rowLimit maximum number of meta rows to return; if {@code <= 0}, the underlying scan is
 *          unbounded
 * @return the list of region locations. The return value will be wrapped by a
 *         {@link CompletableFuture}.
 */
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
  AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName, byte[] startKey,
  int rowLimit) {
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> page =
    getTableRegionsAndLocations(metaTable, tableName, true, startKey, rowLimit);
  return toHRegionLocations(page);
}

private static CompletableFuture<List<HRegionLocation>>
toHRegionLocations(CompletableFuture<List<Pair<RegionInfo, ServerName>>> source) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
addListener(source, (locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (locations == null || locations.isEmpty()) {
Expand Down Expand Up @@ -215,6 +241,39 @@ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableReg
return future;
}

/**
 * Variant of {@link #getTableRegionsAndLocations} that scans a bounded slice of meta starting at
 * the row derived from {@code startKey} and stopping after at most {@code rowLimit} rows.
 */
private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
  final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
  final boolean excludeOfflinedSplitParents, final byte[] startKey, final int rowLimit) {
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> result = new CompletableFuture<>();
  // hbase:meta itself is never located through a meta scan.
  if (TableName.META_TABLE_NAME.equals(tableName)) {
    result.completeExceptionally(new IOException(
      "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
    return result;
  }

  final CollectRegionLocationsVisitor collector =
    new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);

  // A null/empty startKey pages from the table's first region; otherwise begin at the meta row
  // name constructed from the caller-supplied region start key.
  final byte[] scanStart;
  if (startKey == null || startKey.length == 0) {
    scanStart = getTableStartRowForMeta(tableName, QueryType.REGION);
  } else {
    scanStart = RegionInfo.createRegionName(tableName, startKey, HConstants.ZEROES, false);
  }
  final byte[] scanStop = getTableStopRowForMeta(tableName, QueryType.REGION);

  // Paged scan (last arg true): size the scanner so the whole slice comes back in one RPC.
  addListener(
    scanMeta(metaTable, scanStart, scanStop, QueryType.REGION, rowLimit, true, collector),
    (ignored, error) -> {
      if (error == null) {
        result.complete(collector.getResults());
      } else {
        result.completeExceptionally(error);
      }
    });
  return result;
}

/**
* Performs a scan of META table for given table.
* @param metaTable scanner over meta table
Expand All @@ -225,22 +284,26 @@ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableReg
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
TableName tableName, QueryType type, final Visitor visitor) {
return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, false, visitor);
}

/**
* Performs a scan of META table for given table.
* @param metaTable scanner over meta table
* @param startRow Where to start the scan
* @param stopRow Where to stop the scan
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param visitor Visitor invoked against each row
* @param metaTable scanner over meta table
* @param startRow Where to start the scan
* @param stopRow Where to stop the scan
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param isPagedScan when {@code true}, the scan is sized so the whole slice (up to
* {@code maxRows}) returns in a single ScannerNext RPC. When {@code false},
* uses the configured {@code hbase.meta.scanner.caching}.
* @param visitor Visitor invoked against each row
*/
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, boolean isPagedScan,
final Visitor visitor) {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(metaTable, rowUpperLimit);
Scan scan = getMetaScan(metaTable, rowUpperLimit, isPagedScan);
for (byte[] family : type.getFamilies()) {
scan.addFamily(family);
}
Expand Down Expand Up @@ -437,7 +500,7 @@ void add(Result r) {
}
}

private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit, boolean isPagedScan) {
Scan scan = new Scan();
int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
Expand All @@ -447,11 +510,18 @@ private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
) {
scan.setConsistency(Consistency.TIMELINE);
}
if (rowUpperLimit <= scannerCaching) {
if (isPagedScan) {
// Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext
// RPC. Size caching to the slice. Trade-off: a single larger response uses more RegionServer
// heap, fine for meta rows (small).
scan.setLimit(rowUpperLimit);
scan.setCaching(rowUpperLimit);
} else {
if (rowUpperLimit <= scannerCaching) {
scan.setLimit(rowUpperLimit);
}
scan.setCaching(Math.min(rowUpperLimit, scannerCaching));
}
int rows = Math.min(rowUpperLimit, scannerCaching);
scan.setCaching(rows);
return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ void addLocationToCache(HRegionLocation loc) {
getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
}

/**
 * Looks up the cached {@link RegionLocations} for the region of {@code tableName} whose start
 * key is {@code startKey}, or returns {@code null} when the table has no cache entry.
 */
RegionLocations getCachedLocation(TableName tableName, byte[] startKey) {
  TableCache perTable = cache.get(tableName);
  if (perTable == null) {
    return null;
  }
  return perTable.regionLocationCache.get(startKey);
}

private HRegionLocation getCachedLocation(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -112,6 +113,54 @@ default CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row)
*/
CompletableFuture<List<HRegionLocation>> getAllRegionLocations();

/**
* Bulk lookup of region locations from {@code hbase:meta} in a single RPC, starting at
* {@code startKey} (region start-key boundary, inclusive) and returning at most {@code limit}
* regions in start-key order.
* <p/>
* The returned list includes all replicas of each region (matching
* {@link #getAllRegionLocations()}), and the result is also written to the connection's region
* location cache.
* <p/>
* Ordering: regions are returned in ascending region start-key order (the natural order of
* {@code hbase:meta} rows for a single table). Within each region, replicas are returned in
* ascending replica-id order (replica 0, then 1, then 2, ...). Split parents are filtered out,
* which may cause a page to contain fewer than {@code limit} regions but never disturbs ordering
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about merged regions ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging — let me share my understanding and please correct me if I've got this wrong.

I think merge parents don't need filtering here because of how splits vs merges record cleanup state in hbase:meta:

  • Split: the parent row stays around with setOffline(true).setSplit(true) (see RegionStateStore#splitRegion) and carries info:splitA/info:splitB pointers. The catalog janitor seems to use that row to track whether daughters still hold HFile references, and only deletes the parent row + HDFS dir once references are gone. So during that window, meta scans do see the parent — which is why excludeOfflinedSplitParents exists.

  • Merge: from what I can see in RegionStateStore#mergeRegions, the same multi-mutate that commits the merge deletes every parent row and writes a single child row with info:merge_0000, info:merge_0001, … pointing back at the parents. So the "waiting on HFile cleanup" bookkeeping lives on the child row, not on the parents.

If that's right, a meta scan should never see merge parents, and the merged child row is itself the live region for that key range — which is what we'd want to return. This also seems to match getAllRegionLocations()'s existing behavior; both share CollectRegionLocationsVisitor(excludeOfflinedSplitParents=true).

Does this line up with your understanding? Happy to add a comment near the visitor call site noting this if it'd help future readers, or revisit the filter if I'm missing a case.

* of the survivors.
* <p/>
* To page through all regions of a table, call repeatedly passing
* {@code last.getRegion().getEndKey()} as the next {@code startKey}, where {@code last} is the
* final element of the previous response. All replicas of a region share the same
* {@link RegionInfo}, so the last entry's end key is the correct cursor regardless of which
* replica it is. Pass {@code null} for the first call. Stop paging when the returned list is
* empty or when the last region's end key is {@link HConstants#EMPTY_END_ROW} (zero-length) -
* that signals the end of the table; passing it back in would re-scan from the beginning since by
* convention an empty start key means "from the first region".
* <p/>
* Unlike {@link #getAllRegionLocations()}, this method performs at most one RPC against
* {@code hbase:meta} per invocation, so its latency is bounded by {@code limit} rather than table
* size. Note that this method does not coordinate with other in-flight meta lookups on the
* connection - aggregate pacing across concurrent callers is the caller's responsibility.
* <p/>
* This method is optional. Implementations that cannot support paginated lookups will return a
* future that completes exceptionally with {@link UnsupportedOperationException} (the default
* behavior); callers should fall back to {@link #getAllRegionLocations()} in that case.
* @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
* starts from the first region
* @param limit maximum number of regions to return; if &lt;= 0, falls back to
* {@code hbase.meta.scanner.caching}
* @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no
* more regions exist; errors are reported via the returned future
*/
default CompletableFuture<List<HRegionLocation>> getRegionLocationsPage(byte[] startKey,
  int limit) {
  // Paginated lookups are an optional operation; the default reports the lack of support
  // through the returned future rather than throwing synchronously.
  UnsupportedOperationException unsupported = new UnsupportedOperationException(
    "getRegionLocationsPage(byte[], int) is not supported by this AsyncTableRegionLocator;"
      + " fall back to getAllRegionLocations()");
  CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
  future.completeExceptionally(unsupported);
  return future;
}

/**
* Gets the starting row key for every region in the currently open table.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -81,6 +83,39 @@ public CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, b
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}

@Override
public CompletableFuture<List<HRegionLocation>> getRegionLocationsPage(byte[] startKey,
int limit) {
return tracedFuture(() -> {
if (TableName.isMetaTableName(tableName)) {
CompletableFuture<List<HRegionLocation>> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IOException("getRegionLocationsPage(startKey, limit) is not supported for hbase:meta;"
+ " use getRegionLocation(EMPTY_START_ROW) instead."));
return failed;
}
int effectiveLimit = limit > 0
? limit
: conn.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
CompletableFuture<List<HRegionLocation>> future =
ClientMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
tableName, startKey, effectiveLimit);
addListener(future, (locs, error) -> {
if (error != null || locs == null) {
return;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there is error ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — let me explain the intent and please push back if it's not the right call.

The listener at AsyncTableRegionLocatorImpl.java:104 is a side-effect-only hook for cache warm-up. The future returned to the caller is the one from ClientMetaTableAccessor.getTableHRegionLocations, which already completes exceptionally on any meta-scan error (see getTableRegionsAndLocations's future.completeExceptionally(error) paths). So the error propagates to the caller through the returned future regardless of what the listener does.

On the listener side, if error != null there's simply nothing valid to cache, and if locs == null there's nothing to iterate. Either way, returning early is the safe behavior — we don't want to mask the error or double-fail the future, just skip cache population for this call. The next call to getRegionLocationsPage (or any locator path) will populate normally.

That said, I can add a LOG.debug (or LOG.warn) in the error branch so the silent skip is observable, e.g.:

if (error != null) {
  LOG.debug("Skipping cache warm-up for {}: {}", tableName, error.toString());
  return;
}
if (locs == null) {
  return;
}

Happy to add that if you think it's worth it.

for (HRegionLocation loc : locs) {
// the cache assumes that all locations have a serverName. only add if that's true
if (loc.getServerName() != null) {
conn.getLocator().getNonMetaRegionLocator().addLocationToCache(loc);
}
}
});
return future;
}, getClass().getSimpleName() + ".getRegionLocationsPage");
}

@Override
public void clearRegionLocationCache() {
conn.getLocator().clearCache(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class ConnectionOverAsyncConnection implements Connection {
this.connConf = new ConnectionConfiguration(conn.getConfiguration());
}

/** Returns the underlying {@link AsyncConnectionImpl} this connection wraps. */
AsyncConnectionImpl getAsyncConnection() {
  return conn;
}

@Override
public void abort(String why, Throwable error) {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -130,6 +131,53 @@ default List<HRegionLocation> getRegionLocations(byte[] row) throws IOException
*/
List<HRegionLocation> getAllRegionLocations() throws IOException;

/**
* Bulk lookup of region locations from {@code hbase:meta} in a single RPC, starting at
* {@code startKey} (region start-key boundary, inclusive) and returning at most {@code limit}
* regions in start-key order.
* <p/>
* The returned list includes all replicas of each region (matching
* {@link #getAllRegionLocations()}), and the result is also written to the connection's region
* location cache.
* <p/>
* Ordering: regions are returned in ascending region start-key order (the natural order of
* {@code hbase:meta} rows for a single table). Within each region, replicas are returned in
* ascending replica-id order (replica 0, then 1, then 2, ...). Split parents are filtered out,
* which may cause a page to contain fewer than {@code limit} regions but never disturbs ordering
* of the survivors.
* <p/>
* To page through all regions of a table, call repeatedly passing
* {@code last.getRegion().getEndKey()} as the next {@code startKey}, where {@code last} is the
* final element of the previous response. All replicas of a region share the same
* {@link RegionInfo}, so the last entry's end key is the correct cursor regardless of which
* replica it is. Pass {@code null} for the first call. Stop paging when the returned list is
* empty or when the last region's end key is {@link HConstants#EMPTY_END_ROW} (zero-length) -
* that signals the end of the table; passing it back in would re-scan from the beginning since by
* convention an empty start key means "from the first region".
* <p/>
* Unlike {@link #getAllRegionLocations()}, this method performs at most one RPC against
* {@code hbase:meta} per invocation, so its latency is bounded by {@code limit} rather than table
* size.
* <p/>
* This method is optional. Implementations that cannot support paginated lookups should throw
* {@link UnsupportedOperationException} (the default behavior); callers should fall back to
* {@link #getAllRegionLocations()} in that case.
* @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
* starts from the first region
* @param limit maximum number of regions to return; if &lt;= 0, falls back to
* {@code hbase.meta.scanner.caching}
* @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no
* more regions exist
* @throws IOException if a remote or network exception occurs
* @throws UnsupportedOperationException if this implementation does not support paginated lookups
*/
default List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
  throws IOException {
  // Optional operation: the synchronous default signals lack of support directly.
  String message = "getRegionLocationsPage(byte[], int) is not supported by this RegionLocator;"
    + " fall back to getAllRegionLocations()";
  throw new UnsupportedOperationException(message);
}

/**
* Gets the starting row key for every region in the currently open table.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public List<HRegionLocation> getAllRegionLocations() throws IOException {
return get(locator.getAllRegionLocations());
}

@Override
public List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
  throws IOException {
  // Delegate to the async locator and block on the returned future via get().
  return get(locator.getRegionLocationsPage(startKey, limit));
}

@Override
public TableName getName() {
return locator.getName();
Expand Down
Loading