diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 1819dda048dc..e1fd74922f0d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -356,7 +356,7 @@ public static Result scanByRegionEncodedName(Connection connection, String regio throws IOException { RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(regionEncodedName)); - Scan scan = getMetaScan(connection.getConfiguration(), 1); + Scan scan = getMetaScan(connection.getConfiguration(), 1, false); scan.setFilter(rowFilter); try (Table table = getMetaHTable(connection); ResultScanner resultScanner = table.getScanner(scan)) { @@ -558,13 +558,13 @@ public static Scan getScanForTableName(Configuration conf, TableName tableName) // Stop key appends the smallest possible char to the table name byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION); - Scan scan = getMetaScan(conf, -1); + Scan scan = getMetaScan(conf, -1, false); scan.setStartRow(startKey); scan.setStopRow(stopKey); return scan; } - private static Scan getMetaScan(Configuration conf, int rowUpperLimit) { + private static Scan getMetaScan(Configuration conf, int rowUpperLimit, boolean isPagedScan) { Scan scan = new Scan(); int scannerCaching = conf.getInt(HConstants.HBASE_META_SCANNER_CACHING, HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); @@ -575,7 +575,14 @@ private static Scan getMetaScan(Configuration conf, int rowUpperLimit) { scan.setLimit(rowUpperLimit); scan.setReadType(Scan.ReadType.PREAD); } - scan.setCaching(scannerCaching); + if (isPagedScan) { + // Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext + // RPC. Size caching to the slice. Trade-off: a single larger response uses more + // RegionServer heap, fine for meta rows (small). + scan.setCaching(rowUpperLimit); + } else { + scan.setCaching(scannerCaching); + } scan.setPriority(HConstants.INTERNAL_READ_QOS); return scan; } @@ -706,6 +713,25 @@ public static void scanMetaForTableRegions(Connection connection, Visitor visito scanMetaForTableRegions(connection, visitor, tableName, CatalogReplicaMode.NONE); } + /** + * Scan meta for regions of {@code tableName}, starting at the meta row derived from + * {@code startRow} and returning at most {@code rowLimit} rows. {@code startRow} must be a region + * start-key boundary (e.g. the end key of the previously visited region), or {@code null}/empty + * to start at the first region. The scan is sized so that the whole {@code rowLimit}-row slice + * comes back in a single ScannerNext RPC, regardless of the configured + * {@code hbase.meta.scanner.caching}. + */ + public static void scanMetaForTableRegions(Connection connection, Visitor visitor, + TableName tableName, byte[] startRow, int rowLimit, CatalogReplicaMode metaReplicaMode) + throws IOException { + byte[] metaStart = (startRow == null || startRow.length == 0) + ? getTableStartRowForMeta(tableName, QueryType.REGION) + : RegionInfo.createRegionName(tableName, startRow, HConstants.ZEROES, false); + byte[] metaStop = getTableStopRowForMeta(tableName, QueryType.REGION); + scanMeta(connection, metaStart, metaStop, QueryType.REGION, null, rowLimit, true, visitor, + metaReplicaMode); + } + private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows, final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException { scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type), @@ -760,8 +786,15 @@ static void scanMeta(Connection connection, @Nullable final byte[] startRow, private static void scanMeta(Connection connection, @Nullable final byte[] startRow, @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows, final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException { + scanMeta(connection, startRow, stopRow, type, filter, maxRows, false, visitor, metaReplicaMode); + } + + private static void scanMeta(Connection connection, @Nullable final byte[] startRow, + @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows, + boolean isPagedScan, final Visitor visitor, CatalogReplicaMode metaReplicaMode) + throws IOException { int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; - Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit); + Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit, isPagedScan); for (byte[] family : type.getFamilies()) { scan.addFamily(family); @@ -830,7 +863,7 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start private static RegionInfo getClosestRegionInfo(Connection connection, @NonNull final TableName tableName, @NonNull final byte[] row) throws IOException { byte[] searchRow = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); - Scan scan = getMetaScan(connection.getConfiguration(), 1); + Scan scan = getMetaScan(connection.getConfiguration(), 1, false); scan.setReversed(true); scan.withStartRow(searchRow); try (ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index e50f57b9fca1..66975cde6223 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1146,11 +1146,7 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou } } finally { if (lockedUserRegion) { - userRegionLock.unlock(); - // update duration of the lock being held - if (metrics != null) { - metrics.updateUserRegionLockHeld(EnvironmentEdgeManager.currentTime() - lockStartTime); - } + releaseUserRegionLock(lockStartTime); } } try { @@ -1185,6 +1181,19 @@ void takeUserRegionLock() throws IOException { } } + /** + * Release {@link #userRegionLock} previously acquired via {@link #takeUserRegionLock()} and + * record the held duration in metrics. + * @param lockStartTimeMs value of {@link EnvironmentEdgeManager#currentTime()} captured + * immediately after {@link #takeUserRegionLock()} returned + */ + void releaseUserRegionLock(long lockStartTimeMs) { + userRegionLock.unlock(); + if (metrics != null) { + metrics.updateUserRegionLockHeld(EnvironmentEdgeManager.currentTime() - lockStartTimeMs); + } + } + /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java index 50e4a26c07c5..ee6c36e4c8a9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; @@ -111,6 +112,63 @@ public List getAllRegionLocations() throws IOException { }, HRegionLocator::getRegionNames, supplier); } + @Override + public List getRegionLocationsPage(byte[] startKey, int limit) + throws IOException { + if (TableName.isMetaTableName(tableName)) { + throw new IOException( + "getRegionLocationsPage(startKey, limit) is not supported for hbase:meta;" + + " use getRegionLocation(EMPTY_START_ROW) instead."); + } + final int effectiveLimit = limit > 0 + ? limit + : connection.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); + final byte[] effectiveStart = startKey == null ? HConstants.EMPTY_START_ROW : startKey; + final CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(connection + .getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); + + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HRegionLocator.getRegionLocationsPage").setTableName(tableName); + return tracedLocationFuture(() -> { + final List out = new ArrayList<>(effectiveLimit); + MetaTableAccessor.Visitor visitor = new MetaTableAccessor.TableVisitorBase(tableName) { + @Override + public boolean visitInternal(Result result) throws IOException { + RegionLocations locs = MetaTableAccessor.getRegionLocations(result); + if (locs == null) { + return true; + } + for (HRegionLocation loc : locs.getRegionLocations()) { + if (loc != null) { + out.add(loc); + } + } + RegionLocations cleaned = locs.removeElementsWithNullLocation(); + if (cleaned != null) { + connection.cacheLocation(tableName, cleaned); + } + return true; + } + }; + + boolean locked = false; + long lockStart = 0; + try { + connection.takeUserRegionLock(); + lockStart = EnvironmentEdgeManager.currentTime(); + locked = true; + MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName, effectiveStart, + effectiveLimit, metaReplicaMode); + } finally { + if (locked) { + connection.releaseUserRegionLock(lockStart); + } + } + return out; + }, HRegionLocator::getRegionNames, supplier); + } + private static List getRegionNames(List locations) { if (CollectionUtils.isEmpty(locations)) { return Collections.emptyList(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java index 40f31b06f25f..40a5678f3b7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Pair; @@ -130,6 +131,54 @@ default List getRegionLocations(byte[] row) throws IOException */ List getAllRegionLocations() throws IOException; + /** + * Bulk lookup of region locations from {@code hbase:meta} in a single RPC, starting at + * {@code startKey} (region start-key boundary, inclusive) and returning at most {@code limit} + * regions in start-key order. + *

+ * The returned list includes all replicas of each region (matching + * {@link #getAllRegionLocations()}), and the result is also written to the connection's region + * location cache. + *

+ * Ordering: regions are returned in ascending region start-key order (the natural order of + * {@code hbase:meta} rows for a single table). Within each region, replicas are returned in + * ascending replica-id order (replica 0, then 1, then 2, ...). Split parents and offline regions + * are filtered out, which may cause a page to contain fewer than {@code limit} regions but never + * disturbs ordering of the survivors. + *

+ * To page through all regions of a table, call repeatedly passing + * {@code last.getRegion().getEndKey()} as the next {@code startKey}, where {@code last} is the + * final element of the previous response. All replicas of a region share the same + * {@link RegionInfo}, so the last entry's end key is the correct cursor regardless of which + * replica it is. Pass {@code null} for the first call. Stop paging when the returned list is + * empty or when the last region's end key is {@link HConstants#EMPTY_END_ROW} (zero-length) - + * that signals the end of the table; passing it back in would re-scan from the beginning since by + * convention an empty start key means "from the first region". + *

+ * Unlike {@link #getAllRegionLocations()}, this method performs at most one RPC against + * {@code hbase:meta} per invocation, so its latency is bounded by {@code limit} rather than table + * size. Suitable for callers that wrap meta lookups in a lock with a fixed timeout, e.g. for bulk + * region-cache warmup. + *

+ * This method is optional. Implementations that cannot support paginated lookups should throw + * {@link UnsupportedOperationException} (the default behavior); callers should fall back to + * {@link #getAllRegionLocations()} in that case. + * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty + * starts from the first region + * @param limit maximum number of regions to return; if <= 0, falls back to + * {@code hbase.meta.scanner.caching} + * @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no + * more regions exist + * @throws IOException if a remote or network exception occurs + * @throws UnsupportedOperationException if this implementation does not support paginated lookups + */ + default List getRegionLocationsPage(byte[] startKey, int limit) + throws IOException { + throw new UnsupportedOperationException( + "getRegionLocationsPage(byte[], int) is not supported by this RegionLocator;" + + " fall back to getAllRegionLocations()"); + } + /** * Gets the starting row key for every region in the currently open table. *

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java index 531e36175766..2198defd16ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java @@ -17,17 +17,29 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) @@ -97,4 +109,185 @@ protected void clearCache(TableName tableName) throws IOException { locator.clearRegionLocationCache(); } } + + @Test + public void testGetRegionLocationsFirstPage() throws IOException { + try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { + List page = locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 3); + assertEquals(3 * REGION_REPLICATION, page.size()); + // Contract: regions in ascending start-key order, replicas in ascending replicaId order + // within each region. + byte[][] expectedStartKeys = + new byte[][] { HConstants.EMPTY_START_ROW, SPLIT_KEYS[0], SPLIT_KEYS[1] }; + for (int i = 0; i < 3; i++) { + for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { + HRegionLocation loc = page.get(i * REGION_REPLICATION + replicaId); + assertArrayEquals("region " + i + " replica " + replicaId + " start key", + expectedStartKeys[i], loc.getRegion().getStartKey()); + assertEquals( + "region " + i + " replica id at index " + (i * REGION_REPLICATION + replicaId), + replicaId, loc.getRegion().getReplicaId()); + } + } + } + } + + @Test + public void testGetRegionLocationsPagination() throws IOException { + try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { + List all = locator.getAllRegionLocations(); + Set expectedRegionNames = new HashSet<>(); + for (HRegionLocation l : all) { + expectedRegionNames.add(l.getRegion().getRegionNameAsString()); + } + + Set seen = new HashSet<>(); + byte[] cursor = null; + int pages = 0; + while (true) { + List page = locator.getRegionLocationsPage(cursor, 4); + if (page.isEmpty()) { + break; + } + pages++; + for (HRegionLocation l : page) { + seen.add(l.getRegion().getRegionNameAsString()); + } + byte[] lastEnd = page.get(page.size() - 1).getRegion().getEndKey(); + if (lastEnd.length == 0) { + break; + } + cursor = lastEnd; + } + assertEquals(expectedRegionNames, seen); + // 10 regions, page size 4 → exactly 3 pages: [reg0..reg3], [reg4..reg7], [reg8..reg9]. + assertEquals(3, pages); + } + } + + @Test + public void testGetRegionLocationsEmptyAfterEnd() throws IOException { + try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { + // Use a startKey lexicographically after all split keys: SPLIT_KEYS go "1".."9", so "z". + List page = locator.getRegionLocationsPage(Bytes.toBytes("z"), 5); + assertTrue("expected empty page past the last region; got " + page.size() + " entries", + page.isEmpty()); + } + } + + @Test + public void testGetRegionLocationsCursorMatchesAllReplicas() throws IOException { + try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { + List page = locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 2); + assertEquals(2 * REGION_REPLICATION, page.size()); + // Last REGION_REPLICATION entries are all replicas of the last region — same RegionInfo, + // so same end key regardless of which one the caller picks as the cursor. + byte[] expectedCursor = page.get(page.size() - 1).getRegion().getEndKey(); + for (int i = 1; i <= REGION_REPLICATION; i++) { + byte[] cursor = page.get(page.size() - i).getRegion().getEndKey(); + assertArrayEquals("replica " + i + " end key disagrees", expectedCursor, cursor); + } + } + } + + @Test + public void testGetRegionLocationsLimitFallsBackToConfig() throws IOException { + // Default HBASE_META_SCANNER_CACHING is 100, table has 10 regions; limit=0 must fall back + // to the config and return everything in one shot. + try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { + List page = locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 0); + assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), page.size()); + } + } + + @Test + public void testGetRegionLocationsHoldsUserRegionLock() throws IOException { + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); + try ( + ConnectionImplementation conn = + (ConnectionImplementation) ConnectionFactory.createConnection(conf); + RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { + MetricsConnection metrics = conn.getConnectionMetrics(); + long before = metrics.getUserRegionLockHeldTimer().getCount(); + locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 3); + long after = metrics.getUserRegionLockHeldTimer().getCount(); + assertEquals( + "userRegionLock held-timer should have incremented exactly once for the bulk" + " lookup", + before + 1, after); + } + } + + /** + * Directly verify that the new API writes into the same {@code metaCache} that + * {@code ConnectionImplementation.locateRegionInMeta} reads from: after the bulk call, looking up + * each returned region's start key via the package-private cache accessor must return non-null. + */ + @Test + public void testGetRegionLocationsPopulatesMetaCacheDirect() throws IOException { + ConnectionImplementation conn = (ConnectionImplementation) UTIL.getConnection(); + conn.clearRegionCache(TABLE_NAME); + try (RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { + List page = locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 4); + assertEquals(4 * REGION_REPLICATION, page.size()); + for (HRegionLocation loc : page) { + byte[] startKey = loc.getRegion().getStartKey(); + RegionLocations cached = conn.getCachedLocation(TABLE_NAME, startKey); + assertNotNull("metaCache miss for region starting at " + Bytes.toStringBinary(startKey) + + " — bulk API did not populate the same cache locateRegionInMeta uses", cached); + HRegionLocation cachedLoc = cached.getRegionLocation(loc.getRegion().getReplicaId()); + assertNotNull("metaCache had region but missing replica " + loc.getRegion().getReplicaId(), + cachedLoc); + assertEquals("cached server differs from server returned by bulk API", loc.getServerName(), + cachedLoc.getServerName()); + } + } + } + + /** + * Indirect verification of the same property: after the bulk call, + * {@code RegionLocator.getRegionLocation(row, useCache=true)} for a row inside any returned + * region must be served from cache — i.e. it must NOT acquire the user-region lock (which is only + * taken when {@code locateRegionInMeta} actually issues a meta RPC). This is the end-to-end proof + * that the bulk API and the single-region API share the cache. + */ + @Test + public void testGetRegionLocationsAvoidsMetaRpcForCachedRows() throws IOException { + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); + try ( + ConnectionImplementation conn = + (ConnectionImplementation) ConnectionFactory.createConnection(conf); + RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { + conn.clearRegionCache(TABLE_NAME); + MetricsConnection metrics = conn.getConnectionMetrics(); + + List page = locator.getRegionLocationsPage(HConstants.EMPTY_START_ROW, 4); + long afterBulk = metrics.getUserRegionLockHeldTimer().getCount(); + + // For each returned region, look up a row inside it via the single-region API. Each lookup + // must be a cache hit (no user-region lock acquired) because the bulk call already + // populated the shared metaCache. + for (HRegionLocation loc : page) { + // Skip non-default replicas — getRegionLocation(row) without a replicaId resolves only + // the default replica, and the cache check in locateRegionInMeta is per replicaId. + if (loc.getRegion().getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + continue; + } + byte[] startKey = loc.getRegion().getStartKey(); + // EMPTY_START_ROW belongs to the first region; any byte works. + byte[] probe = startKey.length == 0 ? new byte[] { 0x00 } : startKey; + HRegionLocation viaCache = locator.getRegionLocation(probe, false); + assertEquals("single-region lookup returned a different server than the bulk API for " + + Bytes.toStringBinary(startKey), loc.getServerName(), viaCache.getServerName()); + } + + long afterPointLookups = metrics.getUserRegionLockHeldTimer().getCount(); + assertEquals( + "user-region lock held-timer should not have advanced — every point lookup should have" + + " been a metaCache hit, but " + (afterPointLookups - afterBulk) + + " meta RPCs were issued", + afterBulk, afterPointLookups); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorPagedScanRpcCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorPagedScanRpcCount.java new file mode 100644 index 000000000000..06a3c198af6e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorPagedScanRpcCount.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.CatalogReplicaMode; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Asserts the single-RPC promise of the paginated meta-scan path + * ({@link MetaTableAccessor#scanMetaForTableRegions(Connection, MetaTableAccessor.Visitor, TableName, byte[], int, CatalogReplicaMode)}) + * by wrapping the meta {@link Table} so we can read {@link ScanMetrics#countOfRPCcalls} from the + * issued {@link ResultScanner} - one per ScannerNext server response. + *

+ * Cluster runs with {@code hbase.meta.scanner.caching = 2} so the {@code limit > caching} branch is + * exercised cheaply with a small table (5 user regions). + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestRegionLocatorPagedScanRpcCount { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionLocatorPagedScanRpcCount.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final TableName TABLE_NAME = TableName.valueOf("LocatorPaged"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + + /** Caching is small enough that {@code limit > META_CACHING} is easy to set up. */ + private static final int META_CACHING = 2; + private static final int NUM_REGIONS = 5; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING); + UTIL.startMiniCluster(1); + byte[][] splitKeys = new byte[NUM_REGIONS - 1][]; + for (int i = 0; i < splitKeys.length; i++) { + splitKeys[i] = Bytes.toBytes(Integer.toString(i + 1)); + } + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); + UTIL.getAdmin().createTable(td, splitKeys); + UTIL.waitTableAvailable(TABLE_NAME); + UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testSingleRpcWhenLimitWithinCaching() throws Exception { + // limit (2) <= caching (2): trivially one ScannerNext. Baseline. + long rpcs = runPagedScanAndCountRpcs(2); + assertEquals("expected exactly one ScannerNext RPC for limit <= caching", 1L, rpcs); + } + + @Test + public void testSingleRpcWhenLimitExceedsCaching() throws Exception { + // limit (5) > caching (2): without the isPagedScan fix this would be ceil(5/2) = 3 + // ScannerNext RPCs. With the fix, getMetaScan sizes caching to limit -> 1 RPC. + long rpcs = runPagedScanAndCountRpcs(NUM_REGIONS); + assertEquals("expected exactly one ScannerNext RPC for paged scan even when limit > caching", + 1L, rpcs); + } + + @Test + public void testUnboundedPathStillUsesConfiguredCaching() throws Exception { + // The unbounded scanMetaForTableRegions(connection, visitor, tableName) overload (no rowLimit) + // must continue to use the configured caching (META_CACHING=2). For NUM_REGIONS=5 user + // regions, expect ceil(5 / 2) = 3 ScannerNext batches. Asserts the isPagedScan flag did not + // bleed into the unbounded path. + AtomicLong rpcCount = new AtomicLong(); + Connection wrapped = wrapConnection(UTIL.getConnection(), rpcCount); + CollectingVisitor visitor = new CollectingVisitor(TABLE_NAME); + MetaTableAccessor.scanMetaForTableRegions(wrapped, visitor, TABLE_NAME); + assertEquals(NUM_REGIONS, visitor.locations.size()); + long expected = (long) Math.ceil((double) NUM_REGIONS / META_CACHING); + assertEquals( + "unbounded scan should split across ceil(NUM_REGIONS / caching) ScannerNext batches", + expected, rpcCount.get()); + } + + private long runPagedScanAndCountRpcs(int limit) throws IOException { + AtomicLong rpcCount = new AtomicLong(); + Connection wrapped = wrapConnection(UTIL.getConnection(), rpcCount); + CollectingVisitor visitor = new CollectingVisitor(TABLE_NAME); + MetaTableAccessor.scanMetaForTableRegions(wrapped, visitor, TABLE_NAME, null, limit, + CatalogReplicaMode.NONE); + assertEquals("paged call returned wrong number of regions", limit, visitor.locations.size()); + return rpcCount.get(); + } + + /** Visitor that just collects every region-row's first {@link HRegionLocation}. */ + private static final class CollectingVisitor extends MetaTableAccessor.TableVisitorBase { + final List locations = new ArrayList<>(); + + CollectingVisitor(TableName tableName) { + super(tableName); + } + + @Override + public boolean visitInternal(Result rowResult) throws IOException { + RegionLocations locs = MetaTableAccessor.getRegionLocations(rowResult); + if (locs != null && locs.getRegionLocation() != null) { + locations.add(locs.getRegionLocation()); + } + return true; + } + } + + /** + * Returns a delegating proxy {@link Connection} that intercepts {@code getTable(META_TABLE_NAME)} + * and returns a wrapped {@link Table}; the wrapped Table intercepts {@code getScanner(Scan)} to + * enable scan metrics and wrap the returned {@link ResultScanner}. The wrapped scanner intercepts + * {@code close()} so we can read {@link ScanMetrics#countOfRPCcalls} into {@code rpcCount} before + * the underlying scanner is closed. + */ + private static Connection wrapConnection(Connection delegate, AtomicLong rpcCount) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ( + "getTable".equals(method.getName()) && args != null && args.length == 1 + && TableName.META_TABLE_NAME.equals(args[0]) + ) { + Table realTable = (Table) method.invoke(delegate, args); + return wrapTable(realTable, rpcCount); + } + return method.invoke(delegate, args); + } + }; + return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), + new Class[] { Connection.class }, handler); + } + + private static Table wrapTable(Table delegate, AtomicLong rpcCount) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ( + "getScanner".equals(method.getName()) && args != null && args.length == 1 + && args[0] instanceof Scan + ) { + Scan scan = (Scan) args[0]; + scan.setScanMetricsEnabled(true); + ResultScanner realScanner = (ResultScanner) method.invoke(delegate, scan); + return wrapScanner(realScanner, rpcCount); + } + return method.invoke(delegate, args); + } + }; + return (Table) Proxy.newProxyInstance(Table.class.getClassLoader(), + new Class[] { Table.class }, handler); + } + + private static ResultScanner wrapScanner(ResultScanner delegate, AtomicLong rpcCount) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("close".equals(method.getName())) { + // Read metrics BEFORE closing - the metrics object is the live counter attached to the + // scanner. After close() the underlying client scanner may detach. + ScanMetrics metrics = delegate.getScanMetrics(); + if (metrics != null) { + rpcCount.set(metrics.countOfRPCcalls.get()); + } + } + return method.invoke(delegate, args); + } + }; + return (ResultScanner) Proxy.newProxyInstance(ResultScanner.class.getClassLoader(), + new Class[] { ResultScanner.class }, handler); + } +}