Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Comment thread
henrib marked this conversation as resolved.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0

Check warning on line 10 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line does not match expected header line of ' * http://www.apache.org/licenses/LICENSE-2.0'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItv&open=AZ2c3HhvLdirLWE7GItv&pullRequest=6441
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.thrift.TException;

import java.util.Collections;
import java.util.List;

/**
* Fetches the location of a given metadata table.
* <p>Since the location mutates with each transaction, this allows determining if a cached version of the
* table is the latest known in the HMS database.</p>
*/
public class MetadataLocator {
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MetadataLocator.class);
static final GetProjectionsSpec PARAM_SPEC = new GetTableProjectionsSpecBuilder()
Comment thread
henrib marked this conversation as resolved.
Outdated
.includeParameters() // only fetches table.parameters
.build();
private final HiveCatalog catalog;

public MetadataLocator(HiveCatalog catalog) {
this.catalog = catalog;
}

public HiveCatalog getCatalog() {
return catalog;
}

/**
* Returns the location of the metadata table identified by the given identifier, or null if the table does not exist or is

Check warning on line 58 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 125).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ3V_dxZmw0RN5VPna7j&open=AZ3V_dxZmw0RN5VPna7j&pullRequest=6441
* not a metadata table.
* <p>This uses the Thrift API to fetch the table parameters, which is more efficient than fetching the entire table object.</p>

Check warning on line 60 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 130).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GIty&open=AZ2c3HhvLdirLWE7GIty&pullRequest=6441
* @param identifier the identifier of the metadata table to fetch the location for
* @return the location of the metadata table, or null if the table does not exist or is not a metadata table
Comment thread
henrib marked this conversation as resolved.
Outdated
*/
public String getLocation(TableIdentifier identifier) {
final ClientPool<IMetaStoreClient, TException> clients = catalog.clientPool();

Check failure on line 65 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this usage of "clientPool", it is annotated with @VisibleForTesting and should not be accessed from production code.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItu&open=AZ2c3HhvLdirLWE7GItu&pullRequest=6441
final String catName = catalog.name();
final TableIdentifier baseTableIdentifier;
if (!catalog.isValidIdentifier(identifier)) {
if (!isValidMetadataIdentifier(identifier)) {
return null;
} else {
baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels());
}
} else {
baseTableIdentifier = identifier;
}
String database = baseTableIdentifier.namespace().level(0);
String tableName = baseTableIdentifier.name();
try {
List<Table> tables = clients.run(
client -> client.getTables(catName, database, Collections.singletonList(tableName), PARAM_SPEC)
);
return tables == null || tables.isEmpty()
? null
: tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
Comment thread
henrib marked this conversation as resolved.
} catch (NoSuchTableException | NoSuchObjectException e) {
LOGGER.info("Table not found {}", baseTableIdentifier, e);
Comment thread
henrib marked this conversation as resolved.
Outdated
} catch (TException e) {
LOGGER.info("Table parameters fetch failed {}", baseTableIdentifier, e);
Comment thread
henrib marked this conversation as resolved.
Outdated
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Interrupted in call to check table existence of {}", baseTableIdentifier, e);
}
return null;
}
Comment thread
henrib marked this conversation as resolved.
Outdated

private boolean isValidMetadataIdentifier(TableIdentifier identifier) {
return MetadataTableType.from(identifier.name()) != null
&& catalog.isValidIdentifier(TableIdentifier.of(identifier.namespace().levels()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,78 +20,276 @@
package org.apache.iceberg.rest;

import com.github.benmanes.caffeine.cache.Ticker;

import java.lang.ref.SoftReference;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Comment thread
henrib marked this conversation as resolved.
Outdated
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.MetadataLocator;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class that wraps an Iceberg Catalog to cache tables.
*/
public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog {
protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class);

private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null);
@TestOnly
public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) {
HMSCachingCatalog cache = cacheRef.get();
if (cache == null) {
return null;
}
return extractor == null ? (C) cache : extractor.apply(cache);
}
Comment thread
henrib marked this conversation as resolved.

@TestOnly
public HiveCatalog getCatalog() {
return hiveCatalog;
}

private final HiveCatalog hiveCatalog;

public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
super(catalog, true, expiration, Ticker.systemTicker());
// Metrics counters
private final AtomicLong cacheHitCount = new AtomicLong(0);
private final AtomicLong cacheMissCount = new AtomicLong(0);
private final AtomicLong cacheLoadCount = new AtomicLong(0);
private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);

public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
this(catalog, expirationMs, /*caseSensitive*/ true);
}

public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) {
super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
this.hiveCatalog = catalog;
if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", false)) {
Comment thread
henrib marked this conversation as resolved.
Outdated
cacheRef = new SoftReference<>(this);

Check warning on line 92 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this assignment of "cacheRef".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ3V_d1Imw0RN5VPna7k&open=AZ3V_d1Imw0RN5VPna7k&pullRequest=6441
}
}

/**
* Callback when cache invalidates the entry for a given table identifier.
*
* @param tid the table identifier to invalidate
*/
protected void onCacheInvalidate(TableIdentifier tid) {
cacheInvalidateCount.incrementAndGet();
LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
}

/**
* Callback when cache loads a table for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheLoad(TableIdentifier tid) {
cacheLoadCount.incrementAndGet();
LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
Comment thread
henrib marked this conversation as resolved.
Outdated
}

/**
* Callback when cache hit for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheHit(TableIdentifier tid) {
cacheHitCount.incrementAndGet();
LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
}

/**
* Callback when cache miss occurs for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheMiss(TableIdentifier tid) {
cacheMissCount.incrementAndGet();
LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
}

/**
* Callback when cache loads a metadata table for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheMetaLoad(TableIdentifier tid) {
cacheMetaLoadCount.incrementAndGet();
LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
}

// Getter methods for accessing metrics
public long getCacheHitCount() {
return cacheHitCount.get();
}

public long getCacheMissCount() {
return cacheMissCount.get();
}

public long getCacheLoadCount() {
return cacheLoadCount.get();
}

public long getCacheInvalidateCount() {
return cacheInvalidateCount.get();
}

public long getCacheMetaLoadCount() {
return cacheMetaLoadCount.get();
}

public double getCacheHitRate() {
long hits = cacheHitCount.get();
long total = hits + cacheMissCount.get();
return total == 0 ? 0.0 : (double) hits / total;
}

/**
* Generates a map of this cache's performance metrics, including hit count,
* miss count, load count, invalidate count, meta-load count, and hit rate.
* This can be used for monitoring and debugging purposes to understand the effectiveness of the cache.
* @return a map of cache performance metrics
*/
public Map<String, Number> cacheStats() {
return Map.of(
"hit", getCacheHitCount(),
"miss", getCacheMissCount(),
"load", getCacheLoadCount(),
"invalidate", getCacheInvalidateCount(),
"metaload", getCacheMetaLoadCount(),
"hit-rate", getCacheHitRate()
);
}


@Override
public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return hiveCatalog.buildTable(identifier, schema);
public void createNamespace(Namespace namespace, Map<String, String> map) {
hiveCatalog.createNamespace(namespace, map);
}

@Override
public void createNamespace(Namespace nmspc, Map<String, String> map) {
hiveCatalog.createNamespace(nmspc, map);
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
return hiveCatalog.listNamespaces(namespace);
}

@Override
public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException {
return hiveCatalog.listNamespaces(nmspc);
public Table loadTable(final TableIdentifier identifier) {
final TableIdentifier canonicalized = identifier.toLowerCase();
Comment thread
henrib marked this conversation as resolved.
Outdated
final Table cachedTable = tableCache.getIfPresent(canonicalized);
if (cachedTable != null) {
final String location = new MetadataLocator(hiveCatalog).getLocation(canonicalized);
Comment thread
henrib marked this conversation as resolved.
Outdated
if (location == null) {
LOG.debug("Table {} has no location, returning cached table without location", canonicalized);
Comment thread
henrib marked this conversation as resolved.
Outdated
} else {
Comment thread
henrib marked this conversation as resolved.
Outdated
String cachedLocation = cachedTable instanceof HasTableOperations tableOps
? tableOps.operations().current().metadataFileLocation()
: null;
if (!location.equals(cachedLocation)) {
LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location);
// Invalidate the cached table if the location is different
invalidateTable(canonicalized);
onCacheInvalidate(canonicalized);
} else {
onCacheHit(canonicalized);
return cachedTable;
}
}
} else {
onCacheMiss(canonicalized);
}
final Table table = tableCache.get(canonicalized, this::loadTableWithoutCache);
if (table instanceof BaseMetadataTable) {
// Cache underlying table
TableIdentifier originTableIdentifier =
TableIdentifier.of(canonicalized.namespace().levels());
Table originTable = tableCache.get(originTableIdentifier, this::loadTableWithoutCache);
Comment thread
henrib marked this conversation as resolved.
Outdated
// Share TableOperations instance of origin table for all metadata tables, so that metadata
// table instances are refreshed as well when origin table instance is refreshed.
if (originTable instanceof HasTableOperations tableOps) {
TableOperations ops = tableOps.operations();
MetadataTableType type = MetadataTableType.from(canonicalized.name());
Comment thread
henrib marked this conversation as resolved.
Table metadataTable =
MetadataTableUtils.createMetadataTableInstance(
ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type);
tableCache.put(canonicalized, metadataTable);
onCacheMetaLoad(canonicalized);
LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier);
// Return the metadata table instead of the original table
return metadataTable;
}
}
onCacheLoad(canonicalized);
return table;
}
Comment thread
henrib marked this conversation as resolved.

private Table loadTableWithoutCache(TableIdentifier identifier) {
try {
return hiveCatalog.loadTable(identifier);
} catch (NoSuchTableException exception) {
return null;
Comment thread
henrib marked this conversation as resolved.
Outdated
}
Comment thread
henrib marked this conversation as resolved.
Outdated
}

@Override
public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException {
return hiveCatalog.loadNamespaceMetadata(nmspc);
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
return hiveCatalog.loadNamespaceMetadata(namespace);
}

@Override
public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException {
List<TableIdentifier> tables = listTables(nmspc);
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
List<TableIdentifier> tables = listTables(namespace);
for (TableIdentifier ident : tables) {
invalidateTable(ident);
}
return hiveCatalog.dropNamespace(nmspc);
return hiveCatalog.dropNamespace(namespace);
}

@Override
public boolean setProperties(Namespace nmspc, Map<String, String> map) throws NoSuchNamespaceException {
return hiveCatalog.setProperties(nmspc, map);
public boolean setProperties(Namespace namespace, Map<String, String> map) throws NoSuchNamespaceException {
return hiveCatalog.setProperties(namespace, map);
}

@Override
public boolean removeProperties(Namespace nmspc, Set<String> set) throws NoSuchNamespaceException {
return hiveCatalog.removeProperties(nmspc, set);
public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException {
return hiveCatalog.removeProperties(namespace, set);
}

@Override
public boolean namespaceExists(Namespace namespace) {
return hiveCatalog.namespaceExists(namespace);
}

@Override
public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return hiveCatalog.buildTable(identifier, schema);
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
return hiveCatalog.listViews(namespace);
Expand Down
Loading
Loading